From f386f512d0c6a6eb91322e2d4c19c83b55c43598 Mon Sep 17 00:00:00 2001 From: Dominik Csapak Date: Mon, 22 Jun 2020 16:44:13 +0200 Subject: [PATCH] add AsyncReaderStream and replace AsyncIndexReader's stream implementation with that Signed-off-by: Dominik Csapak --- src/api2/admin/datastore.rs | 6 +-- src/backup/async_index_reader.rs | 61 +----------------------------- src/tools/wrapped_reader_stream.rs | 42 +++++++++++++++++++- 3 files changed, 45 insertions(+), 64 deletions(-) diff --git a/src/api2/admin/datastore.rs b/src/api2/admin/datastore.rs index 1e8e120c..0120d34f 100644 --- a/src/api2/admin/datastore.rs +++ b/src/api2/admin/datastore.rs @@ -23,7 +23,7 @@ use crate::config::datastore; use crate::config::cached_user_info::CachedUserInfo; use crate::server::WorkerTask; -use crate::tools::{self, WrappedReaderStream}; +use crate::tools::{self, AsyncReaderStream, WrappedReaderStream}; use crate::config::acl::{ PRIV_DATASTORE_AUDIT, PRIV_DATASTORE_MODIFY, @@ -842,7 +842,7 @@ fn download_file_decoded( let chunk_reader = LocalChunkReader::new(datastore, None); let reader = AsyncIndexReader::new(index, chunk_reader); - Body::wrap_stream(reader + Body::wrap_stream(AsyncReaderStream::new(reader) .map_err(move |err| { eprintln!("error during streaming of '{:?}' - {}", path, err); err @@ -854,7 +854,7 @@ fn download_file_decoded( let chunk_reader = LocalChunkReader::new(datastore, None); let reader = AsyncIndexReader::new(index, chunk_reader); - Body::wrap_stream(reader + Body::wrap_stream(AsyncReaderStream::with_buffer_size(reader, 4*1024*1024) .map_err(move |err| { eprintln!("error during streaming of '{:?}' - {}", path, err); err diff --git a/src/backup/async_index_reader.rs b/src/backup/async_index_reader.rs index 43a4686e..c954b1bf 100644 --- a/src/backup/async_index_reader.rs +++ b/src/backup/async_index_reader.rs @@ -5,7 +5,7 @@ use std::pin::Pin; use anyhow::Error; use futures::future::FutureExt; use futures::ready; -use tokio::{io::AsyncRead, stream::Stream}; +use tokio::io::AsyncRead; use proxmox::sys::error::io_err_other; use proxmox::io_format_err; @@ -125,62 +125,3 @@ I: IndexFile + Unpin } } } - -impl Stream for AsyncIndexReader -where - S: AsyncReadChunk + Unpin + 'static, - I: IndexFile + Unpin -{ - type Item = Result, std::io::Error>; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - let this = Pin::get_mut(self); - loop { - match &mut this.state { - AsyncIndexReaderState::NoData => { - if this.current_chunk_idx >= this.index.index_count() { - return Poll::Ready(None); - } - - let digest = this - .index - .index_digest(this.current_chunk_idx) - .ok_or(io_format_err!("could not get digest"))? - .clone(); - - let mut store = match this.store.take() { - Some(store) => store, - None => { - return Poll::Ready(Some(Err(io_format_err!("could not find store")))); - }, - }; - - let future = async move { - store.read_chunk(&digest) - .await - .map(move |x| (store, x)) - }; - - this.state = AsyncIndexReaderState::WaitForData(future.boxed()); - }, - AsyncIndexReaderState::WaitForData(ref mut future) => { - match ready!(future.as_mut().poll(cx)) { - Ok((store, chunk_data)) => { - this.state = AsyncIndexReaderState::NoData; - this.store = Some(store); - this.current_chunk_idx += 1; - return Poll::Ready(Some(Ok(chunk_data.clone()))); - }, - Err(err) => { - return Poll::Ready(Some(Err(io_err_other(err)))); - }, - } - }, - _ => { - return Poll::Ready(Some(Err(io_format_err!("invalid state in stream")))); - }, - } - } - } -} - diff --git a/src/tools/wrapped_reader_stream.rs b/src/tools/wrapped_reader_stream.rs index cb87e2b0..0294cc21 100644 --- a/src/tools/wrapped_reader_stream.rs +++ b/src/tools/wrapped_reader_stream.rs @@ -3,7 +3,8 @@ use std::pin::Pin; use std::task::{Context, Poll}; use std::sync::mpsc::Receiver; - +use tokio::io::AsyncRead; +use futures::ready; use futures::stream::Stream; use crate::tools::runtime::block_in_place; @@ -42,6 +43,45 @@ impl Stream for WrappedReaderStream { } } +/// Wrapper struct to convert an AsyncReader into a Stream +pub struct AsyncReaderStream { + reader: R, + buffer: Vec, +} + +impl AsyncReaderStream { + + pub fn new(reader: R) -> Self { + let mut buffer = Vec::with_capacity(64*1024); + unsafe { buffer.set_len(buffer.capacity()); } + Self { reader, buffer } + } + + pub fn with_buffer_size(reader: R, buffer_size: usize) -> Self { + let mut buffer = Vec::with_capacity(buffer_size); + unsafe { buffer.set_len(buffer.capacity()); } + Self { reader, buffer } + } +} + +impl Stream for AsyncReaderStream { + type Item = Result, io::Error>; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + let this = self.get_mut(); + match ready!(Pin::new(&mut this.reader).poll_read(cx, &mut this.buffer)) { + Ok(n) => { + if n == 0 { + // EOF + Poll::Ready(None) + } else { + Poll::Ready(Some(Ok(this.buffer[..n].to_vec()))) + } + } + Err(err) => Poll::Ready(Some(Err(err))), + } + } +} /// Wrapper struct to convert a channel Receiver into a Stream pub struct StdChannelStream(pub Receiver);