diff --git a/examples/test_chunk_speed2.rs b/examples/test_chunk_speed2.rs index 22dd14ce..f2963746 100644 --- a/examples/test_chunk_speed2.rs +++ b/examples/test_chunk_speed2.rs @@ -26,7 +26,7 @@ async fn run() -> Result<(), Error> { .map_err(Error::from); //let chunk_stream = FixedChunkStream::new(stream, 4*1024*1024); - let mut chunk_stream = ChunkStream::new(stream, None, None); + let mut chunk_stream = ChunkStream::new(stream, None, None, None); let start_time = std::time::Instant::now(); diff --git a/pbs-client/src/chunk_stream.rs b/pbs-client/src/chunk_stream.rs index 070a10c1..e3f0980c 100644 --- a/pbs-client/src/chunk_stream.rs +++ b/pbs-client/src/chunk_stream.rs @@ -7,7 +7,7 @@ use bytes::BytesMut; use futures::ready; use futures::stream::{Stream, TryStream}; -use pbs_datastore::{Chunker, ChunkerImpl}; +use pbs_datastore::{Chunker, ChunkerImpl, PayloadChunker}; use crate::inject_reused_chunks::InjectChunks; @@ -42,11 +42,20 @@ pub struct ChunkStream { } impl ChunkStream { - pub fn new(input: S, chunk_size: Option, injection_data: Option) -> Self { + pub fn new( + input: S, + chunk_size: Option, + injection_data: Option, + suggested_boundaries: Option>, + ) -> Self { let chunk_size = chunk_size.unwrap_or(4 * 1024 * 1024); Self { input, - chunker: Box::new(ChunkerImpl::new(chunk_size)), + chunker: if let Some(suggested) = suggested_boundaries { + Box::new(PayloadChunker::new(chunk_size, suggested)) + } else { + Box::new(ChunkerImpl::new(chunk_size)) + }, buffer: BytesMut::new(), scan_pos: 0, consumed: 0, diff --git a/pbs-client/src/pxar/create.rs b/pbs-client/src/pxar/create.rs index eadd670d..03a6a144 100644 --- a/pbs-client/src/pxar/create.rs +++ b/pbs-client/src/pxar/create.rs @@ -169,6 +169,7 @@ struct Archiver { file_copy_buffer: Vec, skip_e2big_xattr: bool, forced_boundaries: Option>, + suggested_boundaries: Option>, previous_payload_index: Option, cache: PxarLookaheadCache, reuse_stats: ReuseStats, @@ -197,6 +198,7 @@ pub async fn create_archive( callback: F, options: PxarCreateOptions, forced_boundaries: Option>, + suggested_boundaries: Option>, ) -> Result<(), Error> where T: SeqWrite + Send, @@ -271,6 +273,7 @@ where file_copy_buffer: vec::undefined(4 * 1024 * 1024), skip_e2big_xattr: options.skip_e2big_xattr, forced_boundaries, + suggested_boundaries, previous_payload_index, cache: PxarLookaheadCache::new(None), reuse_stats: ReuseStats::default(), @@ -862,6 +865,11 @@ impl Archiver { .add_file(c_file_name, file_size, stat.st_mtime)?; } + if let Some(sender) = self.suggested_boundaries.as_mut() { + let offset = encoder.payload_position()?.raw(); + sender.send(offset)?; + } + let offset: LinkOffset = if let Some(payload_offset) = payload_offset { self.reuse_stats.total_reused_payload_size += file_size + size_of::() as u64; diff --git a/pbs-client/src/pxar_backup_stream.rs b/pbs-client/src/pxar_backup_stream.rs index fb6d063f..f322566f 100644 --- a/pbs-client/src/pxar_backup_stream.rs +++ b/pbs-client/src/pxar_backup_stream.rs @@ -27,6 +27,7 @@ use crate::pxar::create::PxarWriters; /// consumer. pub struct PxarBackupStream { rx: Option, Error>>>, + pub suggested_boundaries: Option>, handle: Option, error: Arc>>, } @@ -55,22 +56,26 @@ impl PxarBackupStream { )); let writer = pxar::encoder::sync::StandardWriter::new(writer); - let (writer, payload_rx) = if separate_payload_stream { - let (tx, rx) = std::sync::mpsc::sync_channel(10); - let payload_writer = TokioWriterAdapter::new(std::io::BufWriter::with_capacity( - buffer_size, - StdChannelWriter::new(tx), - )); - ( - pxar::PxarVariant::Split( - writer, - pxar::encoder::sync::StandardWriter::new(payload_writer), - ), - Some(rx), - ) - } else { - (pxar::PxarVariant::Unified(writer), None) - }; + let (writer, payload_rx, suggested_boundaries_tx, suggested_boundaries_rx) = + if separate_payload_stream { + let (tx, rx) = std::sync::mpsc::sync_channel(10); + let (suggested_boundaries_tx, suggested_boundaries_rx) = std::sync::mpsc::channel(); + let payload_writer = TokioWriterAdapter::new(std::io::BufWriter::with_capacity( + buffer_size, + StdChannelWriter::new(tx), + )); + ( + pxar::PxarVariant::Split( + writer, + pxar::encoder::sync::StandardWriter::new(payload_writer), + ), + Some(rx), + Some(suggested_boundaries_tx), + Some(suggested_boundaries_rx), + ) + } else { + (pxar::PxarVariant::Unified(writer), None, None, None) + }; let error = Arc::new(Mutex::new(None)); let error2 = Arc::clone(&error); @@ -85,6 +90,7 @@ impl PxarBackupStream { }, options, boundaries, + suggested_boundaries_tx, ) .await { @@ -99,12 +105,14 @@ impl PxarBackupStream { let backup_stream = Self { rx: Some(rx), + suggested_boundaries: None, handle: Some(handle.clone()), error: Arc::clone(&error), }; let backup_payload_stream = payload_rx.map(|rx| Self { rx: Some(rx), + suggested_boundaries: suggested_boundaries_rx, handle: Some(handle), error, }); diff --git a/proxmox-backup-client/src/main.rs b/proxmox-backup-client/src/main.rs index 87083d74..99faf019 100644 --- a/proxmox-backup-client/src/main.rs +++ b/proxmox-backup-client/src/main.rs @@ -209,7 +209,7 @@ async fn backup_directory>( payload_target.is_some(), )?; - let mut chunk_stream = ChunkStream::new(pxar_stream, chunk_size, None); + let mut chunk_stream = ChunkStream::new(pxar_stream, chunk_size, None, None); let (tx, rx) = mpsc::channel(10); // allow to buffer 10 chunks let stream = ReceiverStream::new(rx).map_err(Error::from); @@ -223,14 +223,19 @@ async fn backup_directory>( let stats = client.upload_stream(archive_name, stream, upload_options.clone(), None); - if let Some(payload_stream) = payload_stream { + if let Some(mut payload_stream) = payload_stream { let payload_target = payload_target .ok_or_else(|| format_err!("got payload stream, but no target archive name"))?; let (payload_injections_tx, payload_injections_rx) = std::sync::mpsc::channel(); let injection_data = InjectionData::new(payload_boundaries_rx, payload_injections_tx); - let mut payload_chunk_stream = - ChunkStream::new(payload_stream, chunk_size, Some(injection_data)); + let suggested_boundaries = payload_stream.suggested_boundaries.take(); + let mut payload_chunk_stream = ChunkStream::new( + payload_stream, + chunk_size, + Some(injection_data), + suggested_boundaries, + ); let (payload_tx, payload_rx) = mpsc::channel(10); // allow to buffer 10 chunks let stream = ReceiverStream::new(payload_rx).map_err(Error::from); @@ -573,7 +578,8 @@ fn spawn_catalog_upload( let (catalog_tx, catalog_rx) = std::sync::mpsc::sync_channel(10); // allow to buffer 10 writes let catalog_stream = proxmox_async::blocking::StdChannelStream(catalog_rx); let catalog_chunk_size = 512 * 1024; - let catalog_chunk_stream = ChunkStream::new(catalog_stream, Some(catalog_chunk_size), None); + let catalog_chunk_stream = + ChunkStream::new(catalog_stream, Some(catalog_chunk_size), None, None); let catalog_writer = Arc::new(Mutex::new(CatalogWriter::new(TokioWriterAdapter::new( StdChannelWriter::new(catalog_tx), diff --git a/proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs b/proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs index 681fa6db..80af5011 100644 --- a/proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs +++ b/proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs @@ -364,8 +364,16 @@ fn extract( }; let pxar_writer = pxar::PxarVariant::Unified(TokioWriter::new(writer)); - create_archive(dir, PxarWriters::new(pxar_writer, None), Flags::DEFAULT, |_| Ok(()), options, None) - .await + create_archive( + dir, + PxarWriters::new(pxar_writer, None), + Flags::DEFAULT, + |_| Ok(()), + options, + None, + None, + ) + .await } .await; if let Err(err) = result { diff --git a/pxar-bin/src/main.rs b/pxar-bin/src/main.rs index 85887a8e..fa584b4e 100644 --- a/pxar-bin/src/main.rs +++ b/pxar-bin/src/main.rs @@ -442,6 +442,7 @@ async fn create_archive( }, options, None, + None, ) .await?; diff --git a/tests/catar.rs b/tests/catar.rs index 9f83b4cc..94c56501 100644 --- a/tests/catar.rs +++ b/tests/catar.rs @@ -40,6 +40,7 @@ fn run_test(dir_name: &str) -> Result<(), Error> { |_| Ok(()), options, None, + None, ))?; Command::new("cmp")