From fc5870be53bc41c406834d39dcea6d202e17c98e Mon Sep 17 00:00:00 2001 From: Wolfgang Bumiller Date: Tue, 20 Jul 2021 11:26:29 +0200 Subject: [PATCH] move channel/stream helpers to pbs-tools pbs_tools ::blocking: std/async wrapping with block_in_place ::stream: stream <-> AsyncRead/AsyncWrite wrapping Signed-off-by: Wolfgang Bumiller --- pbs-tools/Cargo.toml | 1 + .../src/blocking.rs | 46 +--- pbs-tools/src/lib.rs | 4 +- pbs-tools/src/stream.rs | 229 ++++++++++++++++++ src/api2/admin/datastore.rs | 3 +- src/bin/proxmox-backup-client.rs | 3 +- src/server/rest.rs | 2 +- src/tools/async_channel_writer.rs | 106 -------- src/tools/mod.rs | 6 - 9 files changed, 239 insertions(+), 161 deletions(-) rename src/tools/wrapped_reader_stream.rs => pbs-tools/src/blocking.rs (64%) create mode 100644 pbs-tools/src/stream.rs delete mode 100644 src/tools/async_channel_writer.rs diff --git a/pbs-tools/Cargo.toml b/pbs-tools/Cargo.toml index 0492338d..ef20a779 100644 --- a/pbs-tools/Cargo.toml +++ b/pbs-tools/Cargo.toml @@ -32,6 +32,7 @@ walkdir = "2" proxmox = { version = "0.11.5", default-features = false, features = [ "tokio" ] } pbs-buildcfg = { path = "../pbs-buildcfg" } +pbs-runtime = { path = "../pbs-runtime" } [dev-dependencies] tokio = { version = "1.6", features = [ "macros" ] } diff --git a/src/tools/wrapped_reader_stream.rs b/pbs-tools/src/blocking.rs similarity index 64% rename from src/tools/wrapped_reader_stream.rs rename to pbs-tools/src/blocking.rs index 6217545f..f5828dfb 100644 --- a/src/tools/wrapped_reader_stream.rs +++ b/pbs-tools/src/blocking.rs @@ -1,10 +1,10 @@ +//! Async wrappers for blocking I/O (adding `block_in_place` around channels/readers) + use std::io::{self, Read}; use std::pin::Pin; use std::task::{Context, Poll}; use std::sync::mpsc::Receiver; -use tokio::io::{AsyncRead, ReadBuf}; -use futures::ready; use futures::stream::Stream; use pbs_runtime::block_in_place; @@ -43,48 +43,6 @@ impl Stream for WrappedReaderStream { } } -/// Wrapper struct to convert an AsyncReader into a Stream -pub struct AsyncReaderStream { - reader: R, - buffer: Vec, -} - -impl AsyncReaderStream { - - pub fn new(reader: R) -> Self { - let mut buffer = Vec::with_capacity(64*1024); - unsafe { buffer.set_len(buffer.capacity()); } - Self { reader, buffer } - } - - pub fn with_buffer_size(reader: R, buffer_size: usize) -> Self { - let mut buffer = Vec::with_capacity(buffer_size); - unsafe { buffer.set_len(buffer.capacity()); } - Self { reader, buffer } - } -} - -impl Stream for AsyncReaderStream { - type Item = Result, io::Error>; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - let this = self.get_mut(); - let mut read_buf = ReadBuf::new(&mut this.buffer); - match ready!(Pin::new(&mut this.reader).poll_read(cx, &mut read_buf)) { - Ok(()) => { - let n = read_buf.filled().len(); - if n == 0 { - // EOF - Poll::Ready(None) - } else { - Poll::Ready(Some(Ok(this.buffer[..n].to_vec()))) - } - } - Err(err) => Poll::Ready(Some(Err(err))), - } - } -} - /// Wrapper struct to convert a channel Receiver into a Stream pub struct StdChannelStream(pub Receiver); diff --git a/pbs-tools/src/lib.rs b/pbs-tools/src/lib.rs index 28950787..c64615fd 100644 --- a/pbs-tools/src/lib.rs +++ b/pbs-tools/src/lib.rs @@ -1,5 +1,6 @@ pub mod acl; pub mod auth; +pub mod blocking; pub mod borrow; pub mod broadcast_future; pub mod cert; @@ -7,12 +8,14 @@ pub mod compression; pub mod format; pub mod fs; pub mod json; +pub mod lru_cache; pub mod nom; pub mod ops; pub mod percent_encoding; pub mod process_locker; pub mod sha; pub mod str; +pub mod stream; pub mod sync; pub mod ticket; pub mod tokio; @@ -20,7 +23,6 @@ pub mod xattr; pub mod zip; pub mod async_lru_cache; -pub mod lru_cache; mod command; pub use command::{command_output, command_output_as_string, run_command}; diff --git a/pbs-tools/src/stream.rs b/pbs-tools/src/stream.rs new file mode 100644 index 00000000..c00c1354 --- /dev/null +++ b/pbs-tools/src/stream.rs @@ -0,0 +1,229 @@ +//! Wrappers between async readers and streams. + +use std::io::{self, Read}; +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; + +use anyhow::{Error, Result}; +use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; +use tokio::sync::mpsc::Sender; +use futures::ready; +use futures::future::FutureExt; +use futures::stream::Stream; + +use proxmox::io_format_err; +use proxmox::tools::byte_buffer::ByteBuffer; +use proxmox::sys::error::io_err_other; + +use pbs_runtime::block_in_place; + +/// Wrapper struct to convert a Reader into a Stream +pub struct WrappedReaderStream { + reader: R, + buffer: Vec, +} + +impl WrappedReaderStream { + + pub fn new(reader: R) -> Self { + let mut buffer = Vec::with_capacity(64*1024); + unsafe { buffer.set_len(buffer.capacity()); } + Self { reader, buffer } + } +} + +impl Stream for WrappedReaderStream { + type Item = Result, io::Error>; + + fn poll_next(self: Pin<&mut Self>, _cx: &mut Context) -> Poll> { + let this = self.get_mut(); + match block_in_place(|| this.reader.read(&mut this.buffer)) { + Ok(n) => { + if n == 0 { + // EOF + Poll::Ready(None) + } else { + Poll::Ready(Some(Ok(this.buffer[..n].to_vec()))) + } + } + Err(err) => Poll::Ready(Some(Err(err))), + } + } +} + +/// Wrapper struct to convert an AsyncReader into a Stream +pub struct AsyncReaderStream { + reader: R, + buffer: Vec, +} + +impl AsyncReaderStream { + + pub fn new(reader: R) -> Self { + let mut buffer = Vec::with_capacity(64*1024); + unsafe { buffer.set_len(buffer.capacity()); } + Self { reader, buffer } + } + + pub fn with_buffer_size(reader: R, buffer_size: usize) -> Self { + let mut buffer = Vec::with_capacity(buffer_size); + unsafe { buffer.set_len(buffer.capacity()); } + Self { reader, buffer } + } +} + +impl Stream for AsyncReaderStream { + type Item = Result, io::Error>; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + let this = self.get_mut(); + let mut read_buf = ReadBuf::new(&mut this.buffer); + match ready!(Pin::new(&mut this.reader).poll_read(cx, &mut read_buf)) { + Ok(()) => { + let n = read_buf.filled().len(); + if n == 0 { + // EOF + Poll::Ready(None) + } else { + Poll::Ready(Some(Ok(this.buffer[..n].to_vec()))) + } + } + Err(err) => Poll::Ready(Some(Err(err))), + } + } +} + +#[cfg(test)] +mod test { + use std::io; + + use anyhow::Error; + use futures::stream::TryStreamExt; + + #[test] + fn test_wrapped_stream_reader() -> Result<(), Error> { + pbs_runtime::main(async { + run_wrapped_stream_reader_test().await + }) + } + + struct DummyReader(usize); + + impl io::Read for DummyReader { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + self.0 += 1; + + if self.0 >= 10 { + return Ok(0); + } + + unsafe { + std::ptr::write_bytes(buf.as_mut_ptr(), 0, buf.len()); + } + + Ok(buf.len()) + } + } + + async fn run_wrapped_stream_reader_test() -> Result<(), Error> { + let mut reader = super::WrappedReaderStream::new(DummyReader(0)); + while let Some(_data) = reader.try_next().await? { + // just waiting + } + Ok(()) + } +} + +/// Wrapper around tokio::sync::mpsc::Sender, which implements Write +pub struct AsyncChannelWriter { + sender: Option, Error>>>, + buf: ByteBuffer, + state: WriterState, +} + +type SendResult = io::Result>>>; + +enum WriterState { + Ready, + Sending(Pin + Send + 'static>>), +} + +impl AsyncChannelWriter { + pub fn new(sender: Sender, Error>>, buf_size: usize) -> Self { + Self { + sender: Some(sender), + buf: ByteBuffer::with_capacity(buf_size), + state: WriterState::Ready, + } + } + + fn poll_write_impl( + &mut self, + cx: &mut Context, + buf: &[u8], + flush: bool, + ) -> Poll> { + loop { + match &mut self.state { + WriterState::Ready => { + if flush { + if self.buf.is_empty() { + return Poll::Ready(Ok(0)); + } + } else { + let free_size = self.buf.free_size(); + if free_size > buf.len() || self.buf.is_empty() { + let count = free_size.min(buf.len()); + self.buf.get_free_mut_slice()[..count].copy_from_slice(&buf[..count]); + self.buf.add_size(count); + return Poll::Ready(Ok(count)); + } + } + + let sender = match self.sender.take() { + Some(sender) => sender, + None => return Poll::Ready(Err(io_err_other("no sender"))), + }; + + let data = self.buf.remove_data(self.buf.len()).to_vec(); + let future = async move { + sender + .send(Ok(data)) + .await + .map(move |_| sender) + .map_err(|err| io_format_err!("could not send: {}", err)) + }; + + self.state = WriterState::Sending(future.boxed()); + } + WriterState::Sending(ref mut future) => match ready!(future.as_mut().poll(cx)) { + Ok(sender) => { + self.sender = Some(sender); + self.state = WriterState::Ready; + } + Err(err) => return Poll::Ready(Err(err)), + }, + } + } + } +} + +impl AsyncWrite for AsyncChannelWriter { + fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll> { + let this = self.get_mut(); + this.poll_write_impl(cx, buf, false) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + let this = self.get_mut(); + match ready!(this.poll_write_impl(cx, &[], true)) { + Ok(_) => Poll::Ready(Ok(())), + Err(err) => Poll::Ready(Err(err)), + } + } + + fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + self.poll_flush(cx) + } +} diff --git a/src/api2/admin/datastore.rs b/src/api2/admin/datastore.rs index d1778fd9..f3c52413 100644 --- a/src/api2/admin/datastore.rs +++ b/src/api2/admin/datastore.rs @@ -27,6 +27,8 @@ use pxar::accessor::aio::Accessor; use pxar::EntryKind; use pbs_client::pxar::create_zip; +use pbs_tools::blocking::WrappedReaderStream; +use pbs_tools::stream::{AsyncReaderStream, AsyncChannelWriter}; use pbs_tools::json::{required_integer_param, required_string_param}; use crate::api2::types::*; @@ -37,7 +39,6 @@ use crate::config::datastore; use crate::config::cached_user_info::CachedUserInfo; use crate::server::{jobstate::Job, WorkerTask}; -use crate::tools::{AsyncChannelWriter, AsyncReaderStream, WrappedReaderStream}; use crate::config::acl::{ PRIV_DATASTORE_AUDIT, diff --git a/src/bin/proxmox-backup-client.rs b/src/bin/proxmox-backup-client.rs index ce9aee7b..6f898e55 100644 --- a/src/bin/proxmox-backup-client.rs +++ b/src/bin/proxmox-backup-client.rs @@ -79,7 +79,6 @@ use pbs_tools::json; use proxmox_backup::backup::{ BufferedDynamicReader, }; -use proxmox_backup::tools; mod proxmox_backup_client; use proxmox_backup_client::*; @@ -487,7 +486,7 @@ fn spawn_catalog_upload( encrypt: bool, ) -> Result { let (catalog_tx, catalog_rx) = std::sync::mpsc::sync_channel(10); // allow to buffer 10 writes - let catalog_stream = tools::StdChannelStream(catalog_rx); + let catalog_stream = pbs_tools::blocking::StdChannelStream(catalog_rx); let catalog_chunk_size = 512*1024; let catalog_chunk_stream = ChunkStream::new(catalog_stream, Some(catalog_chunk_size)); diff --git a/src/server/rest.rs b/src/server/rest.rs index 16680484..62b63a5d 100644 --- a/src/server/rest.rs +++ b/src/server/rest.rs @@ -31,6 +31,7 @@ use proxmox::api::{ use proxmox::http_err; use pbs_tools::compression::{DeflateEncoder, Level}; +use pbs_tools::stream::AsyncReaderStream; use super::auth::AuthError; use super::environment::RestEnvironment; @@ -42,7 +43,6 @@ use crate::auth_helpers::*; use crate::config::cached_user_info::CachedUserInfo; use crate::tools; use crate::tools::compression::CompressionMethod; -use crate::tools::AsyncReaderStream; use crate::tools::FileLogger; extern "C" { diff --git a/src/tools/async_channel_writer.rs b/src/tools/async_channel_writer.rs deleted file mode 100644 index f48bd555..00000000 --- a/src/tools/async_channel_writer.rs +++ /dev/null @@ -1,106 +0,0 @@ -use std::future::Future; -use std::io; -use std::pin::Pin; -use std::task::{Context, Poll}; - -use anyhow::{Error, Result}; -use futures::{future::FutureExt, ready}; -use tokio::io::AsyncWrite; -use tokio::sync::mpsc::Sender; - -use proxmox::io_format_err; -use proxmox::tools::byte_buffer::ByteBuffer; -use proxmox::sys::error::io_err_other; - -/// Wrapper around tokio::sync::mpsc::Sender, which implements Write -pub struct AsyncChannelWriter { - sender: Option, Error>>>, - buf: ByteBuffer, - state: WriterState, -} - -type SendResult = io::Result>>>; - -enum WriterState { - Ready, - Sending(Pin + Send + 'static>>), -} - -impl AsyncChannelWriter { - pub fn new(sender: Sender, Error>>, buf_size: usize) -> Self { - Self { - sender: Some(sender), - buf: ByteBuffer::with_capacity(buf_size), - state: WriterState::Ready, - } - } - - fn poll_write_impl( - &mut self, - cx: &mut Context, - buf: &[u8], - flush: bool, - ) -> Poll> { - loop { - match &mut self.state { - WriterState::Ready => { - if flush { - if self.buf.is_empty() { - return Poll::Ready(Ok(0)); - } - } else { - let free_size = self.buf.free_size(); - if free_size > buf.len() || self.buf.is_empty() { - let count = free_size.min(buf.len()); - self.buf.get_free_mut_slice()[..count].copy_from_slice(&buf[..count]); - self.buf.add_size(count); - return Poll::Ready(Ok(count)); - } - } - - let sender = match self.sender.take() { - Some(sender) => sender, - None => return Poll::Ready(Err(io_err_other("no sender"))), - }; - - let data = self.buf.remove_data(self.buf.len()).to_vec(); - let future = async move { - sender - .send(Ok(data)) - .await - .map(move |_| sender) - .map_err(|err| io_format_err!("could not send: {}", err)) - }; - - self.state = WriterState::Sending(future.boxed()); - } - WriterState::Sending(ref mut future) => match ready!(future.as_mut().poll(cx)) { - Ok(sender) => { - self.sender = Some(sender); - self.state = WriterState::Ready; - } - Err(err) => return Poll::Ready(Err(err)), - }, - } - } - } -} - -impl AsyncWrite for AsyncChannelWriter { - fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll> { - let this = self.get_mut(); - this.poll_write_impl(cx, buf, false) - } - - fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - let this = self.get_mut(); - match ready!(this.poll_write_impl(cx, &[], true)) { - Ok(_) => Poll::Ready(Ok(())), - Err(err) => Poll::Ready(Err(err)), - } - } - - fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - self.poll_flush(cx) - } -} diff --git a/src/tools/mod.rs b/src/tools/mod.rs index eba9a70c..2d2d923a 100644 --- a/src/tools/mod.rs +++ b/src/tools/mod.rs @@ -48,12 +48,6 @@ pub mod paperkey; pub mod parallel_handler; pub use parallel_handler::ParallelHandler; -mod wrapped_reader_stream; -pub use wrapped_reader_stream::{AsyncReaderStream, StdChannelStream, WrappedReaderStream}; - -mod async_channel_writer; -pub use async_channel_writer::AsyncChannelWriter; - mod file_logger; pub use file_logger::{FileLogger, FileLogOptions};