diff --git a/src/backup/async_index_reader.rs b/src/backup/async_index_reader.rs index 0911375e..98372aa1 100644 --- a/src/backup/async_index_reader.rs +++ b/src/backup/async_index_reader.rs @@ -1,30 +1,35 @@ use std::future::Future; use std::task::{Poll, Context}; use std::pin::Pin; +use std::io::SeekFrom; use anyhow::Error; use futures::future::FutureExt; use futures::ready; -use tokio::io::AsyncRead; +use tokio::io::{AsyncRead, AsyncSeek}; use proxmox::sys::error::io_err_other; use proxmox::io_format_err; use super::IndexFile; use super::read_chunk::AsyncReadChunk; +use super::index::ChunkReadInfo; enum AsyncIndexReaderState { NoData, WaitForData(Pin), Error>> + Send + 'static>>), - HaveData(usize), + HaveData, } pub struct AsyncIndexReader { store: Option, index: I, read_buffer: Vec, + current_chunk_offset: u64, current_chunk_idx: usize, - current_chunk_digest: [u8; 32], + current_chunk_info: Option, + position: u64, + seek_to_pos: i64, state: AsyncIndexReaderState, } @@ -37,8 +42,11 @@ impl AsyncIndexReader { store: Some(store), index, read_buffer: Vec::with_capacity(1024 * 1024), + current_chunk_offset: 0, current_chunk_idx: 0, - current_chunk_digest: [0u8; 32], + current_chunk_info: None, + position: 0, + seek_to_pos: 0, state: AsyncIndexReaderState::NoData, } } @@ -58,23 +66,41 @@ where loop { match &mut this.state { AsyncIndexReaderState::NoData => { - if this.current_chunk_idx >= this.index.index_count() { + let (idx, offset) = if this.current_chunk_info.is_some() && + this.position == this.current_chunk_info.as_ref().unwrap().range.end + { + // optimization for sequential chunk read + let next_idx = this.current_chunk_idx + 1; + (next_idx, 0) + } else { + match this.index.chunk_from_offset(this.position) { + Some(res) => res, + None => return Poll::Ready(Ok(0)) + } + }; + + if idx >= this.index.index_count() { return Poll::Ready(Ok(0)); } - let digest = this + let info = this .index - .index_digest(this.current_chunk_idx) - .ok_or(io_format_err!("could not get digest"))? - .clone(); + .chunk_info(idx) + .ok_or(io_format_err!("could not get digest"))?; - if digest == this.current_chunk_digest { - this.state = AsyncIndexReaderState::HaveData(0); - continue; + this.current_chunk_offset = offset; + this.current_chunk_idx = idx; + let old_info = this.current_chunk_info.replace(info.clone()); + + if let Some(old_info) = old_info { + if old_info.digest == info.digest { + // hit, chunk is currently in cache + this.state = AsyncIndexReaderState::HaveData; + continue; + } } - this.current_chunk_digest = digest; - + // miss, need to download new chunk let store = match this.store.take() { Some(store) => store, None => { @@ -83,7 +109,7 @@ where }; let future = async move { - store.read_chunk(&digest) + store.read_chunk(&info.digest) .await .map(move |x| (store, x)) }; @@ -95,7 +121,7 @@ where Ok((store, mut chunk_data)) => { this.read_buffer.clear(); this.read_buffer.append(&mut chunk_data); - this.state = AsyncIndexReaderState::HaveData(0); + this.state = AsyncIndexReaderState::HaveData; this.store = Some(store); } Err(err) => { @@ -103,8 +129,8 @@ where } }; } - AsyncIndexReaderState::HaveData(offset) => { - let offset = *offset; + AsyncIndexReaderState::HaveData => { + let offset = this.current_chunk_offset as usize; let len = this.read_buffer.len(); let n = if len - offset < buf.len() { len - offset @@ -113,11 +139,13 @@ where }; buf[0..n].copy_from_slice(&this.read_buffer[offset..(offset + n)]); + this.position += n as u64; + if offset + n == len { this.state = AsyncIndexReaderState::NoData; - this.current_chunk_idx += 1; } else { - this.state = AsyncIndexReaderState::HaveData(offset + n); + this.current_chunk_offset += n as u64; + this.state = AsyncIndexReaderState::HaveData; } return Poll::Ready(Ok(n)); @@ -126,3 +154,51 @@ where } } } + +impl AsyncSeek for AsyncIndexReader +where + S: AsyncReadChunk + Unpin + Sync + 'static, + I: IndexFile + Unpin, +{ + fn start_seek( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + pos: SeekFrom, + ) -> Poll> { + let this = Pin::get_mut(self); + this.seek_to_pos = match pos { + SeekFrom::Start(offset) => { + offset as i64 + }, + SeekFrom::End(offset) => { + this.index.index_bytes() as i64 + offset + }, + SeekFrom::Current(offset) => { + this.position as i64 + offset + } + }; + Poll::Ready(Ok(())) + } + + fn poll_complete( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + ) -> Poll> { + let this = Pin::get_mut(self); + + let index_bytes = this.index.index_bytes(); + if this.seek_to_pos < 0 { + return Poll::Ready(Err(io_format_err!("cannot seek to negative values"))); + } else if this.seek_to_pos > index_bytes as i64 { + this.position = index_bytes; + } else { + this.position = this.seek_to_pos as u64; + } + + // even if seeking within one chunk, we need to go to NoData to + // recalculate the current_chunk_offset (data is cached anyway) + this.state = AsyncIndexReaderState::NoData; + + Poll::Ready(Ok(this.position)) + } +} diff --git a/src/backup/index.rs b/src/backup/index.rs index 2eab8524..c6bab56e 100644 --- a/src/backup/index.rs +++ b/src/backup/index.rs @@ -1,6 +1,7 @@ use std::collections::HashMap; use std::ops::Range; +#[derive(Clone)] pub struct ChunkReadInfo { pub range: Range, pub digest: [u8; 32],