diff --git a/proxmox-async/src/stream.rs b/proxmox-async/src/stream/async_channel_writer.rs similarity index 51% rename from proxmox-async/src/stream.rs rename to proxmox-async/src/stream/async_channel_writer.rs index 88d42c7f..e8789672 100644 --- a/proxmox-async/src/stream.rs +++ b/proxmox-async/src/stream/async_channel_writer.rs @@ -1,140 +1,20 @@ //! Wrappers between async readers and streams. -use std::io::{self, Read}; +use std::io; use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; use anyhow::{Error, Result}; -use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; +use tokio::io::{AsyncWrite}; use tokio::sync::mpsc::Sender; use futures::ready; use futures::future::FutureExt; -use futures::stream::Stream; use proxmox::io_format_err; use proxmox::sys::error::io_err_other; use proxmox_io::ByteBuffer; -use crate::runtime::block_in_place; - -/// Wrapper struct to convert a Reader into a Stream -pub struct WrappedReaderStream { - reader: R, - buffer: Vec, -} - -impl WrappedReaderStream { - - pub fn new(reader: R) -> Self { - let mut buffer = Vec::with_capacity(64*1024); - unsafe { buffer.set_len(buffer.capacity()); } - Self { reader, buffer } - } -} - -impl Stream for WrappedReaderStream { - type Item = Result, io::Error>; - - fn poll_next(self: Pin<&mut Self>, _cx: &mut Context) -> Poll> { - let this = self.get_mut(); - match block_in_place(|| this.reader.read(&mut this.buffer)) { - Ok(n) => { - if n == 0 { - // EOF - Poll::Ready(None) - } else { - Poll::Ready(Some(Ok(this.buffer[..n].to_vec()))) - } - } - Err(err) => Poll::Ready(Some(Err(err))), - } - } -} - -/// Wrapper struct to convert an AsyncReader into a Stream -pub struct AsyncReaderStream { - reader: R, - buffer: Vec, -} - -impl AsyncReaderStream { - - pub fn new(reader: R) -> Self { - let mut buffer = Vec::with_capacity(64*1024); - unsafe { buffer.set_len(buffer.capacity()); } - Self { reader, buffer } - } - - pub fn with_buffer_size(reader: R, buffer_size: usize) -> Self { - let mut buffer = Vec::with_capacity(buffer_size); - unsafe { buffer.set_len(buffer.capacity()); } - Self { reader, buffer } - } -} - -impl Stream for AsyncReaderStream { - type Item = Result, io::Error>; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - let this = self.get_mut(); - let mut read_buf = ReadBuf::new(&mut this.buffer); - match ready!(Pin::new(&mut this.reader).poll_read(cx, &mut read_buf)) { - Ok(()) => { - let n = read_buf.filled().len(); - if n == 0 { - // EOF - Poll::Ready(None) - } else { - Poll::Ready(Some(Ok(this.buffer[..n].to_vec()))) - } - } - Err(err) => Poll::Ready(Some(Err(err))), - } - } -} - -#[cfg(test)] -mod test { - use std::io; - - use anyhow::Error; - use futures::stream::TryStreamExt; - - #[test] - fn test_wrapped_stream_reader() -> Result<(), Error> { - crate::runtime::main(async { - run_wrapped_stream_reader_test().await - }) - } - - struct DummyReader(usize); - - impl io::Read for DummyReader { - fn read(&mut self, buf: &mut [u8]) -> io::Result { - self.0 += 1; - - if self.0 >= 10 { - return Ok(0); - } - - unsafe { - std::ptr::write_bytes(buf.as_mut_ptr(), 0, buf.len()); - } - - Ok(buf.len()) - } - } - - async fn run_wrapped_stream_reader_test() -> Result<(), Error> { - let mut reader = super::WrappedReaderStream::new(DummyReader(0)); - while let Some(_data) = reader.try_next().await? { - // just waiting - } - Ok(()) - } -} - /// Wrapper around tokio::sync::mpsc::Sender, which implements Write pub struct AsyncChannelWriter { sender: Option, Error>>>, diff --git a/proxmox-async/src/stream/async_reader_stream.rs b/proxmox-async/src/stream/async_reader_stream.rs new file mode 100644 index 00000000..6f1b5a15 --- /dev/null +++ b/proxmox-async/src/stream/async_reader_stream.rs @@ -0,0 +1,51 @@ +//! Wrappers between async readers and streams. + +use std::io; +use std::pin::Pin; +use std::task::{Context, Poll}; + +use tokio::io::{AsyncRead, ReadBuf}; +use futures::ready; +use futures::stream::Stream; + +/// Wrapper struct to convert an AsyncReader into a Stream +pub struct AsyncReaderStream { + reader: R, + buffer: Vec, +} + +impl AsyncReaderStream { + + pub fn new(reader: R) -> Self { + let mut buffer = Vec::with_capacity(64*1024); + unsafe { buffer.set_len(buffer.capacity()); } + Self { reader, buffer } + } + + pub fn with_buffer_size(reader: R, buffer_size: usize) -> Self { + let mut buffer = Vec::with_capacity(buffer_size); + unsafe { buffer.set_len(buffer.capacity()); } + Self { reader, buffer } + } +} + +impl Stream for AsyncReaderStream { + type Item = Result, io::Error>; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + let this = self.get_mut(); + let mut read_buf = ReadBuf::new(&mut this.buffer); + match ready!(Pin::new(&mut this.reader).poll_read(cx, &mut read_buf)) { + Ok(()) => { + let n = read_buf.filled().len(); + if n == 0 { + // EOF + Poll::Ready(None) + } else { + Poll::Ready(Some(Ok(this.buffer[..n].to_vec()))) + } + } + Err(err) => Poll::Ready(Some(Err(err))), + } + } +} diff --git a/proxmox-async/src/stream/mod.rs b/proxmox-async/src/stream/mod.rs new file mode 100644 index 00000000..ad22e016 --- /dev/null +++ b/proxmox-async/src/stream/mod.rs @@ -0,0 +1,10 @@ +//! Wrappers between async readers and streams. + +mod async_channel_writer; +pub use async_channel_writer::AsyncChannelWriter; + +mod async_reader_stream; +pub use async_reader_stream::AsyncReaderStream; + +mod wrapped_reader_stream; +pub use wrapped_reader_stream::WrappedReaderStream; diff --git a/proxmox-async/src/stream/wrapped_reader_stream.rs b/proxmox-async/src/stream/wrapped_reader_stream.rs new file mode 100644 index 00000000..d9603a23 --- /dev/null +++ b/proxmox-async/src/stream/wrapped_reader_stream.rs @@ -0,0 +1,82 @@ +use std::io::{self, Read}; +use std::pin::Pin; +use std::task::{Context, Poll}; + +use futures::stream::Stream; + +use crate::runtime::block_in_place; + +/// Wrapper struct to convert a Reader into a Stream +pub struct WrappedReaderStream { + reader: R, + buffer: Vec, +} + +impl WrappedReaderStream { + + pub fn new(reader: R) -> Self { + let mut buffer = Vec::with_capacity(64*1024); + unsafe { buffer.set_len(buffer.capacity()); } + Self { reader, buffer } + } +} + +impl Stream for WrappedReaderStream { + type Item = Result, io::Error>; + + fn poll_next(self: Pin<&mut Self>, _cx: &mut Context) -> Poll> { + let this = self.get_mut(); + match block_in_place(|| this.reader.read(&mut this.buffer)) { + Ok(n) => { + if n == 0 { + // EOF + Poll::Ready(None) + } else { + Poll::Ready(Some(Ok(this.buffer[..n].to_vec()))) + } + } + Err(err) => Poll::Ready(Some(Err(err))), + } + } +} + +#[cfg(test)] +mod test { + use std::io; + + use anyhow::Error; + use futures::stream::TryStreamExt; + + #[test] + fn test_wrapped_stream_reader() -> Result<(), Error> { + crate::runtime::main(async { + run_wrapped_stream_reader_test().await + }) + } + + struct DummyReader(usize); + + impl io::Read for DummyReader { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + self.0 += 1; + + if self.0 >= 10 { + return Ok(0); + } + + unsafe { + std::ptr::write_bytes(buf.as_mut_ptr(), 0, buf.len()); + } + + Ok(buf.len()) + } + } + + async fn run_wrapped_stream_reader_test() -> Result<(), Error> { + let mut reader = super::WrappedReaderStream::new(DummyReader(0)); + while let Some(_data) = reader.try_next().await? { + // just waiting + } + Ok(()) + } +}