diff --git a/examples/test_chunk_speed2.rs b/examples/test_chunk_speed2.rs
index 3f69b436..22dd14ce 100644
--- a/examples/test_chunk_speed2.rs
+++ b/examples/test_chunk_speed2.rs
@@ -26,7 +26,7 @@ async fn run() -> Result<(), Error> {
         .map_err(Error::from);
 
     //let chunk_stream = FixedChunkStream::new(stream, 4*1024*1024);
-    let mut chunk_stream = ChunkStream::new(stream, None);
+    let mut chunk_stream = ChunkStream::new(stream, None, None);
 
     let start_time = std::time::Instant::now();
 
diff --git a/pbs-client/src/backup_writer.rs b/pbs-client/src/backup_writer.rs
index dc9aa569..b2ada85c 100644
--- a/pbs-client/src/backup_writer.rs
+++ b/pbs-client/src/backup_writer.rs
@@ -23,6 +23,7 @@ use pbs_tools::crypt_config::CryptConfig;
 
 use proxmox_human_byte::HumanByte;
 
+use super::inject_reused_chunks::{InjectChunks, InjectReusedChunks, InjectedChunksInfo};
 use super::merge_known_chunks::{MergeKnownChunks, MergedChunkInfo};
 use super::{H2Client, HttpClient};
 
@@ -265,6 +266,7 @@ impl BackupWriter {
         archive_name: &str,
         stream: impl Stream<Item = Result<bytes::Bytes, Error>>,
         options: UploadOptions,
+        injections: Option<std::sync::mpsc::Receiver<InjectChunks>>,
     ) -> Result<BackupStats, Error> {
         let known_chunks = Arc::new(Mutex::new(HashSet::new()));
 
@@ -341,6 +343,7 @@ impl BackupWriter {
                 None
             },
             options.compress,
+            injections,
         )
         .await?;
 
@@ -636,6 +639,7 @@ impl BackupWriter {
         known_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
         crypt_config: Option<Arc<CryptConfig>>,
         compress: bool,
+        injections: Option<std::sync::mpsc::Receiver<InjectChunks>>,
     ) -> impl Future<Output = Result<UploadStats, Error>> {
         let total_chunks = Arc::new(AtomicUsize::new(0));
         let total_chunks2 = total_chunks.clone();
@@ -662,48 +666,72 @@ impl BackupWriter {
         let index_csum_2 = index_csum.clone();
 
         stream
-            .and_then(move |data| {
-                let chunk_len = data.len();
+            .inject_reused_chunks(injections, stream_len.clone())
+            .and_then(move |chunk_info| match chunk_info {
+                InjectedChunksInfo::Known(chunks) => {
+                    // account for injected chunks
+                    let count = chunks.len();
+                    total_chunks.fetch_add(count, Ordering::SeqCst);
 
-                total_chunks.fetch_add(1, Ordering::SeqCst);
-                let offset = stream_len.fetch_add(chunk_len, Ordering::SeqCst) as u64;
-
-                let mut chunk_builder = DataChunkBuilder::new(data.as_ref()).compress(compress);
-
-                if let Some(ref crypt_config) = crypt_config {
-                    chunk_builder = chunk_builder.crypt_config(crypt_config);
+                    let mut known = Vec::new();
+                    let mut guard = index_csum.lock().unwrap();
+                    let csum = guard.as_mut().unwrap();
+                    for chunk in chunks {
+                        let offset =
+                            stream_len.fetch_add(chunk.size() as usize, Ordering::SeqCst) as u64;
+                        reused_len.fetch_add(chunk.size() as usize, Ordering::SeqCst);
+                        let digest = chunk.digest();
+                        known.push((offset, digest));
+                        let end_offset = offset + chunk.size();
+                        csum.update(&end_offset.to_le_bytes());
+                        csum.update(&digest);
+                    }
+                    future::ok(MergedChunkInfo::Known(known))
                 }
+                InjectedChunksInfo::Raw(data) => {
+                    // account for not injected chunks (new and known)
+                    let chunk_len = data.len();
 
-                let mut known_chunks = known_chunks.lock().unwrap();
-                let digest = chunk_builder.digest();
+                    total_chunks.fetch_add(1, Ordering::SeqCst);
+                    let offset = stream_len.fetch_add(chunk_len, Ordering::SeqCst) as u64;
 
-                let mut guard = index_csum.lock().unwrap();
-                let csum = guard.as_mut().unwrap();
+                    let mut chunk_builder = DataChunkBuilder::new(data.as_ref()).compress(compress);
 
-                let chunk_end = offset + chunk_len as u64;
+                    if let Some(ref crypt_config) = crypt_config {
+                        chunk_builder = chunk_builder.crypt_config(crypt_config);
+                    }
 
-                if !is_fixed_chunk_size {
-                    csum.update(&chunk_end.to_le_bytes());
-                }
-                csum.update(digest);
+                    let mut known_chunks = known_chunks.lock().unwrap();
+                    let digest = chunk_builder.digest();
 
-                let chunk_is_known = known_chunks.contains(digest);
-                if chunk_is_known {
-                    known_chunk_count.fetch_add(1, Ordering::SeqCst);
-                    reused_len.fetch_add(chunk_len, Ordering::SeqCst);
-                    future::ok(MergedChunkInfo::Known(vec![(offset, *digest)]))
-                } else {
-                    let compressed_stream_len2 = compressed_stream_len.clone();
-                    known_chunks.insert(*digest);
-                    future::ready(chunk_builder.build().map(move |(chunk, digest)| {
-                        compressed_stream_len2.fetch_add(chunk.raw_size(), Ordering::SeqCst);
-                        MergedChunkInfo::New(ChunkInfo {
-                            chunk,
-                            digest,
-                            chunk_len: chunk_len as u64,
-                            offset,
-                        })
-                    }))
+                    let mut guard = index_csum.lock().unwrap();
+                    let csum = guard.as_mut().unwrap();
+
+                    let chunk_end = offset + chunk_len as u64;
+
+                    if !is_fixed_chunk_size {
+                        csum.update(&chunk_end.to_le_bytes());
+                    }
+                    csum.update(digest);
+
+                    let chunk_is_known = known_chunks.contains(digest);
+                    if chunk_is_known {
+                        known_chunk_count.fetch_add(1, Ordering::SeqCst);
+                        reused_len.fetch_add(chunk_len, Ordering::SeqCst);
+                        future::ok(MergedChunkInfo::Known(vec![(offset, *digest)]))
+                    } else {
+                        let compressed_stream_len2 = compressed_stream_len.clone();
+                        known_chunks.insert(*digest);
+                        future::ready(chunk_builder.build().map(move |(chunk, digest)| {
+                            compressed_stream_len2.fetch_add(chunk.raw_size(), Ordering::SeqCst);
+                            MergedChunkInfo::New(ChunkInfo {
+                                chunk,
+                                digest,
+                                chunk_len: chunk_len as u64,
+                                offset,
+                            })
+                        }))
+                    }
                 }
             })
             .merge_known_chunks()
diff --git a/pbs-client/src/chunk_stream.rs b/pbs-client/src/chunk_stream.rs
index 83c75ba2..87a018d5 100644
--- a/pbs-client/src/chunk_stream.rs
+++ b/pbs-client/src/chunk_stream.rs
@@ -14,6 +14,7 @@ use crate::inject_reused_chunks::InjectChunks;
 /// Holds the queues for optional injection of reused dynamic index entries
 pub struct InjectionData {
     boundaries: mpsc::Receiver<InjectChunks>,
+    next_boundary: Option<InjectChunks>,
     injections: mpsc::Sender<InjectChunks>,
     consumed: u64,
 }
@@ -25,6 +26,7 @@ impl InjectionData {
     ) -> Self {
         Self {
             boundaries,
+            next_boundary: None,
             injections,
             consumed: 0,
         }
     }
 }
@@ -37,15 +39,17 @@ pub struct ChunkStream<S: Unpin> {
     chunker: Chunker,
     buffer: BytesMut,
     scan_pos: usize,
+    injection_data: Option<InjectionData>,
 }
 
 impl<S: Unpin> ChunkStream<S> {
-    pub fn new(input: S, chunk_size: Option<usize>) -> Self {
+    pub fn new(input: S, chunk_size: Option<usize>, injection_data: Option<InjectionData>) -> Self {
         Self {
             input,
             chunker: Chunker::new(chunk_size.unwrap_or(4 * 1024 * 1024)),
             buffer: BytesMut::new(),
             scan_pos: 0,
+            injection_data,
         }
     }
 }
@@ -62,7 +66,70 @@ where
     fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
         let this = self.get_mut();
+
         loop {
+            if let Some(InjectionData {
+                boundaries,
+                next_boundary,
+                injections,
+                consumed,
+            }) = this.injection_data.as_mut()
+            {
+                if next_boundary.is_none() {
+                    if let Ok(boundary) = boundaries.try_recv() {
+                        *next_boundary = Some(boundary);
+                    }
+                }
+
+                if let Some(inject) = next_boundary.take() {
+                    // require forced boundary, lookup next regular boundary
+                    let pos = if this.scan_pos < this.buffer.len() {
+                        this.chunker.scan(&this.buffer[this.scan_pos..])
+                    } else {
+                        0
+                    };
+
+                    let chunk_boundary = if pos == 0 {
+                        *consumed + this.buffer.len() as u64
+                    } else {
+                        *consumed + (this.scan_pos + pos) as u64
+                    };
+
+                    if inject.boundary <= chunk_boundary {
+                        // forced boundary is before next boundary, force within current buffer
+                        let chunk_size = (inject.boundary - *consumed) as usize;
+                        let raw_chunk = this.buffer.split_to(chunk_size);
+                        this.chunker.reset();
+                        this.scan_pos = 0;
+
+                        *consumed += chunk_size as u64;
+
+                        // add the size of the injected chunks to consumed, so chunk stream offsets
+                        // are in sync with the rest of the archive.
+                        *consumed += inject.size as u64;
+
+                        injections.send(inject).unwrap();
+
+                        // the chunk can be empty, return nevertheless to allow the caller to
+                        // make progress by consuming from the injection queue
+                        return Poll::Ready(Some(Ok(raw_chunk)));
+                    } else if pos != 0 {
+                        *next_boundary = Some(inject);
+                        // forced boundary is after next boundary, split off chunk from buffer
+                        let chunk_size = this.scan_pos + pos;
+                        let raw_chunk = this.buffer.split_to(chunk_size);
+                        *consumed += chunk_size as u64;
+                        this.scan_pos = 0;
+
+                        return Poll::Ready(Some(Ok(raw_chunk)));
+                    } else {
+                        // forced boundary is after current buffer length, continue reading
+                        *next_boundary = Some(inject);
+                        this.scan_pos = this.buffer.len();
+                    }
+                }
+            }
+
             if this.scan_pos < this.buffer.len() {
                 let boundary = this.chunker.scan(&this.buffer[this.scan_pos..]);
 
                 let chunk_size = this.scan_pos + boundary;
@@ -70,11 +137,14 @@ where
                 if boundary == 0 {
                     this.scan_pos = this.buffer.len();
-                    // continue poll
                 } else if chunk_size <= this.buffer.len() {
-                    let result = this.buffer.split_to(chunk_size);
+                    // found new chunk boundary inside buffer, split off chunk from buffer
+                    let raw_chunk = this.buffer.split_to(chunk_size);
+                    if let Some(InjectionData { consumed, .. }) = this.injection_data.as_mut() {
+                        *consumed += chunk_size as u64;
+                    }
                     this.scan_pos = 0;
-                    return Poll::Ready(Some(Ok(result)));
+                    return Poll::Ready(Some(Ok(raw_chunk)));
                 } else {
                     panic!("got unexpected chunk boundary from chunker");
                 }
diff --git a/pbs-client/src/pxar/create.rs b/pbs-client/src/pxar/create.rs
index bcf4fb32..b4ea2ae4 100644
--- a/pbs-client/src/pxar/create.rs
+++ b/pbs-client/src/pxar/create.rs
@@ -6,7 +6,7 @@ use std::ops::Range;
 use std::os::unix::ffi::OsStrExt;
 use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd, RawFd};
 use std::path::{Path, PathBuf};
-use std::sync::{Arc, Mutex};
+use std::sync::{mpsc, Arc, Mutex};
 
 use anyhow::{bail, Context, Error};
 use futures::future::BoxFuture;
@@ -29,6 +29,7 @@ use pbs_datastore::catalog::BackupCatalogWriter;
 use pbs_datastore::dynamic_index::DynamicIndexReader;
 use pbs_datastore::index::IndexFile;
 
+use crate::inject_reused_chunks::InjectChunks;
 use crate::pxar::metadata::errno_is_unsupported;
 use crate::pxar::tools::assert_single_path_component;
 use crate::pxar::Flags;
@@ -134,6 +135,7 @@ struct Archiver {
     hardlinks: HashMap<HardLinkInfo, (PathBuf, LinkOffset)>,
     file_copy_buffer: Vec<u8>,
     skip_e2big_xattr: bool,
+    forced_boundaries: Option<mpsc::Sender<InjectChunks>>,
 }
 
 type Encoder<'a, T> = pxar::encoder::aio::Encoder<'a, T>;
@@ -158,6 +160,7 @@ pub async fn create_archive<T, F>(
     feature_flags: Flags,
     callback: F,
     options: PxarCreateOptions,
+    forced_boundaries: Option<mpsc::Sender<InjectChunks>>,
 ) -> Result<(), Error>
 where
     T: SeqWrite + Send,
@@ -213,6 +216,7 @@ where
         hardlinks: HashMap::new(),
         file_copy_buffer: vec::undefined(4 * 1024 * 1024),
         skip_e2big_xattr: options.skip_e2big_xattr,
+        forced_boundaries,
     };
 
     archiver
diff --git a/pbs-client/src/pxar_backup_stream.rs b/pbs-client/src/pxar_backup_stream.rs
index 3541eddb..fb6d063f 100644
--- a/pbs-client/src/pxar_backup_stream.rs
+++ b/pbs-client/src/pxar_backup_stream.rs
@@ -2,7 +2,7 @@ use std::io::Write;
 //use std::os::unix::io::FromRawFd;
 use std::path::Path;
 use std::pin::Pin;
-use std::sync::{Arc, Mutex};
+use std::sync::{mpsc, Arc, Mutex};
 use std::task::{Context, Poll};
 
 use anyhow::{format_err, Error};
@@ -17,6 +17,7 @@ use proxmox_io::StdChannelWriter;
 
 use pbs_datastore::catalog::CatalogWriter;
 
+use crate::inject_reused_chunks::InjectChunks;
 use crate::pxar::create::PxarWriters;
 
 /// Stream implementation to encode and upload .pxar archives.
@@ -42,6 +43,7 @@ impl PxarBackupStream {
         dir: Dir,
         catalog: Arc<Mutex<CatalogWriter<W>>>,
         options: crate::pxar::PxarCreateOptions,
+        boundaries: Option<mpsc::Sender<InjectChunks>>,
         separate_payload_stream: bool,
     ) -> Result<(Self, Option<Self>), Error> {
         let buffer_size = 256 * 1024;
@@ -82,6 +84,7 @@ impl PxarBackupStream {
                     Ok(())
                 },
                 options,
+                boundaries,
             )
             .await
             {
@@ -113,11 +116,12 @@ impl PxarBackupStream {
         dirname: &Path,
         catalog: Arc<Mutex<CatalogWriter<W>>>,
         options: crate::pxar::PxarCreateOptions,
+        boundaries: Option<mpsc::Sender<InjectChunks>>,
         separate_payload_stream: bool,
     ) -> Result<(Self, Option<Self>), Error> {
         let dir = nix::dir::Dir::open(dirname, OFlag::O_DIRECTORY, Mode::empty())?;
 
-        Self::new(dir, catalog, options, separate_payload_stream)
+        Self::new(dir, catalog, options, boundaries, separate_payload_stream)
     }
 }
diff --git a/proxmox-backup-client/src/main.rs b/proxmox-backup-client/src/main.rs
index dcff1196..f4a545d3 100644
--- a/proxmox-backup-client/src/main.rs
+++ b/proxmox-backup-client/src/main.rs
@@ -45,8 +45,8 @@ use pbs_client::tools::{
 use pbs_client::{
     delete_ticket_info, parse_backup_specification, view_task_result, BackupReader,
     BackupRepository, BackupSpecificationType, BackupStats, BackupWriter, ChunkStream,
-    FixedChunkStream, HttpClient, PxarBackupStream, RemoteChunkReader, UploadOptions,
-    BACKUP_SOURCE_SCHEMA,
+    FixedChunkStream, HttpClient, InjectionData, PxarBackupStream, RemoteChunkReader,
+    UploadOptions, BACKUP_SOURCE_SCHEMA,
 };
 use pbs_datastore::catalog::{BackupCatalogWriter, CatalogReader, CatalogWriter};
 use pbs_datastore::chunk_store::verify_chunk_size;
@@ -199,14 +199,16 @@ async fn backup_directory<P: AsRef<Path>>(
         bail!("cannot backup directory with fixed chunk size!");
     }
 
+    let (payload_boundaries_tx, payload_boundaries_rx) = std::sync::mpsc::channel();
     let (pxar_stream, payload_stream) = PxarBackupStream::open(
         dir_path.as_ref(),
         catalog,
         pxar_create_options,
+        Some(payload_boundaries_tx),
         payload_target.is_some(),
     )?;
 
-    let mut chunk_stream = ChunkStream::new(pxar_stream, chunk_size);
+    let mut chunk_stream = ChunkStream::new(pxar_stream, chunk_size, None);
 
     let (tx, rx) = mpsc::channel(10); // allow to buffer 10 chunks
     let stream = ReceiverStream::new(rx).map_err(Error::from);
@@ -218,13 +220,16 @@ async fn backup_directory<P: AsRef<Path>>(
         }
     });
 
-    let stats = client.upload_stream(archive_name, stream, upload_options.clone());
+    let stats = client.upload_stream(archive_name, stream, upload_options.clone(), None);
 
     if let Some(payload_stream) = payload_stream {
         let payload_target = payload_target
             .ok_or_else(|| format_err!("got payload stream, but no target archive name"))?;
 
-        let mut payload_chunk_stream = ChunkStream::new(payload_stream, chunk_size);
+        let (payload_injections_tx, payload_injections_rx) = std::sync::mpsc::channel();
+        let injection_data = InjectionData::new(payload_boundaries_rx, payload_injections_tx);
+        let mut payload_chunk_stream =
+            ChunkStream::new(payload_stream, chunk_size, Some(injection_data));
 
         let (payload_tx, payload_rx) = mpsc::channel(10); // allow to buffer 10 chunks
         let stream = ReceiverStream::new(payload_rx).map_err(Error::from);
@@ -235,7 +240,12 @@ async fn backup_directory<P: AsRef<Path>>(
             }
         });
 
-        let payload_stats = client.upload_stream(&payload_target, stream, upload_options);
+        let payload_stats = client.upload_stream(
+            &payload_target,
+            stream,
+            upload_options,
+            Some(payload_injections_rx),
+        );
 
         match futures::join!(stats, payload_stats) {
             (Ok(stats), Ok(payload_stats)) => Ok((stats, Some(payload_stats))),
@@ -271,7 +281,7 @@ async fn backup_image<P: AsRef<Path>>(
     }
 
     let stats = client
-        .upload_stream(archive_name, stream, upload_options)
+        .upload_stream(archive_name, stream, upload_options, None)
         .await?;
 
     Ok(stats)
@@ -562,7 +572,7 @@ fn spawn_catalog_upload(
     let (catalog_tx, catalog_rx) = std::sync::mpsc::sync_channel(10); // allow to buffer 10 writes
     let catalog_stream = proxmox_async::blocking::StdChannelStream(catalog_rx);
     let catalog_chunk_size = 512 * 1024;
-    let catalog_chunk_stream = ChunkStream::new(catalog_stream, Some(catalog_chunk_size));
+    let catalog_chunk_stream = ChunkStream::new(catalog_stream, Some(catalog_chunk_size), None);
 
     let catalog_writer = Arc::new(Mutex::new(CatalogWriter::new(TokioWriterAdapter::new(
         StdChannelWriter::new(catalog_tx),
@@ -578,7 +588,7 @@ fn spawn_catalog_upload(
 
     tokio::spawn(async move {
         let catalog_upload_result = client
-            .upload_stream(CATALOG_NAME, catalog_chunk_stream, upload_options)
+            .upload_stream(CATALOG_NAME, catalog_chunk_stream, upload_options, None)
             .await;
 
         if let Err(ref err) = catalog_upload_result {
diff --git a/proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs b/proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs
index 95c9f461..f7fbae09 100644
--- a/proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs
+++ b/proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs
@@ -363,7 +363,7 @@ fn extract(
                 };
 
                 let pxar_writer = pxar::PxarVariant::Unified(TokioWriter::new(writer));
-                create_archive(dir, PxarWriters::new(pxar_writer, None), Flags::DEFAULT, |_| Ok(()), options)
+                create_archive(dir, PxarWriters::new(pxar_writer, None), Flags::DEFAULT, |_| Ok(()), options, None)
                     .await
             }
             .await;
diff --git a/pxar-bin/src/main.rs b/pxar-bin/src/main.rs
index b4c8f062..a9a5fccd 100644
--- a/pxar-bin/src/main.rs
+++ b/pxar-bin/src/main.rs
@@ -409,6 +409,7 @@ async fn create_archive(
             Ok(())
         },
         options,
+        None,
     )
     .await?;
 
diff --git a/tests/catar.rs b/tests/catar.rs
index 932df61a..9f83b4cc 100644
--- a/tests/catar.rs
+++ b/tests/catar.rs
@@ -39,6 +39,7 @@ fn run_test(dir_name: &str) -> Result<(), Error> {
         Flags::DEFAULT,
         |_| Ok(()),
         options,
+        None,
     ))?;
 
     Command::new("cmp")
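
The wiring added in proxmox-backup-client/src/main.rs ties three parties together with two std::sync::mpsc channels: the pxar payload writer announces forced boundaries for reused file payloads, the payload ChunkStream applies them and forwards each applied entry, and upload_stream() drains that queue to emit the reused index entries. The stand-alone sketch below models only this handshake; the InjectChunks stand-in (carrying just the boundary and size fields this diff actually uses) and all variable names are illustrative assumptions, not the real pbs-client types.

use std::sync::mpsc;

// simplified stand-in for the pbs-client InjectChunks type used by the patch
struct InjectChunks {
    // stream offset at which the chunker must force a chunk boundary
    boundary: u64,
    // total size of the reused chunks injected at that boundary
    size: u64,
}

fn main() {
    // archiver -> chunk stream: forced boundaries for reused payload ranges
    let (boundaries_tx, boundaries_rx) = mpsc::channel::<InjectChunks>();
    // chunk stream -> uploader: boundaries that were actually applied
    let (injections_tx, injections_rx) = mpsc::channel::<InjectChunks>();

    // the archiver announces a reused range
    boundaries_tx
        .send(InjectChunks { boundary: 4096, size: 8 * 1024 * 1024 })
        .unwrap();

    // the chunk stream polls the queue without blocking, forces the boundary
    // and forwards the entry so the uploader can emit the reused index entries
    if let Ok(inject) = boundaries_rx.try_recv() {
        injections_tx.send(inject).unwrap();
    }

    let applied = injections_rx.try_recv().unwrap();
    println!(
        "forced boundary at {}, injecting {} reused bytes",
        applied.boundary, applied.size
    );
}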
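
Inside ChunkStream::poll_next, a pending forced boundary is resolved against the next regular chunker boundary in one of three ways: it lies at or before the regular boundary (split exactly there, possibly yielding an empty chunk), a regular boundary comes first inside the buffer (emit that chunk and keep the forced boundary queued), or the forced boundary lies beyond the buffered data (keep reading). A minimal model of that decision follows, assuming offsets are tracked the way the patch does with consumed; the enum and function names are illustrative only.

// simplified model of the three cases distinguished by the new poll_next loop
#[derive(Debug, PartialEq)]
enum Cut {
    // split the buffer exactly at the forced boundary (chunk may be empty)
    Forced(usize),
    // emit the regular chunk first, keep the forced boundary pending
    Regular(usize),
    // boundary lies beyond the buffered data, keep reading
    ReadMore,
}

// consumed: stream offset of the buffer start; regular_pos: chunker boundary
// relative to the buffer start, if one was found; forced: absolute forced offset
fn decide(consumed: u64, buffer_len: usize, regular_pos: Option<usize>, forced: u64) -> Cut {
    let regular_boundary = match regular_pos {
        Some(pos) => consumed + pos as u64,
        None => consumed + buffer_len as u64,
    };

    if forced <= regular_boundary {
        Cut::Forced((forced - consumed) as usize)
    } else if let Some(pos) = regular_pos {
        Cut::Regular(pos)
    } else {
        Cut::ReadMore
    }
}

fn main() {
    // forced boundary before the next regular one: cut early
    assert_eq!(decide(0, 4096, Some(3000), 1024), Cut::Forced(1024));
    // regular boundary comes first: emit it, keep the forced boundary queued
    assert_eq!(decide(0, 4096, Some(1000), 2048), Cut::Regular(1000));
    // forced boundary is past the buffered data and no regular boundary yet
    assert_eq!(decide(0, 4096, None, 10_000), Cut::ReadMore);
    println!("all three boundary cases behave as expected");
}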