From 743f7df2a5e41cab8f404b655d7049bb225349e1 Mon Sep 17 00:00:00 2001 From: Dietmar Maurer Date: Fri, 19 Nov 2021 16:12:31 +0100 Subject: [PATCH] proxmox-async: imported pbs-tools/src/stream.rs Signed-off-by: Dietmar Maurer --- proxmox-async/Cargo.toml | 3 + proxmox-async/debian/changelog | 2 + proxmox-async/src/lib.rs | 1 + proxmox-async/src/stream.rs | 229 +++++++++++++++++++++++++++++++++ 4 files changed, 235 insertions(+) create mode 100644 proxmox-async/src/stream.rs diff --git a/proxmox-async/Cargo.toml b/proxmox-async/Cargo.toml index ba5a2fff..ae9301d3 100644 --- a/proxmox-async/Cargo.toml +++ b/proxmox-async/Cargo.toml @@ -14,3 +14,6 @@ futures = "0.3" lazy_static = "1.4" pin-utils = "0.1.0" tokio = { version = "1.0", features = ["rt", "rt-multi-thread", "sync"] } + +proxmox = { version = "0.15.3", default-features = false, features = [ "tokio" ] } +proxmox-io = { version = "1", features = [ "tokio" ] } diff --git a/proxmox-async/debian/changelog b/proxmox-async/debian/changelog index 16635a40..d3560e3d 100644 --- a/proxmox-async/debian/changelog +++ b/proxmox-async/debian/changelog @@ -1,5 +1,7 @@ rust-proxmox-async (0.1.0) stable; urgency=medium + * imported pbs-tools/src/stream.rs + * imported pbs-tools/src/broadcast_future.rs * imported pbs-tools/src/blocking.rs diff --git a/proxmox-async/src/lib.rs b/proxmox-async/src/lib.rs index 7f7dd83a..2e95bcca 100644 --- a/proxmox-async/src/lib.rs +++ b/proxmox-async/src/lib.rs @@ -1,3 +1,4 @@ pub mod blocking; pub mod broadcast_future; pub mod runtime; +pub mod stream; diff --git a/proxmox-async/src/stream.rs b/proxmox-async/src/stream.rs new file mode 100644 index 00000000..88d42c7f --- /dev/null +++ b/proxmox-async/src/stream.rs @@ -0,0 +1,229 @@ +//! Wrappers between async readers and streams. + +use std::io::{self, Read}; +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; + +use anyhow::{Error, Result}; +use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; +use tokio::sync::mpsc::Sender; +use futures::ready; +use futures::future::FutureExt; +use futures::stream::Stream; + +use proxmox::io_format_err; +use proxmox::sys::error::io_err_other; +use proxmox_io::ByteBuffer; + +use crate::runtime::block_in_place; + +/// Wrapper struct to convert a Reader into a Stream +pub struct WrappedReaderStream { + reader: R, + buffer: Vec, +} + +impl WrappedReaderStream { + + pub fn new(reader: R) -> Self { + let mut buffer = Vec::with_capacity(64*1024); + unsafe { buffer.set_len(buffer.capacity()); } + Self { reader, buffer } + } +} + +impl Stream for WrappedReaderStream { + type Item = Result, io::Error>; + + fn poll_next(self: Pin<&mut Self>, _cx: &mut Context) -> Poll> { + let this = self.get_mut(); + match block_in_place(|| this.reader.read(&mut this.buffer)) { + Ok(n) => { + if n == 0 { + // EOF + Poll::Ready(None) + } else { + Poll::Ready(Some(Ok(this.buffer[..n].to_vec()))) + } + } + Err(err) => Poll::Ready(Some(Err(err))), + } + } +} + +/// Wrapper struct to convert an AsyncReader into a Stream +pub struct AsyncReaderStream { + reader: R, + buffer: Vec, +} + +impl AsyncReaderStream { + + pub fn new(reader: R) -> Self { + let mut buffer = Vec::with_capacity(64*1024); + unsafe { buffer.set_len(buffer.capacity()); } + Self { reader, buffer } + } + + pub fn with_buffer_size(reader: R, buffer_size: usize) -> Self { + let mut buffer = Vec::with_capacity(buffer_size); + unsafe { buffer.set_len(buffer.capacity()); } + Self { reader, buffer } + } +} + +impl Stream for AsyncReaderStream { + type Item = Result, io::Error>; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + let this = self.get_mut(); + let mut read_buf = ReadBuf::new(&mut this.buffer); + match ready!(Pin::new(&mut this.reader).poll_read(cx, &mut read_buf)) { + Ok(()) => { + let n = read_buf.filled().len(); + if n == 0 { + // EOF + Poll::Ready(None) + } else { + Poll::Ready(Some(Ok(this.buffer[..n].to_vec()))) + } + } + Err(err) => Poll::Ready(Some(Err(err))), + } + } +} + +#[cfg(test)] +mod test { + use std::io; + + use anyhow::Error; + use futures::stream::TryStreamExt; + + #[test] + fn test_wrapped_stream_reader() -> Result<(), Error> { + crate::runtime::main(async { + run_wrapped_stream_reader_test().await + }) + } + + struct DummyReader(usize); + + impl io::Read for DummyReader { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + self.0 += 1; + + if self.0 >= 10 { + return Ok(0); + } + + unsafe { + std::ptr::write_bytes(buf.as_mut_ptr(), 0, buf.len()); + } + + Ok(buf.len()) + } + } + + async fn run_wrapped_stream_reader_test() -> Result<(), Error> { + let mut reader = super::WrappedReaderStream::new(DummyReader(0)); + while let Some(_data) = reader.try_next().await? { + // just waiting + } + Ok(()) + } +} + +/// Wrapper around tokio::sync::mpsc::Sender, which implements Write +pub struct AsyncChannelWriter { + sender: Option, Error>>>, + buf: ByteBuffer, + state: WriterState, +} + +type SendResult = io::Result>>>; + +enum WriterState { + Ready, + Sending(Pin + Send + 'static>>), +} + +impl AsyncChannelWriter { + pub fn new(sender: Sender, Error>>, buf_size: usize) -> Self { + Self { + sender: Some(sender), + buf: ByteBuffer::with_capacity(buf_size), + state: WriterState::Ready, + } + } + + fn poll_write_impl( + &mut self, + cx: &mut Context, + buf: &[u8], + flush: bool, + ) -> Poll> { + loop { + match &mut self.state { + WriterState::Ready => { + if flush { + if self.buf.is_empty() { + return Poll::Ready(Ok(0)); + } + } else { + let free_size = self.buf.free_size(); + if free_size > buf.len() || self.buf.is_empty() { + let count = free_size.min(buf.len()); + self.buf.get_free_mut_slice()[..count].copy_from_slice(&buf[..count]); + self.buf.add_size(count); + return Poll::Ready(Ok(count)); + } + } + + let sender = match self.sender.take() { + Some(sender) => sender, + None => return Poll::Ready(Err(io_err_other("no sender"))), + }; + + let data = self.buf.remove_data(self.buf.len()).to_vec(); + let future = async move { + sender + .send(Ok(data)) + .await + .map(move |_| sender) + .map_err(|err| io_format_err!("could not send: {}", err)) + }; + + self.state = WriterState::Sending(future.boxed()); + } + WriterState::Sending(ref mut future) => match ready!(future.as_mut().poll(cx)) { + Ok(sender) => { + self.sender = Some(sender); + self.state = WriterState::Ready; + } + Err(err) => return Poll::Ready(Err(err)), + }, + } + } + } +} + +impl AsyncWrite for AsyncChannelWriter { + fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll> { + let this = self.get_mut(); + this.poll_write_impl(cx, buf, false) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + let this = self.get_mut(); + match ready!(this.poll_write_impl(cx, &[], true)) { + Ok(_) => Poll::Ready(Ok(())), + Err(err) => Poll::Ready(Err(err)), + } + } + + fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + self.poll_flush(cx) + } +}