diff --git a/proxmox-async/src/blocking/mod.rs b/proxmox-async/src/blocking/mod.rs new file mode 100644 index 00000000..f6fd7d8d --- /dev/null +++ b/proxmox-async/src/blocking/mod.rs @@ -0,0 +1,8 @@ +//! Async wrappers for blocking I/O (adding `block_in_place` around +//! channels/readers) + +mod std_channel_stream; +pub use std_channel_stream::StdChannelStream; + +mod wrapped_reader_stream; +pub use wrapped_reader_stream::WrappedReaderStream; diff --git a/proxmox-async/src/blocking/std_channel_stream.rs b/proxmox-async/src/blocking/std_channel_stream.rs new file mode 100644 index 00000000..48785dbf --- /dev/null +++ b/proxmox-async/src/blocking/std_channel_stream.rs @@ -0,0 +1,21 @@ +use std::pin::Pin; +use std::task::{Context, Poll}; +use std::sync::mpsc::Receiver; + +use futures::stream::Stream; + +use crate::runtime::block_in_place; + +/// Wrapper struct to convert a channel Receiver into a Stream +pub struct StdChannelStream(pub Receiver); + +impl Stream for StdChannelStream { + type Item = T; + + fn poll_next(self: Pin<&mut Self>, _cx: &mut Context) -> Poll> { + match block_in_place(|| self.0.recv()) { + Ok(data) => Poll::Ready(Some(data)), + Err(_) => Poll::Ready(None),// channel closed + } + } +} diff --git a/proxmox-async/src/blocking.rs b/proxmox-async/src/blocking/wrapped_reader_stream.rs similarity index 78% rename from proxmox-async/src/blocking.rs rename to proxmox-async/src/blocking/wrapped_reader_stream.rs index 36d3f1e0..d9603a23 100644 --- a/proxmox-async/src/blocking.rs +++ b/proxmox-async/src/blocking/wrapped_reader_stream.rs @@ -1,9 +1,6 @@ -//! Async wrappers for blocking I/O (adding `block_in_place` around channels/readers) - use std::io::{self, Read}; use std::pin::Pin; use std::task::{Context, Poll}; -use std::sync::mpsc::Receiver; use futures::stream::Stream; @@ -43,20 +40,6 @@ impl Stream for WrappedReaderStream { } } -/// Wrapper struct to convert a channel Receiver into a Stream -pub struct StdChannelStream(pub Receiver); - -impl Stream for StdChannelStream { - type Item = T; - - fn poll_next(self: Pin<&mut Self>, _cx: &mut Context) -> Poll> { - match block_in_place(|| self.0.recv()) { - Ok(data) => Poll::Ready(Some(data)), - Err(_) => Poll::Ready(None),// channel closed - } - } -} - #[cfg(test)] mod test { use std::io;