diff --git a/proxmox-async/Cargo.toml b/proxmox-async/Cargo.toml index ae9301d3..309bf4ab 100644 --- a/proxmox-async/Cargo.toml +++ b/proxmox-async/Cargo.toml @@ -10,6 +10,8 @@ exclude = [ "debian" ] [dependencies] anyhow = "1.0" +bytes = "1.0" +flate2 = "1.0" futures = "0.3" lazy_static = "1.4" pin-utils = "0.1.0" diff --git a/proxmox-async/debian/changelog b/proxmox-async/debian/changelog index 74dcddb8..ee6a9686 100644 --- a/proxmox-async/debian/changelog +++ b/proxmox-async/debian/changelog @@ -1,5 +1,7 @@ rust-proxmox-async (0.1.0) stable; urgency=medium + * imported pbs-tools/src/compression.rs + * imported pbs-tools/src/tokio/tokio_writer_adapter.rs * imported pbs-tools/src/stream.rs diff --git a/proxmox-async/src/compression.rs b/proxmox-async/src/compression.rs new file mode 100644 index 00000000..aa1b0b24 --- /dev/null +++ b/proxmox-async/src/compression.rs @@ -0,0 +1,194 @@ +use std::io; +use std::pin::Pin; +use std::task::{Context, Poll}; + +use anyhow::Error; +use bytes::Bytes; +use flate2::{Compress, Compression, FlushCompress}; +use futures::ready; +use futures::stream::Stream; +use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; + +use proxmox::io_format_err; +use proxmox_io::ByteBuffer; + +const BUFFER_SIZE: usize = 8192; + +pub enum Level { + Fastest, + Best, + Default, + Precise(u32), +} + +#[derive(Eq, PartialEq)] +enum EncoderState { + Reading, + Writing, + Flushing, + Finished, +} + +pub struct DeflateEncoder { + inner: T, + compressor: Compress, + buffer: ByteBuffer, + input_buffer: Bytes, + state: EncoderState, +} + +impl DeflateEncoder { + pub fn new(inner: T) -> Self { + Self::with_quality(inner, Level::Default) + } + + pub fn with_quality(inner: T, level: Level) -> Self { + let level = match level { + Level::Fastest => Compression::fast(), + Level::Best => Compression::best(), + Level::Default => Compression::new(3), + Level::Precise(val) => Compression::new(val), + }; + + Self { + inner, + compressor: Compress::new(level, false), + buffer: ByteBuffer::with_capacity(BUFFER_SIZE), + input_buffer: Bytes::new(), + state: EncoderState::Reading, + } + } + + pub fn total_in(&self) -> u64 { + self.compressor.total_in() + } + + pub fn total_out(&self) -> u64 { + self.compressor.total_out() + } + + pub fn into_inner(self) -> T { + self.inner + } + + fn encode( + &mut self, + inbuf: &[u8], + flush: FlushCompress, + ) -> Result<(usize, flate2::Status), io::Error> { + let old_in = self.compressor.total_in(); + let old_out = self.compressor.total_out(); + let res = self + .compressor + .compress(&inbuf[..], self.buffer.get_free_mut_slice(), flush)?; + let new_in = (self.compressor.total_in() - old_in) as usize; + let new_out = (self.compressor.total_out() - old_out) as usize; + self.buffer.add_size(new_out); + + Ok((new_in, res)) + } +} + +impl DeflateEncoder> { + // assume small files + pub async fn compress_vec(&mut self, reader: &mut R, size_hint: usize) -> Result<(), Error> + where + R: AsyncRead + Unpin, + { + let mut buffer = Vec::with_capacity(size_hint); + reader.read_to_end(&mut buffer).await?; + self.inner.reserve(size_hint); // should be enough since we want smalller files + self.compressor.compress_vec(&buffer[..], &mut self.inner, FlushCompress::Finish)?; + Ok(()) + } +} + +impl DeflateEncoder { + pub async fn compress(&mut self, reader: &mut R) -> Result<(), Error> + where + R: AsyncRead + Unpin, + { + let mut buffer = ByteBuffer::with_capacity(BUFFER_SIZE); + let mut eof = false; + loop { + if !eof && !buffer.is_full() { + let read = buffer.read_from_async(reader).await?; + if read == 0 { + eof = true; + } + } + let (read, _res) = self.encode(&buffer[..], FlushCompress::None)?; + buffer.consume(read); + + self.inner.write_all(&self.buffer[..]).await?; + self.buffer.clear(); + + if buffer.is_empty() && eof { + break; + } + } + + loop { + let (_read, res) = self.encode(&[][..], FlushCompress::Finish)?; + self.inner.write_all(&self.buffer[..]).await?; + self.buffer.clear(); + if res == flate2::Status::StreamEnd { + break; + } + } + + Ok(()) + } +} + +impl Stream for DeflateEncoder +where + T: Stream> + Unpin, + O: Into +{ + type Item = Result; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.get_mut(); + + loop { + match this.state { + EncoderState::Reading => { + if let Some(res) = ready!(Pin::new(&mut this.inner).poll_next(cx)) { + let buf = res?; + this.input_buffer = buf.into(); + this.state = EncoderState::Writing; + } else { + this.state = EncoderState::Flushing; + } + } + EncoderState::Writing => { + if this.input_buffer.is_empty() { + return Poll::Ready(Some(Err(io_format_err!("empty input during write")))); + } + let mut buf = this.input_buffer.split_off(0); + let (read, res) = this.encode(&buf[..], FlushCompress::None)?; + this.input_buffer = buf.split_off(read); + if this.input_buffer.is_empty() { + this.state = EncoderState::Reading; + } + if this.buffer.is_full() || res == flate2::Status::BufError { + let bytes = this.buffer.remove_data(this.buffer.len()).to_vec(); + return Poll::Ready(Some(Ok(bytes.into()))); + } + } + EncoderState::Flushing => { + let (_read, res) = this.encode(&[][..], FlushCompress::Finish)?; + if !this.buffer.is_empty() { + let bytes = this.buffer.remove_data(this.buffer.len()).to_vec(); + return Poll::Ready(Some(Ok(bytes.into()))); + } + if res == flate2::Status::StreamEnd { + this.state = EncoderState::Finished; + } + } + EncoderState::Finished => return Poll::Ready(None), + } + } + } +} diff --git a/proxmox-async/src/lib.rs b/proxmox-async/src/lib.rs index 7eeffa60..6097cfaf 100644 --- a/proxmox-async/src/lib.rs +++ b/proxmox-async/src/lib.rs @@ -1,5 +1,6 @@ pub mod blocking; pub mod broadcast_future; +pub mod compression; pub mod runtime; pub mod stream; pub mod tokio_writer_adapter;