diff --git a/pbs-client/src/pxar_backup_stream.rs b/pbs-client/src/pxar_backup_stream.rs index 8dc3fd08..3541eddb 100644 --- a/pbs-client/src/pxar_backup_stream.rs +++ b/pbs-client/src/pxar_backup_stream.rs @@ -42,21 +42,37 @@ impl PxarBackupStream { dir: Dir, catalog: Arc>>, options: crate::pxar::PxarCreateOptions, - ) -> Result { - let (tx, rx) = std::sync::mpsc::sync_channel(10); - + separate_payload_stream: bool, + ) -> Result<(Self, Option), Error> { let buffer_size = 256 * 1024; + let (tx, rx) = std::sync::mpsc::sync_channel(10); + let writer = TokioWriterAdapter::new(std::io::BufWriter::with_capacity( + buffer_size, + StdChannelWriter::new(tx), + )); + let writer = pxar::encoder::sync::StandardWriter::new(writer); + + let (writer, payload_rx) = if separate_payload_stream { + let (tx, rx) = std::sync::mpsc::sync_channel(10); + let payload_writer = TokioWriterAdapter::new(std::io::BufWriter::with_capacity( + buffer_size, + StdChannelWriter::new(tx), + )); + ( + pxar::PxarVariant::Split( + writer, + pxar::encoder::sync::StandardWriter::new(payload_writer), + ), + Some(rx), + ) + } else { + (pxar::PxarVariant::Unified(writer), None) + }; + let error = Arc::new(Mutex::new(None)); let error2 = Arc::clone(&error); let handler = async move { - let writer = TokioWriterAdapter::new(std::io::BufWriter::with_capacity( - buffer_size, - StdChannelWriter::new(tx), - )); - - let writer = - pxar::PxarVariant::Unified(pxar::encoder::sync::StandardWriter::new(writer)); if let Err(err) = crate::pxar::create_archive( dir, PxarWriters::new(writer, Some(catalog)), @@ -78,21 +94,30 @@ impl PxarBackupStream { let future = Abortable::new(handler, registration); tokio::spawn(future); - Ok(Self { + let backup_stream = Self { + rx: Some(rx), + handle: Some(handle.clone()), + error: Arc::clone(&error), + }; + + let backup_payload_stream = payload_rx.map(|rx| Self { rx: Some(rx), handle: Some(handle), error, - }) + }); + + Ok((backup_stream, backup_payload_stream)) } pub fn open( dirname: &Path, catalog: Arc>>, options: crate::pxar::PxarCreateOptions, - ) -> Result { + separate_payload_stream: bool, + ) -> Result<(Self, Option), Error> { let dir = nix::dir::Dir::open(dirname, OFlag::O_DIRECTORY, Mode::empty())?; - Self::new(dir, catalog, options) + Self::new(dir, catalog, options, separate_payload_stream) } } diff --git a/proxmox-backup-client/src/main.rs b/proxmox-backup-client/src/main.rs index ad2bc5a6..25556d67 100644 --- a/proxmox-backup-client/src/main.rs +++ b/proxmox-backup-client/src/main.rs @@ -187,18 +187,24 @@ async fn backup_directory>( client: &BackupWriter, dir_path: P, archive_name: &str, + payload_target: Option<&str>, chunk_size: Option, catalog: Arc>>>>, pxar_create_options: pbs_client::pxar::PxarCreateOptions, upload_options: UploadOptions, -) -> Result { +) -> Result<(BackupStats, Option), Error> { if upload_options.fixed_size.is_some() { bail!("cannot backup directory with fixed chunk size!"); } - let pxar_stream = PxarBackupStream::open(dir_path.as_ref(), catalog, pxar_create_options)?; - let mut chunk_stream = ChunkStream::new(pxar_stream, chunk_size); + let (pxar_stream, payload_stream) = PxarBackupStream::open( + dir_path.as_ref(), + catalog, + pxar_create_options, + payload_target.is_some(), + )?; + let mut chunk_stream = ChunkStream::new(pxar_stream, chunk_size); let (tx, rx) = mpsc::channel(10); // allow to buffer 10 chunks let stream = ReceiverStream::new(rx).map_err(Error::from); @@ -210,11 +216,36 @@ async fn backup_directory>( } }); - let stats = client - .upload_stream(archive_name, stream, upload_options) - .await?; + let stats = client.upload_stream(archive_name, stream, upload_options.clone()); - Ok(stats) + if let Some(payload_stream) = payload_stream { + let payload_target = payload_target + .ok_or_else(|| format_err!("got payload stream, but no target archive name"))?; + + let mut payload_chunk_stream = ChunkStream::new(payload_stream, chunk_size); + let (payload_tx, payload_rx) = mpsc::channel(10); // allow to buffer 10 chunks + let stream = ReceiverStream::new(payload_rx).map_err(Error::from); + + // spawn payload chunker inside a separate task so that it can run parallel + tokio::spawn(async move { + while let Some(v) = payload_chunk_stream.next().await { + let _ = payload_tx.send(v).await; + } + }); + + let payload_stats = client.upload_stream(&payload_target, stream, upload_options); + + match futures::join!(stats, payload_stats) { + (Ok(stats), Ok(payload_stats)) => Ok((stats, Some(payload_stats))), + (Err(err), Ok(_)) => Err(format_err!("upload failed: {err}")), + (Ok(_), Err(err)) => Err(format_err!("upload failed: {err}")), + (Err(err), Err(payload_err)) => { + Err(format_err!("upload failed: {err} - {payload_err}")) + } + } + } else { + Ok((stats.await?, None)) + } } async fn backup_image>( @@ -985,6 +1016,23 @@ async fn create_backup( manifest.add_file(target, stats.size, stats.csum, crypto.mode)?; } (BackupSpecificationType::PXAR, false) => { + let metadata_mode = false; // Until enabled via param + + let target_base = if let Some(base) = target_base.strip_suffix(".pxar") { + base.to_string() + } else { + bail!("unexpected suffix in target: {target_base}"); + }; + + let (target, payload_target) = if metadata_mode { + ( + format!("{target_base}.mpxar.{extension}"), + Some(format!("{target_base}.ppxar.{extension}")), + ) + } else { + (target, None) + }; + // start catalog upload on first use if catalog.is_none() { let catalog_upload_res = @@ -1015,16 +1063,27 @@ async fn create_backup( ..UploadOptions::default() }; - let stats = backup_directory( + let (stats, payload_stats) = backup_directory( &client, &filename, &target, + payload_target.as_deref(), chunk_size_opt, catalog.clone(), pxar_options, upload_options, ) .await?; + + if let Some(payload_stats) = payload_stats { + manifest.add_file( + payload_target + .ok_or_else(|| format_err!("missing payload target archive"))?, + payload_stats.size, + payload_stats.csum, + crypto.mode, + )?; + } manifest.add_file(target, stats.size, stats.csum, crypto.mode)?; catalog.lock().unwrap().end_directory()?; }