From 78c29b33effe67707ec97c482e090b7046add911 Mon Sep 17 00:00:00 2001 From: Wolfgang Bumiller Date: Tue, 3 Sep 2024 12:54:59 +0200 Subject: [PATCH] compression: add flush_window to DeflateEncoder Signed-off-by: Wolfgang Bumiller --- .../src/deflate/compression.rs | 37 +++++++++++++++++-- 1 file changed, 33 insertions(+), 4 deletions(-) diff --git a/proxmox-compression/src/deflate/compression.rs b/proxmox-compression/src/deflate/compression.rs index 24d2bc99..bcb57244 100644 --- a/proxmox-compression/src/deflate/compression.rs +++ b/proxmox-compression/src/deflate/compression.rs @@ -33,6 +33,9 @@ pub struct DeflateEncoder { buffer: ByteBuffer, input_buffer: Bytes, state: EncoderState, + /// This is the current amount of sent data and the window size used for intermittent flushing + /// of the compression layer. + flush_window: Option<(usize, usize)>, } pub struct DeflateEncoderBuilder { @@ -40,24 +43,32 @@ pub struct DeflateEncoderBuilder { is_zlib: bool, buffer_size: usize, level: Level, + flush_window: Option, } impl DeflateEncoderBuilder { - pub fn zlib(mut self, is_zlib: bool) -> Self { + pub const fn zlib(mut self, is_zlib: bool) -> Self { self.is_zlib = is_zlib; self } - pub fn level(mut self, level: Level) -> Self { + pub const fn level(mut self, level: Level) -> Self { self.level = level; self } - pub fn buffer_size(mut self, buffer_size: usize) -> Self { + pub const fn buffer_size(mut self, buffer_size: usize) -> Self { self.buffer_size = buffer_size; self } + /// If set, this is the number of bytes after which the compressor will be notified to flush + /// some data, so the compression produces more steady output and a little earlier. + pub const fn flush_window(mut self, flush_window: Option) -> Self { + self.flush_window = flush_window; + self + } + pub fn build(self) -> DeflateEncoder { let level = match self.level { Level::Fastest => Compression::fast(), @@ -72,6 +83,7 @@ impl DeflateEncoderBuilder { buffer: ByteBuffer::with_capacity(self.buffer_size), input_buffer: Bytes::new(), state: EncoderState::Reading, + flush_window: self.flush_window.map(|n| (0, n)), } } } @@ -87,6 +99,7 @@ impl DeflateEncoder { is_zlib: false, buffer_size: super::BUFFER_SIZE, level: Level::Default, + flush_window: None, } } @@ -202,9 +215,25 @@ where if this.input_buffer.is_empty() { return Poll::Ready(Some(Err(io_format_err!("empty input during write")))); } + + // in stream mode we'd like to flush regularly as the library's inner buffer is + // quite huge... + let flush_mode = match &mut this.flush_window { + Some((at, window)) if *at >= *window => { + *at = 0; + FlushCompress::Sync + } + _ => FlushCompress::None, + }; + let mut buf = this.input_buffer.split_off(0); - let (read, res) = this.encode(&buf[..], FlushCompress::None)?; + let (read, res) = this.encode(&buf[..], flush_mode)?; this.input_buffer = buf.split_off(read); + + if let Some((at, _window)) = &mut this.flush_window { + *at = at.saturating_add(read); + } + if this.input_buffer.is_empty() { this.state = EncoderState::Reading; }