From 5a5d454083f27dab68a4f98441dfb40879aadabc Mon Sep 17 00:00:00 2001 From: Christian Ebner Date: Fri, 19 Apr 2024 17:14:12 +0200 Subject: [PATCH] client: chunk stream: switch payload stream chunker Use the dedicated chunker with boundary suggestions for the payload stream, by attaching the channel sender to the archiver and the channel receiver to the payload stream chunker. The archiver sends the file boundaries for the chunker to consume. Signed-off-by: Christian Ebner --- examples/test_chunk_speed2.rs | 2 +- pbs-client/src/chunk_stream.rs | 15 +++++-- pbs-client/src/pxar/create.rs | 8 ++++ pbs-client/src/pxar_backup_stream.rs | 40 +++++++++++-------- proxmox-backup-client/src/main.rs | 16 +++++--- .../src/proxmox_restore_daemon/api.rs | 12 +++++- pxar-bin/src/main.rs | 1 + tests/catar.rs | 1 + 8 files changed, 68 insertions(+), 27 deletions(-) diff --git a/examples/test_chunk_speed2.rs b/examples/test_chunk_speed2.rs index 22dd14ce..f2963746 100644 --- a/examples/test_chunk_speed2.rs +++ b/examples/test_chunk_speed2.rs @@ -26,7 +26,7 @@ async fn run() -> Result<(), Error> { .map_err(Error::from); //let chunk_stream = FixedChunkStream::new(stream, 4*1024*1024); - let mut chunk_stream = ChunkStream::new(stream, None, None); + let mut chunk_stream = ChunkStream::new(stream, None, None, None); let start_time = std::time::Instant::now(); diff --git a/pbs-client/src/chunk_stream.rs b/pbs-client/src/chunk_stream.rs index 070a10c1..e3f0980c 100644 --- a/pbs-client/src/chunk_stream.rs +++ b/pbs-client/src/chunk_stream.rs @@ -7,7 +7,7 @@ use bytes::BytesMut; use futures::ready; use futures::stream::{Stream, TryStream}; -use pbs_datastore::{Chunker, ChunkerImpl}; +use pbs_datastore::{Chunker, ChunkerImpl, PayloadChunker}; use crate::inject_reused_chunks::InjectChunks; @@ -42,11 +42,20 @@ pub struct ChunkStream { } impl ChunkStream { - pub fn new(input: S, chunk_size: Option, injection_data: Option) -> Self { + pub fn new( + input: S, + chunk_size: Option, + injection_data: Option, + suggested_boundaries: Option>, + ) -> Self { let chunk_size = chunk_size.unwrap_or(4 * 1024 * 1024); Self { input, - chunker: Box::new(ChunkerImpl::new(chunk_size)), + chunker: if let Some(suggested) = suggested_boundaries { + Box::new(PayloadChunker::new(chunk_size, suggested)) + } else { + Box::new(ChunkerImpl::new(chunk_size)) + }, buffer: BytesMut::new(), scan_pos: 0, consumed: 0, diff --git a/pbs-client/src/pxar/create.rs b/pbs-client/src/pxar/create.rs index eadd670d..03a6a144 100644 --- a/pbs-client/src/pxar/create.rs +++ b/pbs-client/src/pxar/create.rs @@ -169,6 +169,7 @@ struct Archiver { file_copy_buffer: Vec, skip_e2big_xattr: bool, forced_boundaries: Option>, + suggested_boundaries: Option>, previous_payload_index: Option, cache: PxarLookaheadCache, reuse_stats: ReuseStats, @@ -197,6 +198,7 @@ pub async fn create_archive( callback: F, options: PxarCreateOptions, forced_boundaries: Option>, + suggested_boundaries: Option>, ) -> Result<(), Error> where T: SeqWrite + Send, @@ -271,6 +273,7 @@ where file_copy_buffer: vec::undefined(4 * 1024 * 1024), skip_e2big_xattr: options.skip_e2big_xattr, forced_boundaries, + suggested_boundaries, previous_payload_index, cache: PxarLookaheadCache::new(None), reuse_stats: ReuseStats::default(), @@ -862,6 +865,11 @@ impl Archiver { .add_file(c_file_name, file_size, stat.st_mtime)?; } + if let Some(sender) = self.suggested_boundaries.as_mut() { + let offset = encoder.payload_position()?.raw(); + sender.send(offset)?; + } + let offset: LinkOffset = if let Some(payload_offset) = payload_offset { self.reuse_stats.total_reused_payload_size += file_size + size_of::() as u64; diff --git a/pbs-client/src/pxar_backup_stream.rs b/pbs-client/src/pxar_backup_stream.rs index fb6d063f..f322566f 100644 --- a/pbs-client/src/pxar_backup_stream.rs +++ b/pbs-client/src/pxar_backup_stream.rs @@ -27,6 +27,7 @@ use crate::pxar::create::PxarWriters; /// consumer. pub struct PxarBackupStream { rx: Option, Error>>>, + pub suggested_boundaries: Option>, handle: Option, error: Arc>>, } @@ -55,22 +56,26 @@ impl PxarBackupStream { )); let writer = pxar::encoder::sync::StandardWriter::new(writer); - let (writer, payload_rx) = if separate_payload_stream { - let (tx, rx) = std::sync::mpsc::sync_channel(10); - let payload_writer = TokioWriterAdapter::new(std::io::BufWriter::with_capacity( - buffer_size, - StdChannelWriter::new(tx), - )); - ( - pxar::PxarVariant::Split( - writer, - pxar::encoder::sync::StandardWriter::new(payload_writer), - ), - Some(rx), - ) - } else { - (pxar::PxarVariant::Unified(writer), None) - }; + let (writer, payload_rx, suggested_boundaries_tx, suggested_boundaries_rx) = + if separate_payload_stream { + let (tx, rx) = std::sync::mpsc::sync_channel(10); + let (suggested_boundaries_tx, suggested_boundaries_rx) = std::sync::mpsc::channel(); + let payload_writer = TokioWriterAdapter::new(std::io::BufWriter::with_capacity( + buffer_size, + StdChannelWriter::new(tx), + )); + ( + pxar::PxarVariant::Split( + writer, + pxar::encoder::sync::StandardWriter::new(payload_writer), + ), + Some(rx), + Some(suggested_boundaries_tx), + Some(suggested_boundaries_rx), + ) + } else { + (pxar::PxarVariant::Unified(writer), None, None, None) + }; let error = Arc::new(Mutex::new(None)); let error2 = Arc::clone(&error); @@ -85,6 +90,7 @@ impl PxarBackupStream { }, options, boundaries, + suggested_boundaries_tx, ) .await { @@ -99,12 +105,14 @@ impl PxarBackupStream { let backup_stream = Self { rx: Some(rx), + suggested_boundaries: None, handle: Some(handle.clone()), error: Arc::clone(&error), }; let backup_payload_stream = payload_rx.map(|rx| Self { rx: Some(rx), + suggested_boundaries: suggested_boundaries_rx, handle: Some(handle), error, }); diff --git a/proxmox-backup-client/src/main.rs b/proxmox-backup-client/src/main.rs index 87083d74..99faf019 100644 --- a/proxmox-backup-client/src/main.rs +++ b/proxmox-backup-client/src/main.rs @@ -209,7 +209,7 @@ async fn backup_directory>( payload_target.is_some(), )?; - let mut chunk_stream = ChunkStream::new(pxar_stream, chunk_size, None); + let mut chunk_stream = ChunkStream::new(pxar_stream, chunk_size, None, None); let (tx, rx) = mpsc::channel(10); // allow to buffer 10 chunks let stream = ReceiverStream::new(rx).map_err(Error::from); @@ -223,14 +223,19 @@ async fn backup_directory>( let stats = client.upload_stream(archive_name, stream, upload_options.clone(), None); - if let Some(payload_stream) = payload_stream { + if let Some(mut payload_stream) = payload_stream { let payload_target = payload_target .ok_or_else(|| format_err!("got payload stream, but no target archive name"))?; let (payload_injections_tx, payload_injections_rx) = std::sync::mpsc::channel(); let injection_data = InjectionData::new(payload_boundaries_rx, payload_injections_tx); - let mut payload_chunk_stream = - ChunkStream::new(payload_stream, chunk_size, Some(injection_data)); + let suggested_boundaries = payload_stream.suggested_boundaries.take(); + let mut payload_chunk_stream = ChunkStream::new( + payload_stream, + chunk_size, + Some(injection_data), + suggested_boundaries, + ); let (payload_tx, payload_rx) = mpsc::channel(10); // allow to buffer 10 chunks let stream = ReceiverStream::new(payload_rx).map_err(Error::from); @@ -573,7 +578,8 @@ fn spawn_catalog_upload( let (catalog_tx, catalog_rx) = std::sync::mpsc::sync_channel(10); // allow to buffer 10 writes let catalog_stream = proxmox_async::blocking::StdChannelStream(catalog_rx); let catalog_chunk_size = 512 * 1024; - let catalog_chunk_stream = ChunkStream::new(catalog_stream, Some(catalog_chunk_size), None); + let catalog_chunk_stream = + ChunkStream::new(catalog_stream, Some(catalog_chunk_size), None, None); let catalog_writer = Arc::new(Mutex::new(CatalogWriter::new(TokioWriterAdapter::new( StdChannelWriter::new(catalog_tx), diff --git a/proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs b/proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs index 681fa6db..80af5011 100644 --- a/proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs +++ b/proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs @@ -364,8 +364,16 @@ fn extract( }; let pxar_writer = pxar::PxarVariant::Unified(TokioWriter::new(writer)); - create_archive(dir, PxarWriters::new(pxar_writer, None), Flags::DEFAULT, |_| Ok(()), options, None) - .await + create_archive( + dir, + PxarWriters::new(pxar_writer, None), + Flags::DEFAULT, + |_| Ok(()), + options, + None, + None, + ) + .await } .await; if let Err(err) = result { diff --git a/pxar-bin/src/main.rs b/pxar-bin/src/main.rs index 85887a8e..fa584b4e 100644 --- a/pxar-bin/src/main.rs +++ b/pxar-bin/src/main.rs @@ -442,6 +442,7 @@ async fn create_archive( }, options, None, + None, ) .await?; diff --git a/tests/catar.rs b/tests/catar.rs index 9f83b4cc..94c56501 100644 --- a/tests/catar.rs +++ b/tests/catar.rs @@ -40,6 +40,7 @@ fn run_test(dir_name: &str) -> Result<(), Error> { |_| Ok(()), options, None, + None, ))?; Command::new("cmp")