From 4a3adc3de80407491c523a5dd9cf76e1ef3fcdef Mon Sep 17 00:00:00 2001 From: Dominik Csapak Date: Thu, 18 Jun 2020 13:55:27 +0200 Subject: [PATCH] add AsyncIndexReader implements AsyncRead as well as Stream for an IndexFile and a store that implements AsyncReadChunk we can use this to asyncread or stream the content of a FixedIndex or DynamicIndex Signed-off-by: Dominik Csapak --- src/backup.rs | 3 + src/backup/async_index_reader.rs | 186 +++++++++++++++++++++++++++++++ 2 files changed, 189 insertions(+) create mode 100644 src/backup/async_index_reader.rs diff --git a/src/backup.rs b/src/backup.rs index 7a2bf1ca..3a89bcb2 100644 --- a/src/backup.rs +++ b/src/backup.rs @@ -200,3 +200,6 @@ pub use datastore::*; mod catalog_shell; pub use catalog_shell::*; + +mod async_index_reader; +pub use async_index_reader::*; diff --git a/src/backup/async_index_reader.rs b/src/backup/async_index_reader.rs new file mode 100644 index 00000000..43a4686e --- /dev/null +++ b/src/backup/async_index_reader.rs @@ -0,0 +1,186 @@ +use std::future::Future; +use std::task::{Poll, Context}; +use std::pin::Pin; + +use anyhow::Error; +use futures::future::FutureExt; +use futures::ready; +use tokio::{io::AsyncRead, stream::Stream}; + +use proxmox::sys::error::io_err_other; +use proxmox::io_format_err; + +use super::IndexFile; +use super::read_chunk::AsyncReadChunk; + +enum AsyncIndexReaderState { + NoData, + WaitForData(Pin), Error>> + Send + 'static>>), + HaveData(usize), +} + +pub struct AsyncIndexReader { + store: Option, + index: I, + read_buffer: Vec, + current_chunk_idx: usize, + current_chunk_digest: [u8; 32], + state: AsyncIndexReaderState, +} + +// ok because the only public interfaces operates on &mut Self +unsafe impl Sync for AsyncIndexReader {} + +impl AsyncIndexReader { + pub fn new(index: I, store: S) -> Self { + Self { + store: Some(store), + index, + read_buffer: Vec::with_capacity(1024*1024), + current_chunk_idx: 0, + current_chunk_digest: [0u8; 32], + state: AsyncIndexReaderState::NoData, + } + } +} + +impl AsyncRead for AsyncIndexReader where +S: AsyncReadChunk + Unpin + 'static, +I: IndexFile + Unpin +{ + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context, + buf: &mut [u8], + ) -> Poll> { + let this = Pin::get_mut(self); + loop { + match &mut this.state { + AsyncIndexReaderState::NoData => { + if this.current_chunk_idx >= this.index.index_count() { + return Poll::Ready(Ok(0)); + } + + let digest = this + .index + .index_digest(this.current_chunk_idx) + .ok_or(io_format_err!("could not get digest"))? + .clone(); + + if digest == this.current_chunk_digest { + this.state = AsyncIndexReaderState::HaveData(0); + continue; + } + + this.current_chunk_digest = digest; + + let mut store = match this.store.take() { + Some(store) => store, + None => { + return Poll::Ready(Err(io_format_err!("could not find store"))); + }, + }; + + let future = async move { + store.read_chunk(&digest) + .await + .map(move |x| (store, x)) + }; + + this.state = AsyncIndexReaderState::WaitForData(future.boxed()); + }, + AsyncIndexReaderState::WaitForData(ref mut future) => { + match ready!(future.as_mut().poll(cx)) { + Ok((store, mut chunk_data)) => { + this.read_buffer.clear(); + this.read_buffer.append(&mut chunk_data); + this.state = AsyncIndexReaderState::HaveData(0); + this.store = Some(store); + }, + Err(err) => { + return Poll::Ready(Err(io_err_other(err))); + }, + }; + }, + AsyncIndexReaderState::HaveData(offset) => { + let offset = *offset; + let len = this.read_buffer.len(); + let n = if len - offset < buf.len() { + len - offset + } else { + buf.len() + }; + + buf[0..n].copy_from_slice(&this.read_buffer[offset..offset+n]); + if offset + n == len { + this.state = AsyncIndexReaderState::NoData; + this.current_chunk_idx += 1; + } else { + this.state = AsyncIndexReaderState::HaveData(offset + n); + } + + return Poll::Ready(Ok(n)); + }, + } + } + } +} + +impl Stream for AsyncIndexReader +where + S: AsyncReadChunk + Unpin + 'static, + I: IndexFile + Unpin +{ + type Item = Result, std::io::Error>; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + let this = Pin::get_mut(self); + loop { + match &mut this.state { + AsyncIndexReaderState::NoData => { + if this.current_chunk_idx >= this.index.index_count() { + return Poll::Ready(None); + } + + let digest = this + .index + .index_digest(this.current_chunk_idx) + .ok_or(io_format_err!("could not get digest"))? + .clone(); + + let mut store = match this.store.take() { + Some(store) => store, + None => { + return Poll::Ready(Some(Err(io_format_err!("could not find store")))); + }, + }; + + let future = async move { + store.read_chunk(&digest) + .await + .map(move |x| (store, x)) + }; + + this.state = AsyncIndexReaderState::WaitForData(future.boxed()); + }, + AsyncIndexReaderState::WaitForData(ref mut future) => { + match ready!(future.as_mut().poll(cx)) { + Ok((store, chunk_data)) => { + this.state = AsyncIndexReaderState::NoData; + this.store = Some(store); + this.current_chunk_idx += 1; + return Poll::Ready(Some(Ok(chunk_data.clone()))); + }, + Err(err) => { + return Poll::Ready(Some(Err(io_err_other(err)))); + }, + } + }, + _ => { + return Poll::Ready(Some(Err(io_format_err!("invalid state in stream")))); + }, + } + } + } +} +