diff --git a/src/client/pull.rs b/src/client/pull.rs index cac6f15b..f671c003 100644 --- a/src/client/pull.rs +++ b/src/client/pull.rs @@ -23,26 +23,40 @@ use crate::{ async fn pull_index_chunks( _worker: &WorkerTask, - chunk_reader: &mut RemoteChunkReader, + chunk_reader: RemoteChunkReader, target: Arc, index: I, ) -> Result<(), Error> { + use futures::stream::{self, StreamExt, TryStreamExt}; - for pos in 0..index.index_count() { - let info = index.chunk_info(pos).unwrap(); - let chunk_exists = target.cond_touch_chunk(&info.digest, false)?; - if chunk_exists { - //worker.log(format!("chunk {} exists {}", pos, proxmox::tools::digest_to_hex(digest))); - continue; - } - //worker.log(format!("sync {} chunk {}", pos, proxmox::tools::digest_to_hex(digest))); - let chunk = chunk_reader.read_raw_chunk(&info.digest).await?; + let stream = stream::iter((0..index.index_count()).map(|pos| index.chunk_info(pos).unwrap())); - chunk.verify_unencrypted(info.size() as usize, &info.digest)?; + stream + .map(|info| { - target.insert_chunk(&chunk, &info.digest)?; - } + let target = Arc::clone(&target); + let chunk_reader = chunk_reader.clone(); + + Ok::<_, Error>(async move { + let chunk_exists = crate::tools::runtime::block_in_place(|| target.cond_touch_chunk(&info.digest, false))?; + if chunk_exists { + //worker.log(format!("chunk {} exists {}", pos, proxmox::tools::digest_to_hex(digest))); + return Ok::<_, Error>(()); + } + //worker.log(format!("sync {} chunk {}", pos, proxmox::tools::digest_to_hex(digest))); + let chunk = chunk_reader.read_raw_chunk(&info.digest).await?; + + crate::tools::runtime::block_in_place(|| { + chunk.verify_unencrypted(info.size() as usize, &info.digest)?; + target.insert_chunk(&chunk, &info.digest)?; + Ok(()) + }) + }) + }) + .try_buffer_unordered(20) + .try_for_each(|_res| futures::future::ok(())) + .await?; Ok(()) } @@ -115,7 +129,7 @@ async fn pull_single_archive( let (csum, size) = index.compute_csum(); verify_archive(archive_info, &csum, size)?; - pull_index_chunks(worker, chunk_reader, tgt_store.clone(), index).await?; + pull_index_chunks(worker, chunk_reader.clone(), tgt_store.clone(), index).await?; } ArchiveType::FixedIndex => { let index = FixedIndexReader::new(tmpfile) @@ -123,7 +137,7 @@ async fn pull_single_archive( let (csum, size) = index.compute_csum(); verify_archive(archive_info, &csum, size)?; - pull_index_chunks(worker, chunk_reader, tgt_store.clone(), index).await?; + pull_index_chunks(worker, chunk_reader.clone(), tgt_store.clone(), index).await?; } ArchiveType::Blob => { let (csum, size) = compute_file_csum(&mut tmpfile)?;