diff --git a/examples/test_chunk_size.rs b/examples/test_chunk_size.rs index a01a5e64..2ebc22f6 100644 --- a/examples/test_chunk_size.rs +++ b/examples/test_chunk_size.rs @@ -5,10 +5,10 @@ extern crate proxmox_backup; use anyhow::Error; use std::io::{Read, Write}; -use pbs_datastore::Chunker; +use pbs_datastore::{Chunker, ChunkerImpl}; struct ChunkWriter { - chunker: Chunker, + chunker: ChunkerImpl, last_chunk: usize, chunk_offset: usize, @@ -23,7 +23,7 @@ struct ChunkWriter { impl ChunkWriter { fn new(chunk_size: usize) -> Self { ChunkWriter { - chunker: Chunker::new(chunk_size), + chunker: ChunkerImpl::new(chunk_size), last_chunk: 0, chunk_offset: 0, chunk_count: 0, @@ -69,7 +69,8 @@ impl Write for ChunkWriter { fn write(&mut self, data: &[u8]) -> std::result::Result { let chunker = &mut self.chunker; - let pos = chunker.scan(data); + let ctx = pbs_datastore::chunker::Context::default(); + let pos = chunker.scan(data, &ctx); if pos > 0 { self.chunk_offset += pos; diff --git a/examples/test_chunk_speed.rs b/examples/test_chunk_speed.rs index 37e13e0d..2d79604a 100644 --- a/examples/test_chunk_speed.rs +++ b/examples/test_chunk_speed.rs @@ -1,6 +1,6 @@ extern crate proxmox_backup; -use pbs_datastore::Chunker; +use pbs_datastore::{Chunker, ChunkerImpl}; fn main() { let mut buffer = Vec::new(); @@ -12,7 +12,7 @@ fn main() { buffer.push(byte); } } - let mut chunker = Chunker::new(64 * 1024); + let mut chunker = ChunkerImpl::new(64 * 1024); let count = 5; @@ -23,8 +23,9 @@ fn main() { for _i in 0..count { let mut pos = 0; let mut _last = 0; + let ctx = pbs_datastore::chunker::Context::default(); while pos < buffer.len() { - let k = chunker.scan(&buffer[pos..]); + let k = chunker.scan(&buffer[pos..], &ctx); if k == 0 { //println!("LAST {}", pos); break; diff --git a/pbs-client/src/chunk_stream.rs b/pbs-client/src/chunk_stream.rs index 87a018d5..84158a2c 100644 --- a/pbs-client/src/chunk_stream.rs +++ b/pbs-client/src/chunk_stream.rs @@ -7,7 +7,7 @@ use bytes::BytesMut; use futures::ready; use futures::stream::{Stream, TryStream}; -use pbs_datastore::Chunker; +use pbs_datastore::{Chunker, ChunkerImpl}; use crate::inject_reused_chunks::InjectChunks; @@ -16,7 +16,6 @@ pub struct InjectionData { boundaries: mpsc::Receiver, next_boundary: Option, injections: mpsc::Sender, - consumed: u64, } impl InjectionData { @@ -28,7 +27,6 @@ impl InjectionData { boundaries, next_boundary: None, injections, - consumed: 0, } } } @@ -36,19 +34,22 @@ impl InjectionData { /// Split input stream into dynamic sized chunks pub struct ChunkStream { input: S, - chunker: Chunker, + chunker: Box, buffer: BytesMut, scan_pos: usize, + consumed: u64, injection_data: Option, } impl ChunkStream { pub fn new(input: S, chunk_size: Option, injection_data: Option) -> Self { + let chunk_size = chunk_size.unwrap_or(4 * 1024 * 1024); Self { input, - chunker: Chunker::new(chunk_size.unwrap_or(4 * 1024 * 1024)), + chunker: Box::new(ChunkerImpl::new(chunk_size)), buffer: BytesMut::new(), scan_pos: 0, + consumed: 0, injection_data, } } @@ -68,11 +69,15 @@ where let this = self.get_mut(); loop { + let ctx = pbs_datastore::chunker::Context { + base: this.consumed, + total: this.buffer.len() as u64, + }; + if let Some(InjectionData { boundaries, next_boundary, injections, - consumed, }) = this.injection_data.as_mut() { if next_boundary.is_none() { @@ -84,29 +89,29 @@ where if let Some(inject) = next_boundary.take() { // require forced boundary, lookup next regular boundary let pos = if this.scan_pos < this.buffer.len() { - this.chunker.scan(&this.buffer[this.scan_pos..]) + this.chunker.scan(&this.buffer[this.scan_pos..], &ctx) } else { 0 }; let chunk_boundary = if pos == 0 { - *consumed + this.buffer.len() as u64 + this.consumed + this.buffer.len() as u64 } else { - *consumed + (this.scan_pos + pos) as u64 + this.consumed + (this.scan_pos + pos) as u64 }; if inject.boundary <= chunk_boundary { // forced boundary is before next boundary, force within current buffer - let chunk_size = (inject.boundary - *consumed) as usize; + let chunk_size = (inject.boundary - this.consumed) as usize; let raw_chunk = this.buffer.split_to(chunk_size); this.chunker.reset(); this.scan_pos = 0; - *consumed += chunk_size as u64; + this.consumed += chunk_size as u64; // add the size of the injected chunks to consumed, so chunk stream offsets // are in sync with the rest of the archive. - *consumed += inject.size as u64; + this.consumed += inject.size as u64; injections.send(inject).unwrap(); @@ -118,7 +123,7 @@ where // forced boundary is after next boundary, split off chunk from buffer let chunk_size = this.scan_pos + pos; let raw_chunk = this.buffer.split_to(chunk_size); - *consumed += chunk_size as u64; + this.consumed += chunk_size as u64; this.scan_pos = 0; return Poll::Ready(Some(Ok(raw_chunk))); @@ -131,7 +136,7 @@ where } if this.scan_pos < this.buffer.len() { - let boundary = this.chunker.scan(&this.buffer[this.scan_pos..]); + let boundary = this.chunker.scan(&this.buffer[this.scan_pos..], &ctx); let chunk_size = this.scan_pos + boundary; @@ -140,9 +145,7 @@ where } else if chunk_size <= this.buffer.len() { // found new chunk boundary inside buffer, split off chunk from buffer let raw_chunk = this.buffer.split_to(chunk_size); - if let Some(InjectionData { consumed, .. }) = this.injection_data.as_mut() { - *consumed += chunk_size as u64; - } + this.consumed += chunk_size as u64; this.scan_pos = 0; return Poll::Ready(Some(Ok(raw_chunk))); } else { diff --git a/pbs-datastore/src/chunker.rs b/pbs-datastore/src/chunker.rs index 253d2cf4..d75e63fa 100644 --- a/pbs-datastore/src/chunker.rs +++ b/pbs-datastore/src/chunker.rs @@ -5,6 +5,20 @@ /// use hash value 0 to detect a boundary. const CA_CHUNKER_WINDOW_SIZE: usize = 64; +/// Additional context for chunker to find possible boundaries in payload streams +#[derive(Default)] +pub struct Context { + /// Already consumed bytes of the chunk stream consumer + pub base: u64, + /// Total size currently buffered + pub total: u64, +} + +pub trait Chunker { + fn scan(&mut self, data: &[u8], ctx: &Context) -> usize; + fn reset(&mut self); +} + /// Sliding window chunker (Buzhash) /// /// This is a rewrite of *casync* chunker (cachunker.h) in rust. @@ -15,7 +29,7 @@ const CA_CHUNKER_WINDOW_SIZE: usize = 64; /// Hash](https://en.wikipedia.org/wiki/Rolling_hash) article from /// Wikipedia. -pub struct Chunker { +pub struct ChunkerImpl { h: u32, window_size: usize, chunk_size: usize, @@ -67,7 +81,7 @@ const BUZHASH_TABLE: [u32; 256] = [ 0x5eff22f4, 0x6027f4cc, 0x77178b3c, 0xae507131, 0x7bf7cabc, 0xf9c18d66, 0x593ade65, 0xd95ddf11, ]; -impl Chunker { +impl ChunkerImpl { /// Create a new Chunker instance, which produces and average /// chunk size of `chunk_size_avg` (need to be a power of two). We /// allow variation from `chunk_size_avg/4` up to a maximum of @@ -105,11 +119,44 @@ impl Chunker { } } + // fast implementation avoiding modulo + // #[inline(always)] + fn shall_break(&self) -> bool { + if self.chunk_size >= self.chunk_size_max { + return true; + } + + if self.chunk_size < self.chunk_size_min { + return false; + } + + //(self.h & 0x1ffff) <= 2 //THIS IS SLOW!!! + + //(self.h & self.break_test_mask) <= 2 // Bad on 0 streams + + (self.h & self.break_test_mask) >= self.break_test_minimum + } + + // This is the original implementation from casync + /* + #[inline(always)] + fn shall_break_orig(&self) -> bool { + + if self.chunk_size >= self.chunk_size_max { return true; } + + if self.chunk_size < self.chunk_size_min { return false; } + + (self.h % self.discriminator) == (self.discriminator - 1) + } + */ +} + +impl Chunker for ChunkerImpl { /// Scans the specified data for a chunk border. Returns 0 if none /// was found (and the function should be called with more data /// later on), or another value indicating the position of a /// border. - pub fn scan(&mut self, data: &[u8]) -> usize { + fn scan(&mut self, data: &[u8], _ctx: &Context) -> usize { let window_len = self.window.len(); let data_len = data.len(); @@ -167,42 +214,11 @@ impl Chunker { 0 } - pub fn reset(&mut self) { + fn reset(&mut self) { self.h = 0; self.chunk_size = 0; self.window_size = 0; } - - // fast implementation avoiding modulo - // #[inline(always)] - fn shall_break(&self) -> bool { - if self.chunk_size >= self.chunk_size_max { - return true; - } - - if self.chunk_size < self.chunk_size_min { - return false; - } - - //(self.h & 0x1ffff) <= 2 //THIS IS SLOW!!! - - //(self.h & self.break_test_mask) <= 2 // Bad on 0 streams - - (self.h & self.break_test_mask) >= self.break_test_minimum - } - - // This is the original implementation from casync - /* - #[inline(always)] - fn shall_break_orig(&self) -> bool { - - if self.chunk_size >= self.chunk_size_max { return true; } - - if self.chunk_size < self.chunk_size_min { return false; } - - (self.h % self.discriminator) == (self.discriminator - 1) - } - */ } #[test] @@ -215,17 +231,18 @@ fn test_chunker1() { buffer.push(byte); } } - let mut chunker = Chunker::new(64 * 1024); + let mut chunker = ChunkerImpl::new(64 * 1024); let mut pos = 0; let mut last = 0; let mut chunks1: Vec<(usize, usize)> = vec![]; let mut chunks2: Vec<(usize, usize)> = vec![]; + let ctx = Context::default(); // test1: feed single bytes while pos < buffer.len() { - let k = chunker.scan(&buffer[pos..pos + 1]); + let k = chunker.scan(&buffer[pos..pos + 1], &ctx); pos += 1; if k != 0 { let prev = last; @@ -235,13 +252,13 @@ fn test_chunker1() { } chunks1.push((last, buffer.len() - last)); - let mut chunker = Chunker::new(64 * 1024); + let mut chunker = ChunkerImpl::new(64 * 1024); let mut pos = 0; // test2: feed with whole buffer while pos < buffer.len() { - let k = chunker.scan(&buffer[pos..]); + let k = chunker.scan(&buffer[pos..], &ctx); if k != 0 { chunks2.push((pos, k)); pos += k; diff --git a/pbs-datastore/src/dynamic_index.rs b/pbs-datastore/src/dynamic_index.rs index b8047b5b..dc9eee05 100644 --- a/pbs-datastore/src/dynamic_index.rs +++ b/pbs-datastore/src/dynamic_index.rs @@ -23,7 +23,7 @@ use crate::data_blob::{DataBlob, DataChunkBuilder}; use crate::file_formats; use crate::index::{ChunkReadInfo, IndexFile}; use crate::read_chunk::ReadChunk; -use crate::Chunker; +use crate::{Chunker, ChunkerImpl}; /// Header format definition for dynamic index files (`.dixd`) #[repr(C)] @@ -397,7 +397,7 @@ impl DynamicIndexWriter { pub struct DynamicChunkWriter { index: DynamicIndexWriter, closed: bool, - chunker: Chunker, + chunker: ChunkerImpl, stat: ChunkStat, chunk_offset: usize, last_chunk: usize, @@ -409,7 +409,7 @@ impl DynamicChunkWriter { Self { index, closed: false, - chunker: Chunker::new(chunk_size), + chunker: ChunkerImpl::new(chunk_size), stat: ChunkStat::new(0), chunk_offset: 0, last_chunk: 0, @@ -494,7 +494,8 @@ impl Write for DynamicChunkWriter { fn write(&mut self, data: &[u8]) -> std::result::Result { let chunker = &mut self.chunker; - let pos = chunker.scan(data); + let ctx = crate::chunker::Context::default(); + let pos = chunker.scan(data, &ctx); if pos > 0 { self.chunk_buffer.extend_from_slice(&data[0..pos]); diff --git a/pbs-datastore/src/lib.rs b/pbs-datastore/src/lib.rs index 43050162..24429626 100644 --- a/pbs-datastore/src/lib.rs +++ b/pbs-datastore/src/lib.rs @@ -196,7 +196,7 @@ pub use backup_info::{BackupDir, BackupGroup, BackupInfo}; pub use checksum_reader::ChecksumReader; pub use checksum_writer::ChecksumWriter; pub use chunk_store::ChunkStore; -pub use chunker::Chunker; +pub use chunker::{Chunker, ChunkerImpl}; pub use crypt_reader::CryptReader; pub use crypt_writer::CryptWriter; pub use data_blob::DataBlob;