diff --git a/pbs-client/src/pxar/create.rs b/pbs-client/src/pxar/create.rs index d126b277..6e6ce1a2 100644 --- a/pbs-client/src/pxar/create.rs +++ b/pbs-client/src/pxar/create.rs @@ -21,9 +21,10 @@ use pathpatterns::{MatchEntry, MatchFlag, MatchList, MatchType, PatternFlag}; use proxmox_sys::error::SysError; use pxar::accessor::aio::{Accessor, Directory}; use pxar::accessor::ReadAt; -use pxar::encoder::{LinkOffset, SeqWrite}; +use pxar::encoder::{LinkOffset, PayloadOffset, SeqWrite}; use pxar::{EntryKind, Metadata, PxarVariant}; +use proxmox_human_byte::HumanByte; use proxmox_io::vec; use proxmox_lang::c_str; use proxmox_sys::fs::{self, acl, xattr}; @@ -33,10 +34,13 @@ use pbs_datastore::dynamic_index::DynamicIndexReader; use pbs_datastore::index::IndexFile; use crate::inject_reused_chunks::InjectChunks; +use crate::pxar::look_ahead_cache::{CacheEntry, CacheEntryData, PxarLookaheadCache}; use crate::pxar::metadata::errno_is_unsupported; use crate::pxar::tools::assert_single_path_component; use crate::pxar::Flags; +const CHUNK_PADDING_THRESHOLD: f64 = 0.1; + /// Pxar options for creating a pxar archive/stream #[derive(Default)] pub struct PxarCreateOptions { @@ -154,6 +158,7 @@ struct Archiver { skip_e2big_xattr: bool, forced_boundaries: Option>, previous_payload_index: Option, + cache: PxarLookaheadCache, } type Encoder<'a, T> = pxar::encoder::aio::Encoder<'a, T>; @@ -207,6 +212,7 @@ where set.insert(stat.st_dev); } + let metadata_mode = options.previous_ref.is_some() && writers.archive.payload().is_some(); let mut encoder = Encoder::new(writers.archive, &metadata).await?; let mut patterns = options.patterns; @@ -245,11 +251,19 @@ where skip_e2big_xattr: options.skip_e2big_xattr, forced_boundaries, previous_payload_index, + cache: PxarLookaheadCache::new(None), }; archiver .archive_dir_contents(&mut encoder, previous_metadata_accessor, source_dir, true) .await?; + + if metadata_mode { + archiver + .flush_cached_reusing_if_below_threshold(&mut encoder, false) + .await?; + } + encoder.finish().await?; encoder.close().await?; @@ -307,7 +321,10 @@ impl Archiver { for file_entry in file_list { let file_name = file_entry.name.to_bytes(); - if is_root && file_name == b".pxarexclude-cli" { + if is_root + && file_name == b".pxarexclude-cli" + && previous_metadata_accessor.is_none() + { self.encode_pxarexclude_cli(encoder, &file_entry.name, old_patterns_count) .await?; continue; @@ -610,8 +627,6 @@ impl Archiver { c_file_name: &CStr, stat: &FileStat, ) -> Result<(), Error> { - use pxar::format::mode; - let file_mode = stat.st_mode & libc::S_IFMT; let open_mode = if file_mode == libc::S_IFREG || file_mode == libc::S_IFDIR { OFlag::empty() @@ -649,6 +664,126 @@ impl Archiver { self.skip_e2big_xattr, )?; + if self.previous_payload_index.is_none() { + return self + .add_entry_to_archive(encoder, &mut None, c_file_name, stat, fd, &metadata, None) + .await; + } + + // Avoid having to many open file handles in cached entries + if self.cache.is_full() { + log::debug!("Max cache size reached, reuse cached entries"); + self.flush_cached_reusing_if_below_threshold(encoder, true) + .await?; + } + + if metadata.is_regular_file() { + if stat.st_nlink > 1 { + let link_info = HardLinkInfo { + st_dev: stat.st_dev, + st_ino: stat.st_ino, + }; + if self.cache.contains_hardlink(&link_info) { + // This hardlink has been seen by the lookahead cache already, put it on the cache + // with a dummy offset and continue without lookup and chunk injection. + // On flushing or re-encoding, the logic there will store the actual hardlink with + // offset. + self.cache.insert( + fd, + c_file_name.into(), + *stat, + metadata.clone(), + PayloadOffset::default(), + self.path.clone(), + ); + return Ok(()); + } else { + // mark this hardlink as seen by the lookahead cache + self.cache.insert_hardlink(link_info); + } + } + + let file_name: &Path = OsStr::from_bytes(c_file_name.to_bytes()).as_ref(); + if let Some(payload_range) = self + .is_reusable_entry(previous_metadata, file_name, &metadata) + .await? + { + if !self.cache.try_extend_range(payload_range.clone()) { + log::debug!("Cache range has hole, new range: {payload_range:?}"); + self.flush_cached_reusing_if_below_threshold(encoder, true) + .await?; + // range has to be set after flushing of cached entries, which resets the range + self.cache.update_range(payload_range.clone()); + } + + // offset relative to start of current range, does not include possible padding of + // actual chunks, which needs to be added before encoding the payload reference + let offset = + PayloadOffset::default().add(payload_range.start - self.cache.range().start); + log::debug!("Offset relative to range start: {offset:?}"); + + self.cache.insert( + fd, + c_file_name.into(), + *stat, + metadata.clone(), + offset, + self.path.clone(), + ); + return Ok(()); + } else { + self.flush_cached_reusing_if_below_threshold(encoder, false) + .await?; + } + } else if self.cache.caching_enabled() { + self.cache.insert( + fd.try_clone()?, + c_file_name.into(), + *stat, + metadata.clone(), + PayloadOffset::default(), + self.path.clone(), + ); + + if metadata.is_dir() { + self.add_directory( + encoder, + previous_metadata, + Dir::from_fd(fd.into_raw_fd())?, + c_file_name, + &metadata, + stat, + ) + .await?; + } + return Ok(()); + } + + self.encode_entries_to_archive(encoder, None).await?; + self.add_entry_to_archive( + encoder, + previous_metadata, + c_file_name, + stat, + fd, + &metadata, + None, + ) + .await + } + + async fn add_entry_to_archive( + &mut self, + encoder: &mut Encoder<'_, T>, + previous_metadata: &mut Option>, + c_file_name: &CStr, + stat: &FileStat, + fd: OwnedFd, + metadata: &Metadata, + payload_offset: Option, + ) -> Result<(), Error> { + use pxar::format::mode; + let file_name: &Path = OsStr::from_bytes(c_file_name.to_bytes()).as_ref(); match metadata.file_type() { mode::IFREG => { @@ -677,9 +812,14 @@ impl Archiver { .add_file(c_file_name, file_size, stat.st_mtime)?; } - let offset: LinkOffset = self - .add_regular_file(encoder, fd, file_name, &metadata, file_size) - .await?; + let offset: LinkOffset = if let Some(payload_offset) = payload_offset { + encoder + .add_payload_ref(metadata, file_name, file_size, payload_offset) + .await? + } else { + self.add_regular_file(encoder, fd, file_name, metadata, file_size) + .await? + }; if stat.st_nlink > 1 { self.hardlinks @@ -690,50 +830,43 @@ impl Archiver { } mode::IFDIR => { let dir = Dir::from_fd(fd.into_raw_fd())?; - self.add_directory( - encoder, - previous_metadata, - dir, - c_file_name, - &metadata, - stat, - ) - .await + self.add_directory(encoder, previous_metadata, dir, c_file_name, metadata, stat) + .await } mode::IFSOCK => { if let Some(ref catalog) = self.catalog { catalog.lock().unwrap().add_socket(c_file_name)?; } - Ok(encoder.add_socket(&metadata, file_name).await?) + Ok(encoder.add_socket(metadata, file_name).await?) } mode::IFIFO => { if let Some(ref catalog) = self.catalog { catalog.lock().unwrap().add_fifo(c_file_name)?; } - Ok(encoder.add_fifo(&metadata, file_name).await?) + Ok(encoder.add_fifo(metadata, file_name).await?) } mode::IFLNK => { if let Some(ref catalog) = self.catalog { catalog.lock().unwrap().add_symlink(c_file_name)?; } - self.add_symlink(encoder, fd, file_name, &metadata).await + self.add_symlink(encoder, fd, file_name, metadata).await } mode::IFBLK => { if let Some(ref catalog) = self.catalog { catalog.lock().unwrap().add_block_device(c_file_name)?; } - self.add_device(encoder, file_name, &metadata, stat).await + self.add_device(encoder, file_name, metadata, stat).await } mode::IFCHR => { if let Some(ref catalog) = self.catalog { catalog.lock().unwrap().add_char_device(c_file_name)?; } - self.add_device(encoder, file_name, &metadata, stat).await + self.add_device(encoder, file_name, metadata, stat).await } other => bail!( "encountered unknown file type: 0x{:x} (0o{:o})", @@ -743,6 +876,197 @@ impl Archiver { } } + async fn flush_cached_reusing_if_below_threshold( + &mut self, + encoder: &mut Encoder<'_, T>, + keep_last_chunk: bool, + ) -> Result<(), Error> { + if self.cache.range().is_empty() { + // only non regular file entries (e.g. directories) in cache, allows to do regular encoding + self.encode_entries_to_archive(encoder, None).await?; + return Ok(()); + } + + if let Some(ref ref_payload_index) = self.previous_payload_index { + // Take ownership of previous last chunk, only update where it must be injected + let prev_last_chunk = self.cache.take_last_chunk(); + let range = self.cache.range(); + let (mut indices, start_padding, end_padding) = + lookup_dynamic_entries(ref_payload_index, range)?; + let mut padding = start_padding + end_padding; + let total_size = (range.end - range.start) + padding; + + // take into account used bytes of kept back chunk for padding + if let (Some(first), Some(last)) = (indices.first(), prev_last_chunk.as_ref()) { + if last.digest() == first.digest() { + // Update padding used for threshold calculation only + let used = last.size() - last.padding; + padding -= used; + } + } + + let ratio = padding as f64 / total_size as f64; + + // do not reuse chunks if introduced padding higher than threshold + // opt for re-encoding in that case + if ratio > CHUNK_PADDING_THRESHOLD { + log::debug!( + "Padding ratio: {ratio} > {CHUNK_PADDING_THRESHOLD}, padding: {}, total {}, chunks: {}", + HumanByte::from(padding), + HumanByte::from(total_size), + indices.len(), + ); + self.cache.update_last_chunk(prev_last_chunk); + self.encode_entries_to_archive(encoder, None).await?; + } else { + log::debug!( + "Padding ratio: {ratio} < {CHUNK_PADDING_THRESHOLD}, padding: {}, total {}, chunks: {}", + HumanByte::from(padding), + HumanByte::from(total_size), + indices.len(), + ); + + // check for cases where kept back last is not equal first chunk because the range + // end aligned with a chunk boundary, and the chunks therefore needs to be injected + if let (Some(first), Some(last)) = (indices.first_mut(), prev_last_chunk) { + if last.digest() != first.digest() { + // make sure to inject previous last chunk before encoding entries + self.inject_chunks_at_current_payload_position(encoder, vec![last])?; + } else { + let used = last.size() - last.padding; + first.padding -= used; + } + } + + let base_offset = Some(encoder.payload_position()?.add(start_padding)); + self.encode_entries_to_archive(encoder, base_offset).await?; + + if keep_last_chunk { + self.cache.update_last_chunk(indices.pop()); + } + + self.inject_chunks_at_current_payload_position(encoder, indices)?; + } + + Ok(()) + } else { + bail!("cannot reuse chunks without previous index reader"); + } + } + + // Take ownership of cached entries and encode them to the archive + // Encode with reused payload chunks when base offset is some, reencode otherwise + async fn encode_entries_to_archive( + &mut self, + encoder: &mut Encoder<'_, T>, + base_offset: Option, + ) -> Result<(), Error> { + if let Some(prev) = self.cache.take_last_chunk() { + // make sure to inject previous last chunk before encoding entries + self.inject_chunks_at_current_payload_position(encoder, vec![prev])?; + } + + // take ownership of cached entries and reset caching state + let (entries, start_path) = self.cache.take_and_reset(); + let old_path = self.path.clone(); + self.path = start_path; + log::debug!( + "Got {} cache entries to encode: reuse is {}", + entries.len(), + base_offset.is_some() + ); + + for entry in entries { + match entry { + CacheEntry::RegEntry(CacheEntryData { + fd, + c_file_name, + stat, + metadata, + payload_offset, + }) => { + let file_name = OsStr::from_bytes(c_file_name.to_bytes()); + self.path.push(file_name); + self.add_entry_to_archive( + encoder, + &mut None, + &c_file_name, + &stat, + fd, + &metadata, + base_offset.map(|base_offset| payload_offset.add(base_offset.raw())), + ) + .await?; + self.path.pop(); + } + CacheEntry::DirEntry(CacheEntryData { + c_file_name, + metadata, + .. + }) => { + let file_name = OsStr::from_bytes(c_file_name.to_bytes()); + self.path.push(file_name); + if let Some(ref catalog) = self.catalog { + catalog.lock().unwrap().start_directory(&c_file_name)?; + } + let dir_name = OsStr::from_bytes(c_file_name.to_bytes()); + encoder.create_directory(dir_name, &metadata).await?; + } + CacheEntry::DirEnd => { + encoder.finish().await?; + if let Some(ref catalog) = self.catalog { + catalog.lock().unwrap().end_directory()?; + } + self.path.pop(); + } + } + } + + self.path = old_path; + + Ok(()) + } + + fn inject_chunks_at_current_payload_position( + &mut self, + encoder: &mut Encoder<'_, T>, + reused_chunks: Vec, + ) -> Result<(), Error> { + let mut injection_boundary = encoder.payload_position()?; + + for chunks in reused_chunks.chunks(128) { + let chunks = chunks.to_vec(); + let mut size = PayloadOffset::default(); + + for chunk in chunks.iter() { + log::debug!( + "Injecting chunk with {} padding (chunk size {})", + HumanByte::from(chunk.padding), + HumanByte::from(chunk.size()), + ); + size = size.add(chunk.size()); + } + + let inject_chunks = InjectChunks { + boundary: injection_boundary.raw(), + chunks, + size: size.raw() as usize, + }; + + if let Some(sender) = self.forced_boundaries.as_mut() { + sender.send(inject_chunks)?; + } else { + bail!("missing injection queue"); + }; + + injection_boundary = injection_boundary.add(size.raw()); + log::debug!("Advance payload position by: {size:?}"); + encoder.advance(size)?; + } + + Ok(()) + } + async fn add_directory( &mut self, encoder: &mut Encoder<'_, T>, @@ -754,10 +1078,12 @@ impl Archiver { ) -> Result<(), Error> { let dir_name = OsStr::from_bytes(c_dir_name.to_bytes()); - if let Some(ref catalog) = self.catalog { - catalog.lock().unwrap().start_directory(c_dir_name)?; + if !self.cache.caching_enabled() { + if let Some(ref catalog) = self.catalog { + catalog.lock().unwrap().start_directory(c_dir_name)?; + } + encoder.create_directory(dir_name, metadata).await?; } - encoder.create_directory(dir_name, metadata).await?; let old_fs_magic = self.fs_magic; let old_fs_feature_flags = self.fs_feature_flags; @@ -797,9 +1123,13 @@ impl Archiver { self.fs_feature_flags = old_fs_feature_flags; self.current_st_dev = old_st_dev; - encoder.finish().await?; - if let Some(ref catalog) = self.catalog { - catalog.lock().unwrap().end_directory()?; + if !self.cache.caching_enabled() { + encoder.finish().await?; + if let Some(ref catalog) = self.catalog { + catalog.lock().unwrap().end_directory()?; + } + } else { + self.cache.insert_dir_end(); } result