client: streams: add channels for dynamic entry injection

In order to reuse dynamic entries of a previous backup run and index
them for the new snapshot, add a non-blocking channel between the pxar
archiver and the chunk stream, as well as between the chunk stream and
the backup writer.

The archiver sends the forced boundary positions and the dynamic
entries to be injected into the chunk stream following each such
boundary.

The chunk stream, as the receiving end, consumes these channel inputs
whenever a new chunk is requested by the upload stream, forcing a
non-regular chunk boundary in the pxar stream at the requested
positions.

The dynamic entries to inject and the boundary are then sent via the
second asynchronous channel to the backup writer's upload stream, which
indexes them by inserting the dynamic entries as known chunks into the
upload stream.

Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
Christian Ebner 2024-02-21 11:58:14 +01:00 committed by Fabian Grünbichler
parent 77fdae28cf
commit 7de35dc243
9 changed files with 171 additions and 53 deletions
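
Before the per-file diffs, a minimal sketch of the channel wiring described
in the commit message, using stand-in names and a single-threaded flow for
illustration only (the InjectChunks fields and the hex helper below are
simplified assumptions, not the actual pbs-client types introduced in the
diffs):

use std::sync::mpsc;

// Stand-in for the message sent by the archiver: a forced boundary position
// in the pxar stream plus the reused dynamic index entries to inject there.
#[derive(Debug)]
struct InjectChunks {
    boundary: u64,                // byte offset at which a chunk boundary is forced
    chunks: Vec<([u8; 32], u64)>, // (digest, size) of the reused dynamic entries
    size: u64,                    // summed size of the injected entries
}

fn hex(digest: &[u8; 32]) -> String {
    digest.iter().map(|b| format!("{b:02x}")).collect()
}

fn main() {
    // channel 1: pxar archiver -> chunk stream (forced boundaries + entries to inject)
    let (boundaries_tx, boundaries_rx) = mpsc::channel::<InjectChunks>();
    // channel 2: chunk stream -> backup writer upload stream (entries to index)
    let (injections_tx, injections_rx) = mpsc::channel::<InjectChunks>();

    // archiver side: announce a reused file payload at stream offset 4096
    boundaries_tx
        .send(InjectChunks {
            boundary: 4096,
            chunks: vec![([0u8; 32], 8192)],
            size: 8192,
        })
        .unwrap();

    // chunk stream side: poll the boundary channel non-blockingly whenever a
    // chunk is cut; force a boundary at `inject.boundary`, keep stream offsets
    // in sync by accounting for the injected bytes, then forward the entries.
    let mut consumed = 0u64;
    if let Ok(inject) = boundaries_rx.try_recv() {
        consumed = inject.boundary + inject.size;
        injections_tx.send(inject).unwrap();
    }
    println!("chunk stream offset after injection: {consumed}");

    // upload stream side: index the injected entries as known chunks
    while let Ok(inject) = injections_rx.try_recv() {
        for (digest, size) in &inject.chunks {
            println!("indexing known chunk {} ({} bytes)", hex(digest), size);
        }
    }
}

The real implementation polls the boundary channel from ChunkStream::poll_next
and indexes the forwarded entries in the backup writer's upload stream, as the
chunk stream and backup writer hunks below show.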

View File

@@ -26,7 +26,7 @@ async fn run() -> Result<(), Error> {
         .map_err(Error::from);
     //let chunk_stream = FixedChunkStream::new(stream, 4*1024*1024);
-    let mut chunk_stream = ChunkStream::new(stream, None);
+    let mut chunk_stream = ChunkStream::new(stream, None, None);
     let start_time = std::time::Instant::now();

View File

@@ -23,6 +23,7 @@ use pbs_tools::crypt_config::CryptConfig;
 use proxmox_human_byte::HumanByte;
 
+use super::inject_reused_chunks::{InjectChunks, InjectReusedChunks, InjectedChunksInfo};
 use super::merge_known_chunks::{MergeKnownChunks, MergedChunkInfo};
 use super::{H2Client, HttpClient};
@@ -265,6 +266,7 @@ impl BackupWriter {
         archive_name: &str,
         stream: impl Stream<Item = Result<bytes::BytesMut, Error>>,
         options: UploadOptions,
+        injections: Option<std::sync::mpsc::Receiver<InjectChunks>>,
     ) -> Result<BackupStats, Error> {
         let known_chunks = Arc::new(Mutex::new(HashSet::new()));
@@ -341,6 +343,7 @@ impl BackupWriter {
                     None
                 },
                 options.compress,
+                injections,
             )
             .await?;
@@ -636,6 +639,7 @@ impl BackupWriter {
         known_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
         crypt_config: Option<Arc<CryptConfig>>,
         compress: bool,
+        injections: Option<std::sync::mpsc::Receiver<InjectChunks>>,
     ) -> impl Future<Output = Result<UploadStats, Error>> {
         let total_chunks = Arc::new(AtomicUsize::new(0));
         let total_chunks2 = total_chunks.clone();
@@ -662,7 +666,30 @@ impl BackupWriter {
         let index_csum_2 = index_csum.clone();
 
         stream
-            .and_then(move |data| {
+            .inject_reused_chunks(injections, stream_len.clone())
+            .and_then(move |chunk_info| match chunk_info {
+                InjectedChunksInfo::Known(chunks) => {
+                    // account for injected chunks
+                    let count = chunks.len();
+                    total_chunks.fetch_add(count, Ordering::SeqCst);
+
+                    let mut known = Vec::new();
+                    let mut guard = index_csum.lock().unwrap();
+                    let csum = guard.as_mut().unwrap();
+                    for chunk in chunks {
+                        let offset =
+                            stream_len.fetch_add(chunk.size() as usize, Ordering::SeqCst) as u64;
+                        reused_len.fetch_add(chunk.size() as usize, Ordering::SeqCst);
+                        let digest = chunk.digest();
+                        known.push((offset, digest));
+                        let end_offset = offset + chunk.size();
+                        csum.update(&end_offset.to_le_bytes());
+                        csum.update(&digest);
+                    }
+                    future::ok(MergedChunkInfo::Known(known))
+                }
+                InjectedChunksInfo::Raw(data) => {
+                    // account for not injected chunks (new and known)
                 let chunk_len = data.len();
 
                 total_chunks.fetch_add(1, Ordering::SeqCst);
@@ -705,6 +732,7 @@ impl BackupWriter {
                         })
                     }))
                 }
+                }
             })
             .merge_known_chunks()
             .try_for_each(move |merged_chunk_info| {

View File

@@ -14,6 +14,7 @@ use crate::inject_reused_chunks::InjectChunks;
 /// Holds the queues for optional injection of reused dynamic index entries
 pub struct InjectionData {
     boundaries: mpsc::Receiver<InjectChunks>,
+    next_boundary: Option<InjectChunks>,
     injections: mpsc::Sender<InjectChunks>,
     consumed: u64,
 }
@@ -25,6 +26,7 @@ impl InjectionData {
     ) -> Self {
         Self {
             boundaries,
+            next_boundary: None,
             injections,
             consumed: 0,
         }
@@ -37,15 +39,17 @@ pub struct ChunkStream<S: Unpin> {
     chunker: Chunker,
     buffer: BytesMut,
     scan_pos: usize,
+    injection_data: Option<InjectionData>,
 }
 
 impl<S: Unpin> ChunkStream<S> {
-    pub fn new(input: S, chunk_size: Option<usize>) -> Self {
+    pub fn new(input: S, chunk_size: Option<usize>, injection_data: Option<InjectionData>) -> Self {
         Self {
             input,
             chunker: Chunker::new(chunk_size.unwrap_or(4 * 1024 * 1024)),
             buffer: BytesMut::new(),
             scan_pos: 0,
+            injection_data,
         }
     }
 }
@@ -62,7 +66,70 @@ where
     fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
         let this = self.get_mut();
 
         loop {
+            if let Some(InjectionData {
+                boundaries,
+                next_boundary,
+                injections,
+                consumed,
+            }) = this.injection_data.as_mut()
+            {
+                if next_boundary.is_none() {
+                    if let Ok(boundary) = boundaries.try_recv() {
+                        *next_boundary = Some(boundary);
+                    }
+                }
+
+                if let Some(inject) = next_boundary.take() {
+                    // require forced boundary, lookup next regular boundary
+                    let pos = if this.scan_pos < this.buffer.len() {
+                        this.chunker.scan(&this.buffer[this.scan_pos..])
+                    } else {
+                        0
+                    };
+
+                    let chunk_boundary = if pos == 0 {
+                        *consumed + this.buffer.len() as u64
+                    } else {
+                        *consumed + (this.scan_pos + pos) as u64
+                    };
+
+                    if inject.boundary <= chunk_boundary {
+                        // forced boundary is before next boundary, force within current buffer
+                        let chunk_size = (inject.boundary - *consumed) as usize;
+                        let raw_chunk = this.buffer.split_to(chunk_size);
+                        this.chunker.reset();
+                        this.scan_pos = 0;
+
+                        *consumed += chunk_size as u64;
+                        // add the size of the injected chunks to consumed, so chunk stream offsets
+                        // are in sync with the rest of the archive.
+                        *consumed += inject.size as u64;
+                        injections.send(inject).unwrap();
+
+                        // the chunk can be empty, return nevertheless to allow the caller to
+                        // make progress by consuming from the injection queue
+                        return Poll::Ready(Some(Ok(raw_chunk)));
+                    } else if pos != 0 {
+                        *next_boundary = Some(inject);
+                        // forced boundary is after next boundary, split off chunk from buffer
+                        let chunk_size = this.scan_pos + pos;
+                        let raw_chunk = this.buffer.split_to(chunk_size);
+                        *consumed += chunk_size as u64;
+                        this.scan_pos = 0;
+                        return Poll::Ready(Some(Ok(raw_chunk)));
+                    } else {
+                        // forced boundary is after current buffer length, continue reading
+                        *next_boundary = Some(inject);
+                        this.scan_pos = this.buffer.len();
+                    }
+                }
+            }
+
             if this.scan_pos < this.buffer.len() {
                 let boundary = this.chunker.scan(&this.buffer[this.scan_pos..]);
@@ -70,11 +137,14 @@ where
             if boundary == 0 {
                 this.scan_pos = this.buffer.len();
+                // continue poll
             } else if chunk_size <= this.buffer.len() {
-                let result = this.buffer.split_to(chunk_size);
+                // found new chunk boundary inside buffer, split off chunk from buffer
+                let raw_chunk = this.buffer.split_to(chunk_size);
+                if let Some(InjectionData { consumed, .. }) = this.injection_data.as_mut() {
+                    *consumed += chunk_size as u64;
+                }
                 this.scan_pos = 0;
-                return Poll::Ready(Some(Ok(result)));
+                return Poll::Ready(Some(Ok(raw_chunk)));
             } else {
                 panic!("got unexpected chunk boundary from chunker");
             }
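
To make the forced-boundary decision in the poll_next hunk above concrete,
here is a small, self-contained restatement of the same arithmetic, with the
chunker and buffer replaced by plain parameters (the decide helper and Action
enum are illustrative assumptions, not part of the commit):

/// What the stream should do for the current buffer contents.
#[derive(Debug, PartialEq)]
enum Action {
    /// Split off `len` bytes at the forced boundary and emit the injection.
    ForceBoundary { len: usize },
    /// Split off `len` bytes at the next regular chunker boundary.
    RegularBoundary { len: usize },
    /// No boundary reachable yet, keep reading input.
    KeepReading,
}

/// `consumed` is the stream offset already emitted, `buffer_len` the number of
/// buffered bytes, `regular_pos` the chunker's next boundary relative to the
/// buffer start (0 = none found), `forced` the absolute forced boundary.
fn decide(consumed: u64, buffer_len: usize, regular_pos: usize, forced: u64) -> Action {
    let chunk_boundary = if regular_pos == 0 {
        consumed + buffer_len as u64
    } else {
        consumed + regular_pos as u64
    };

    if forced <= chunk_boundary {
        // forced boundary falls before (or at) the next regular boundary
        Action::ForceBoundary { len: (forced - consumed) as usize }
    } else if regular_pos != 0 {
        // regular boundary comes first, cut there and retry the injection later
        Action::RegularBoundary { len: regular_pos }
    } else {
        // forced boundary lies beyond the buffered data
        Action::KeepReading
    }
}

fn main() {
    // 1000 bytes already emitted, 800 bytes buffered, regular boundary at +500,
    // forced boundary at absolute offset 1200 -> cut a 200 byte chunk first.
    assert_eq!(decide(1000, 800, 500, 1200), Action::ForceBoundary { len: 200 });
    // forced boundary at 1800 lies past the regular boundary at 1500.
    assert_eq!(decide(1000, 800, 500, 1800), Action::RegularBoundary { len: 500 });
    // no regular boundary and forced boundary beyond the buffer -> keep reading.
    assert_eq!(decide(1000, 800, 0, 2000), Action::KeepReading);
    println!("boundary decision sketch ok");
}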

View File

@@ -6,7 +6,7 @@ use std::ops::Range;
 use std::os::unix::ffi::OsStrExt;
 use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd, RawFd};
 use std::path::{Path, PathBuf};
-use std::sync::{Arc, Mutex};
+use std::sync::{mpsc, Arc, Mutex};
 
 use anyhow::{bail, Context, Error};
 use futures::future::BoxFuture;
@@ -29,6 +29,7 @@ use pbs_datastore::catalog::BackupCatalogWriter;
 use pbs_datastore::dynamic_index::DynamicIndexReader;
 use pbs_datastore::index::IndexFile;
 
+use crate::inject_reused_chunks::InjectChunks;
 use crate::pxar::metadata::errno_is_unsupported;
 use crate::pxar::tools::assert_single_path_component;
 use crate::pxar::Flags;
@@ -134,6 +135,7 @@ struct Archiver {
     hardlinks: HashMap<HardLinkInfo, (PathBuf, LinkOffset)>,
     file_copy_buffer: Vec<u8>,
     skip_e2big_xattr: bool,
+    forced_boundaries: Option<mpsc::Sender<InjectChunks>>,
 }
 
 type Encoder<'a, T> = pxar::encoder::aio::Encoder<'a, T>;
@@ -158,6 +160,7 @@ pub async fn create_archive<T, F>(
     feature_flags: Flags,
     callback: F,
     options: PxarCreateOptions,
+    forced_boundaries: Option<mpsc::Sender<InjectChunks>>,
 ) -> Result<(), Error>
 where
     T: SeqWrite + Send,
@@ -213,6 +216,7 @@ where
         hardlinks: HashMap::new(),
         file_copy_buffer: vec::undefined(4 * 1024 * 1024),
         skip_e2big_xattr: options.skip_e2big_xattr,
+        forced_boundaries,
     };
 
     archiver

View File

@@ -2,7 +2,7 @@ use std::io::Write;
 //use std::os::unix::io::FromRawFd;
 use std::path::Path;
 use std::pin::Pin;
-use std::sync::{Arc, Mutex};
+use std::sync::{mpsc, Arc, Mutex};
 use std::task::{Context, Poll};
 
 use anyhow::{format_err, Error};
@@ -17,6 +17,7 @@ use proxmox_io::StdChannelWriter;
 use pbs_datastore::catalog::CatalogWriter;
 
+use crate::inject_reused_chunks::InjectChunks;
 use crate::pxar::create::PxarWriters;
 
 /// Stream implementation to encode and upload .pxar archives.
@@ -42,6 +43,7 @@ impl PxarBackupStream {
         dir: Dir,
         catalog: Arc<Mutex<CatalogWriter<W>>>,
         options: crate::pxar::PxarCreateOptions,
+        boundaries: Option<mpsc::Sender<InjectChunks>>,
         separate_payload_stream: bool,
     ) -> Result<(Self, Option<Self>), Error> {
         let buffer_size = 256 * 1024;
@@ -82,6 +84,7 @@ impl PxarBackupStream {
                     Ok(())
                 },
                 options,
+                boundaries,
             )
             .await
             {
@@ -113,11 +116,12 @@ impl PxarBackupStream {
         dirname: &Path,
         catalog: Arc<Mutex<CatalogWriter<W>>>,
         options: crate::pxar::PxarCreateOptions,
+        boundaries: Option<mpsc::Sender<InjectChunks>>,
         separate_payload_stream: bool,
     ) -> Result<(Self, Option<Self>), Error> {
         let dir = nix::dir::Dir::open(dirname, OFlag::O_DIRECTORY, Mode::empty())?;
 
-        Self::new(dir, catalog, options, separate_payload_stream)
+        Self::new(dir, catalog, options, boundaries, separate_payload_stream)
     }
 }

View File

@@ -45,8 +45,8 @@ use pbs_client::tools::{
 use pbs_client::{
     delete_ticket_info, parse_backup_specification, view_task_result, BackupReader,
     BackupRepository, BackupSpecificationType, BackupStats, BackupWriter, ChunkStream,
-    FixedChunkStream, HttpClient, PxarBackupStream, RemoteChunkReader, UploadOptions,
-    BACKUP_SOURCE_SCHEMA,
+    FixedChunkStream, HttpClient, InjectionData, PxarBackupStream, RemoteChunkReader,
+    UploadOptions, BACKUP_SOURCE_SCHEMA,
 };
 use pbs_datastore::catalog::{BackupCatalogWriter, CatalogReader, CatalogWriter};
 use pbs_datastore::chunk_store::verify_chunk_size;
@@ -199,14 +199,16 @@ async fn backup_directory<P: AsRef<Path>>(
         bail!("cannot backup directory with fixed chunk size!");
     }
 
+    let (payload_boundaries_tx, payload_boundaries_rx) = std::sync::mpsc::channel();
     let (pxar_stream, payload_stream) = PxarBackupStream::open(
         dir_path.as_ref(),
         catalog,
         pxar_create_options,
+        Some(payload_boundaries_tx),
         payload_target.is_some(),
     )?;
 
-    let mut chunk_stream = ChunkStream::new(pxar_stream, chunk_size);
+    let mut chunk_stream = ChunkStream::new(pxar_stream, chunk_size, None);
     let (tx, rx) = mpsc::channel(10); // allow to buffer 10 chunks
 
     let stream = ReceiverStream::new(rx).map_err(Error::from);
@@ -218,13 +220,16 @@ async fn backup_directory<P: AsRef<Path>>(
         }
     });
 
-    let stats = client.upload_stream(archive_name, stream, upload_options.clone());
+    let stats = client.upload_stream(archive_name, stream, upload_options.clone(), None);
 
     if let Some(payload_stream) = payload_stream {
        let payload_target = payload_target
            .ok_or_else(|| format_err!("got payload stream, but no target archive name"))?;
 
-        let mut payload_chunk_stream = ChunkStream::new(payload_stream, chunk_size);
+        let (payload_injections_tx, payload_injections_rx) = std::sync::mpsc::channel();
+        let injection_data = InjectionData::new(payload_boundaries_rx, payload_injections_tx);
+        let mut payload_chunk_stream =
+            ChunkStream::new(payload_stream, chunk_size, Some(injection_data));
        let (payload_tx, payload_rx) = mpsc::channel(10); // allow to buffer 10 chunks
        let stream = ReceiverStream::new(payload_rx).map_err(Error::from);
@@ -235,7 +240,12 @@ async fn backup_directory<P: AsRef<Path>>(
            }
        });
 
-        let payload_stats = client.upload_stream(&payload_target, stream, upload_options);
+        let payload_stats = client.upload_stream(
+            &payload_target,
+            stream,
+            upload_options,
+            Some(payload_injections_rx),
+        );
 
        match futures::join!(stats, payload_stats) {
            (Ok(stats), Ok(payload_stats)) => Ok((stats, Some(payload_stats))),
@@ -271,7 +281,7 @@ async fn backup_image<P: AsRef<Path>>(
     }
 
     let stats = client
-        .upload_stream(archive_name, stream, upload_options)
+        .upload_stream(archive_name, stream, upload_options, None)
         .await?;
 
     Ok(stats)
@@ -562,7 +572,7 @@ fn spawn_catalog_upload(
     let (catalog_tx, catalog_rx) = std::sync::mpsc::sync_channel(10); // allow to buffer 10 writes
     let catalog_stream = proxmox_async::blocking::StdChannelStream(catalog_rx);
     let catalog_chunk_size = 512 * 1024;
-    let catalog_chunk_stream = ChunkStream::new(catalog_stream, Some(catalog_chunk_size));
+    let catalog_chunk_stream = ChunkStream::new(catalog_stream, Some(catalog_chunk_size), None);
 
     let catalog_writer = Arc::new(Mutex::new(CatalogWriter::new(TokioWriterAdapter::new(
         StdChannelWriter::new(catalog_tx),
@@ -578,7 +588,7 @@ fn spawn_catalog_upload(
     tokio::spawn(async move {
         let catalog_upload_result = client
-            .upload_stream(CATALOG_NAME, catalog_chunk_stream, upload_options)
+            .upload_stream(CATALOG_NAME, catalog_chunk_stream, upload_options, None)
             .await;
 
         if let Err(ref err) = catalog_upload_result {

View File

@@ -363,7 +363,7 @@ fn extract(
         };
 
         let pxar_writer = pxar::PxarVariant::Unified(TokioWriter::new(writer));
-        create_archive(dir, PxarWriters::new(pxar_writer, None), Flags::DEFAULT, |_| Ok(()), options)
+        create_archive(dir, PxarWriters::new(pxar_writer, None), Flags::DEFAULT, |_| Ok(()), options, None)
             .await
     }
     .await;

View File

@@ -409,6 +409,7 @@ async fn create_archive(
             Ok(())
         },
         options,
+        None,
     )
     .await?;

View File

@@ -39,6 +39,7 @@ fn run_test(dir_name: &str) -> Result<(), Error> {
         Flags::DEFAULT,
         |_| Ok(()),
         options,
+        None,
     ))?;
 
     Command::new("cmp")