datastore: chunker: add Chunker trait

Add the Chunker trait and rename the current Chunker struct to
ChunkerImpl, which now implements the trait. This allows using
different chunker implementations via dynamic dispatch and prepares
for implementing a dedicated payload chunker.

Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
This commit is contained in:
Christian Ebner 2024-04-19 15:34:54 +02:00 committed by Fabian Grünbichler
parent a43399da06
commit e321815635
6 changed files with 91 additions and 68 deletions

View File

@ -5,10 +5,10 @@ extern crate proxmox_backup;
use anyhow::Error;
use std::io::{Read, Write};
use pbs_datastore::Chunker;
use pbs_datastore::{Chunker, ChunkerImpl};
struct ChunkWriter {
chunker: Chunker,
chunker: ChunkerImpl,
last_chunk: usize,
chunk_offset: usize,
@ -23,7 +23,7 @@ struct ChunkWriter {
impl ChunkWriter {
fn new(chunk_size: usize) -> Self {
ChunkWriter {
chunker: Chunker::new(chunk_size),
chunker: ChunkerImpl::new(chunk_size),
last_chunk: 0,
chunk_offset: 0,
chunk_count: 0,
@ -69,7 +69,8 @@ impl Write for ChunkWriter {
fn write(&mut self, data: &[u8]) -> std::result::Result<usize, std::io::Error> {
let chunker = &mut self.chunker;
let pos = chunker.scan(data);
let ctx = pbs_datastore::chunker::Context::default();
let pos = chunker.scan(data, &ctx);
if pos > 0 {
self.chunk_offset += pos;

View File

@ -1,6 +1,6 @@
extern crate proxmox_backup;
use pbs_datastore::Chunker;
use pbs_datastore::{Chunker, ChunkerImpl};
fn main() {
let mut buffer = Vec::new();
@ -12,7 +12,7 @@ fn main() {
buffer.push(byte);
}
}
let mut chunker = Chunker::new(64 * 1024);
let mut chunker = ChunkerImpl::new(64 * 1024);
let count = 5;
@ -23,8 +23,9 @@ fn main() {
for _i in 0..count {
let mut pos = 0;
let mut _last = 0;
let ctx = pbs_datastore::chunker::Context::default();
while pos < buffer.len() {
let k = chunker.scan(&buffer[pos..]);
let k = chunker.scan(&buffer[pos..], &ctx);
if k == 0 {
//println!("LAST {}", pos);
break;

View File

@ -7,7 +7,7 @@ use bytes::BytesMut;
use futures::ready;
use futures::stream::{Stream, TryStream};
use pbs_datastore::Chunker;
use pbs_datastore::{Chunker, ChunkerImpl};
use crate::inject_reused_chunks::InjectChunks;
@ -16,7 +16,6 @@ pub struct InjectionData {
boundaries: mpsc::Receiver<InjectChunks>,
next_boundary: Option<InjectChunks>,
injections: mpsc::Sender<InjectChunks>,
consumed: u64,
}
impl InjectionData {
@ -28,7 +27,6 @@ impl InjectionData {
boundaries,
next_boundary: None,
injections,
consumed: 0,
}
}
}
@ -36,19 +34,22 @@ impl InjectionData {
/// Split input stream into dynamically sized chunks
pub struct ChunkStream<S: Unpin> {
input: S,
chunker: Chunker,
chunker: Box<dyn Chunker + Send>,
buffer: BytesMut,
scan_pos: usize,
consumed: u64,
injection_data: Option<InjectionData>,
}
impl<S: Unpin> ChunkStream<S> {
pub fn new(input: S, chunk_size: Option<usize>, injection_data: Option<InjectionData>) -> Self {
let chunk_size = chunk_size.unwrap_or(4 * 1024 * 1024);
Self {
input,
chunker: Chunker::new(chunk_size.unwrap_or(4 * 1024 * 1024)),
chunker: Box::new(ChunkerImpl::new(chunk_size)),
buffer: BytesMut::new(),
scan_pos: 0,
consumed: 0,
injection_data,
}
}
@ -68,11 +69,15 @@ where
let this = self.get_mut();
loop {
let ctx = pbs_datastore::chunker::Context {
base: this.consumed,
total: this.buffer.len() as u64,
};
if let Some(InjectionData {
boundaries,
next_boundary,
injections,
consumed,
}) = this.injection_data.as_mut()
{
if next_boundary.is_none() {
@ -84,29 +89,29 @@ where
if let Some(inject) = next_boundary.take() {
// require forced boundary, lookup next regular boundary
let pos = if this.scan_pos < this.buffer.len() {
this.chunker.scan(&this.buffer[this.scan_pos..])
this.chunker.scan(&this.buffer[this.scan_pos..], &ctx)
} else {
0
};
let chunk_boundary = if pos == 0 {
*consumed + this.buffer.len() as u64
this.consumed + this.buffer.len() as u64
} else {
*consumed + (this.scan_pos + pos) as u64
this.consumed + (this.scan_pos + pos) as u64
};
if inject.boundary <= chunk_boundary {
// forced boundary is before next boundary, force within current buffer
let chunk_size = (inject.boundary - *consumed) as usize;
let chunk_size = (inject.boundary - this.consumed) as usize;
let raw_chunk = this.buffer.split_to(chunk_size);
this.chunker.reset();
this.scan_pos = 0;
*consumed += chunk_size as u64;
this.consumed += chunk_size as u64;
// add the size of the injected chunks to consumed, so chunk stream offsets
// are in sync with the rest of the archive.
*consumed += inject.size as u64;
this.consumed += inject.size as u64;
injections.send(inject).unwrap();
@ -118,7 +123,7 @@ where
// forced boundary is after next boundary, split off chunk from buffer
let chunk_size = this.scan_pos + pos;
let raw_chunk = this.buffer.split_to(chunk_size);
*consumed += chunk_size as u64;
this.consumed += chunk_size as u64;
this.scan_pos = 0;
return Poll::Ready(Some(Ok(raw_chunk)));
@ -131,7 +136,7 @@ where
}
if this.scan_pos < this.buffer.len() {
let boundary = this.chunker.scan(&this.buffer[this.scan_pos..]);
let boundary = this.chunker.scan(&this.buffer[this.scan_pos..], &ctx);
let chunk_size = this.scan_pos + boundary;
@ -140,9 +145,7 @@ where
} else if chunk_size <= this.buffer.len() {
// found new chunk boundary inside buffer, split off chunk from buffer
let raw_chunk = this.buffer.split_to(chunk_size);
if let Some(InjectionData { consumed, .. }) = this.injection_data.as_mut() {
*consumed += chunk_size as u64;
}
this.consumed += chunk_size as u64;
this.scan_pos = 0;
return Poll::Ready(Some(Ok(raw_chunk)));
} else {

View File

@ -5,6 +5,20 @@
/// use hash value 0 to detect a boundary.
const CA_CHUNKER_WINDOW_SIZE: usize = 64;
/// Additional context passed to a chunker's `scan` call, so that it can find
/// possible boundaries in payload streams.
///
/// The derived `Default` yields a context with both counters set to zero,
/// which is what callers without stream position information pass in.
#[derive(Clone, Copy, Debug, Default)]
pub struct Context {
    /// Number of bytes already consumed by the chunk stream consumer.
    pub base: u64,
    /// Total number of bytes currently buffered.
    pub total: u64,
}
/// Common interface for chunk boundary detectors, so that different
/// implementations can be used interchangeably (e.g. as a `dyn Chunker`).
pub trait Chunker {
/// Scan `data` for the next chunk boundary; `ctx` carries additional stream
/// information for the implementation.
///
/// Callers treat a return value of 0 as "no boundary found in `data` yet"
/// and any other value as the boundary position relative to the start of
/// `data`.
fn scan(&mut self, data: &[u8], ctx: &Context) -> usize;
/// Reset the chunker's internal scan state, e.g. after the caller forced a
/// chunk boundary on its own.
fn reset(&mut self);
}
/// Sliding window chunker (Buzhash)
///
/// This is a rewrite of *casync* chunker (cachunker.h) in rust.
@ -15,7 +29,7 @@ const CA_CHUNKER_WINDOW_SIZE: usize = 64;
/// Hash](https://en.wikipedia.org/wiki/Rolling_hash) article from
/// Wikipedia.
pub struct Chunker {
pub struct ChunkerImpl {
h: u32,
window_size: usize,
chunk_size: usize,
@ -67,7 +81,7 @@ const BUZHASH_TABLE: [u32; 256] = [
0x5eff22f4, 0x6027f4cc, 0x77178b3c, 0xae507131, 0x7bf7cabc, 0xf9c18d66, 0x593ade65, 0xd95ddf11,
];
impl Chunker {
impl ChunkerImpl {
/// Create a new Chunker instance, which produces an average
/// chunk size of `chunk_size_avg` (needs to be a power of two). We
/// allow variation from `chunk_size_avg/4` up to a maximum of
@ -105,11 +119,44 @@ impl Chunker {
}
}
/// Decide whether the current chunk should end at the present position.
///
/// A break is always reported once `chunk_size` has reached
/// `chunk_size_max`, and never while it is still below `chunk_size_min`.
/// In between, the hash state decides: the bits of `h` selected by
/// `break_test_mask` must be at least `break_test_minimum`.
///
/// This is the fast variant that avoids a modulo. Comparing the masked hash
/// with `<= 2` was rejected because it behaves badly on streams of zeros,
/// and a hard-coded `0x1ffff` mask was noted as slow.
fn shall_break(&self) -> bool {
    let size = self.chunk_size;
    size >= self.chunk_size_max
        || (size >= self.chunk_size_min
            && (self.h & self.break_test_mask) >= self.break_test_minimum)
}
// This is the original implementation from casync
/*
#[inline(always)]
fn shall_break_orig(&self) -> bool {
if self.chunk_size >= self.chunk_size_max { return true; }
if self.chunk_size < self.chunk_size_min { return false; }
(self.h % self.discriminator) == (self.discriminator - 1)
}
*/
}
impl Chunker for ChunkerImpl {
/// Scans the specified data for a chunk border. Returns 0 if none
/// was found (and the function should be called with more data
/// later on), or another value indicating the position of a
/// border.
pub fn scan(&mut self, data: &[u8]) -> usize {
fn scan(&mut self, data: &[u8], _ctx: &Context) -> usize {
let window_len = self.window.len();
let data_len = data.len();
@ -167,42 +214,11 @@ impl Chunker {
0
}
pub fn reset(&mut self) {
fn reset(&mut self) {
self.h = 0;
self.chunk_size = 0;
self.window_size = 0;
}
/// Decide whether the current chunk should end at the present position.
///
/// A break is always reported once `chunk_size` has reached
/// `chunk_size_max`, and never while it is still below `chunk_size_min`.
/// In between, the hash state decides: the bits of `h` selected by
/// `break_test_mask` must be at least `break_test_minimum`.
///
/// This is the fast variant that avoids a modulo. Comparing the masked hash
/// with `<= 2` was rejected because it behaves badly on streams of zeros,
/// and a hard-coded `0x1ffff` mask was noted as slow.
fn shall_break(&self) -> bool {
    let size = self.chunk_size;
    size >= self.chunk_size_max
        || (size >= self.chunk_size_min
            && (self.h & self.break_test_mask) >= self.break_test_minimum)
}
// This is the original implementation from casync
/*
#[inline(always)]
fn shall_break_orig(&self) -> bool {
if self.chunk_size >= self.chunk_size_max { return true; }
if self.chunk_size < self.chunk_size_min { return false; }
(self.h % self.discriminator) == (self.discriminator - 1)
}
*/
}
#[test]
@ -215,17 +231,18 @@ fn test_chunker1() {
buffer.push(byte);
}
}
let mut chunker = Chunker::new(64 * 1024);
let mut chunker = ChunkerImpl::new(64 * 1024);
let mut pos = 0;
let mut last = 0;
let mut chunks1: Vec<(usize, usize)> = vec![];
let mut chunks2: Vec<(usize, usize)> = vec![];
let ctx = Context::default();
// test1: feed single bytes
while pos < buffer.len() {
let k = chunker.scan(&buffer[pos..pos + 1]);
let k = chunker.scan(&buffer[pos..pos + 1], &ctx);
pos += 1;
if k != 0 {
let prev = last;
@ -235,13 +252,13 @@ fn test_chunker1() {
}
chunks1.push((last, buffer.len() - last));
let mut chunker = Chunker::new(64 * 1024);
let mut chunker = ChunkerImpl::new(64 * 1024);
let mut pos = 0;
// test2: feed with whole buffer
while pos < buffer.len() {
let k = chunker.scan(&buffer[pos..]);
let k = chunker.scan(&buffer[pos..], &ctx);
if k != 0 {
chunks2.push((pos, k));
pos += k;

View File

@ -23,7 +23,7 @@ use crate::data_blob::{DataBlob, DataChunkBuilder};
use crate::file_formats;
use crate::index::{ChunkReadInfo, IndexFile};
use crate::read_chunk::ReadChunk;
use crate::Chunker;
use crate::{Chunker, ChunkerImpl};
/// Header format definition for dynamic index files (`.dixd`)
#[repr(C)]
@ -397,7 +397,7 @@ impl DynamicIndexWriter {
pub struct DynamicChunkWriter {
index: DynamicIndexWriter,
closed: bool,
chunker: Chunker,
chunker: ChunkerImpl,
stat: ChunkStat,
chunk_offset: usize,
last_chunk: usize,
@ -409,7 +409,7 @@ impl DynamicChunkWriter {
Self {
index,
closed: false,
chunker: Chunker::new(chunk_size),
chunker: ChunkerImpl::new(chunk_size),
stat: ChunkStat::new(0),
chunk_offset: 0,
last_chunk: 0,
@ -494,7 +494,8 @@ impl Write for DynamicChunkWriter {
fn write(&mut self, data: &[u8]) -> std::result::Result<usize, std::io::Error> {
let chunker = &mut self.chunker;
let pos = chunker.scan(data);
let ctx = crate::chunker::Context::default();
let pos = chunker.scan(data, &ctx);
if pos > 0 {
self.chunk_buffer.extend_from_slice(&data[0..pos]);

View File

@ -196,7 +196,7 @@ pub use backup_info::{BackupDir, BackupGroup, BackupInfo};
pub use checksum_reader::ChecksumReader;
pub use checksum_writer::ChecksumWriter;
pub use chunk_store::ChunkStore;
pub use chunker::Chunker;
pub use chunker::{Chunker, ChunkerImpl};
pub use crypt_reader::CryptReader;
pub use crypt_writer::CryptWriter;
pub use data_blob::DataBlob;