diff --git a/src/client/http_client.rs b/src/client/http_client.rs index 9b327324..1793e432 100644 --- a/src/client/http_client.rs +++ b/src/client/http_client.rs @@ -514,7 +514,6 @@ impl BackupClient { hyper::rt::spawn( verify_queue_rx .map_err(Error::from) - //.for_each(|response: h2::client::ResponseFuture| { .and_then(move |merged_chunk_info| { match merged_chunk_info { MergedChunkInfo::New(chunk_info) => { @@ -544,6 +543,7 @@ impl BackupClient { } } }) + .merge_known_chunks() .and_then(move |merged_chunk_info| { match merged_chunk_info { MergedChunkInfo::Known(chunk_list) => { @@ -642,9 +642,17 @@ impl BackupClient { .map(move |chunk_info| { repeat.fetch_add(1, Ordering::SeqCst); stream_len.fetch_add(chunk_info.data.len(), Ordering::SeqCst); - chunk_info + + let mut known_chunks = known_chunks.lock().unwrap(); + let chunk_is_known = known_chunks.contains(&chunk_info.digest); + if chunk_is_known { + MergedChunkInfo::Known(vec![(chunk_info.offset, chunk_info.digest)]) + } else { + known_chunks.insert(chunk_info.digest); + MergedChunkInfo::New(chunk_info) + } }) - .merge_known_chunks(known_chunks.clone()) + .merge_known_chunks() .for_each(move |merged_chunk_info| { upload_queue.clone().send(merged_chunk_info) .map(|_| ()).map_err(Error::from) diff --git a/src/client/merge_known_chunks.rs b/src/client/merge_known_chunks.rs index 43c0ce02..28ea5bf8 100644 --- a/src/client/merge_known_chunks.rs +++ b/src/client/merge_known_chunks.rs @@ -1,7 +1,5 @@ use failure::*; use futures::*; -use std::collections::HashSet; -use std::sync::{Arc, Mutex}; pub struct ChunkInfo { pub digest: [u8; 32], @@ -15,25 +13,24 @@ pub enum MergedChunkInfo { } pub trait MergeKnownChunks: Sized { - fn merge_known_chunks(self, known_chunks: Arc>>) -> MergeKnownChunksQueue; + fn merge_known_chunks(self) -> MergeKnownChunksQueue; } pub struct MergeKnownChunksQueue { input: S, - known_chunks: Arc>>, buffer: Option, } impl MergeKnownChunks for S - where S: Stream, + where S: Stream, { - fn merge_known_chunks(self, known_chunks: Arc>>) -> MergeKnownChunksQueue { - MergeKnownChunksQueue { input: self, known_chunks, buffer: None } + fn merge_known_chunks(self) -> MergeKnownChunksQueue { + MergeKnownChunksQueue { input: self, buffer: None } } } impl Stream for MergeKnownChunksQueue - where S: Stream, + where S: Stream, { type Item = MergedChunkInfo; type Error = Error; @@ -54,45 +51,42 @@ impl Stream for MergeKnownChunksQueue return Ok(Async::Ready(None)); } } - Ok(Async::Ready(Some(chunk_info))) => { + Ok(Async::Ready(Some(mergerd_chunk_info))) => { - let mut known_chunks = self.known_chunks.lock().unwrap(); - let chunk_is_known = known_chunks.contains(&chunk_info.digest); + match mergerd_chunk_info { + MergedChunkInfo::Known(list) => { - if chunk_is_known { + let last = self.buffer.take(); - let last = self.buffer.take(); - - match last { - None => { - self.buffer = Some(MergedChunkInfo::Known(vec![(chunk_info.offset, chunk_info.digest)])); - // continue - } - Some(MergedChunkInfo::Known(mut list)) => { - list.push((chunk_info.offset, chunk_info.digest)); - let len = list.len(); - self.buffer = Some(MergedChunkInfo::Known(list)); - - if len >= 64 { - return Ok(Async::Ready(self.buffer.take())); + match last { + None => { + self.buffer = Some(MergedChunkInfo::Known(list)); + // continue } - // continue + Some(MergedChunkInfo::Known(mut last_list)) => { + last_list.extend_from_slice(&list); + let len = last_list.len(); + self.buffer = Some(MergedChunkInfo::Known(last_list)); - } - Some(MergedChunkInfo::New(_)) => { - self.buffer = Some(MergedChunkInfo::Known(vec![(chunk_info.offset, chunk_info.digest)])); - return Ok(Async::Ready(last)); + if len >= 64 { + return Ok(Async::Ready(self.buffer.take())); + } + // continue + } + Some(MergedChunkInfo::New(_)) => { + self.buffer = Some(MergedChunkInfo::Known(list)); + return Ok(Async::Ready(last)); + } } } - - } else { - known_chunks.insert(chunk_info.digest); - let new = MergedChunkInfo::New(chunk_info); - if let Some(last) = self.buffer.take() { - self.buffer = Some(new); - return Ok(Async::Ready(Some(last))); - } else { - return Ok(Async::Ready(Some(new))); + MergedChunkInfo::New(chunk_info) => { + let new = MergedChunkInfo::New(chunk_info); + if let Some(last) = self.buffer.take() { + self.buffer = Some(new); + return Ok(Async::Ready(Some(last))); + } else { + return Ok(Async::Ready(Some(new))); + } } } }