diff --git a/pbs-client/src/backup_writer.rs b/pbs-client/src/backup_writer.rs index d63c09b5..4d2e3d08 100644 --- a/pbs-client/src/backup_writer.rs +++ b/pbs-client/src/backup_writer.rs @@ -21,6 +21,7 @@ use pbs_datastore::{CATALOG_NAME, PROXMOX_BACKUP_PROTOCOL_ID_V1}; use pbs_tools::crypt_config::CryptConfig; use proxmox_human_byte::HumanByte; +use proxmox_time::TimeSpan; use super::inject_reused_chunks::{InjectChunks, InjectReusedChunks, InjectedChunksInfo}; use super::merge_known_chunks::{MergeKnownChunks, MergedChunkInfo}; @@ -65,7 +66,12 @@ struct UploadStats { csum: [u8; 32], } -type UploadQueueSender = mpsc::Sender<(MergedChunkInfo, Option)>; +struct ChunkUploadResponse { + future: h2::client::ResponseFuture, + size: usize, +} + +type UploadQueueSender = mpsc::Sender<(MergedChunkInfo, Option)>; type UploadResultReceiver = oneshot::Receiver>; impl BackupWriter { @@ -332,6 +338,12 @@ impl BackupWriter { .as_u64() .unwrap(); + let archive = if log::log_enabled!(log::Level::Debug) { + archive_name + } else { + pbs_tools::format::strip_server_file_extension(archive_name) + }; + let upload_stats = Self::upload_chunk_info_stream( self.h2.clone(), wid, @@ -345,16 +357,12 @@ impl BackupWriter { }, options.compress, injections, + archive, ) .await?; let size_dirty = upload_stats.size - upload_stats.size_reused; let size: HumanByte = upload_stats.size.into(); - let archive = if log::log_enabled!(log::Level::Debug) { - archive_name - } else { - pbs_tools::format::strip_server_file_extension(archive_name) - }; if upload_stats.chunk_injected > 0 { log::info!( @@ -462,6 +470,7 @@ impl BackupWriter { h2: H2Client, wid: u64, path: String, + uploaded: Arc, ) -> (UploadQueueSender, UploadResultReceiver) { let (verify_queue_tx, verify_queue_rx) = mpsc::channel(64); let (verify_result_tx, verify_result_rx) = oneshot::channel(); @@ -470,15 +479,21 @@ impl BackupWriter { tokio::spawn( ReceiverStream::new(verify_queue_rx) .map(Ok::<_, Error>) - .and_then(move |(merged_chunk_info, response): (MergedChunkInfo, Option)| { + .and_then(move |(merged_chunk_info, response): (MergedChunkInfo, Option)| { match (response, merged_chunk_info) { (Some(response), MergedChunkInfo::Known(list)) => { Either::Left( response + .future .map_err(Error::from) .and_then(H2Client::h2api_response) - .and_then(move |_result| { - future::ok(MergedChunkInfo::Known(list)) + .and_then({ + let uploaded = uploaded.clone(); + move |_result| { + // account for uploaded bytes for progress output + uploaded.fetch_add(response.size, Ordering::SeqCst); + future::ok(MergedChunkInfo::Known(list)) + } }) ) } @@ -636,6 +651,7 @@ impl BackupWriter { crypt_config: Option>, compress: bool, injections: Option>, + archive: &str, ) -> impl Future> { let total_chunks = Arc::new(AtomicUsize::new(0)); let total_chunks2 = total_chunks.clone(); @@ -646,25 +662,51 @@ impl BackupWriter { let stream_len = Arc::new(AtomicUsize::new(0)); let stream_len2 = stream_len.clone(); + let stream_len3 = stream_len.clone(); let compressed_stream_len = Arc::new(AtomicU64::new(0)); let compressed_stream_len2 = compressed_stream_len.clone(); let reused_len = Arc::new(AtomicUsize::new(0)); let reused_len2 = reused_len.clone(); let injected_len = Arc::new(AtomicUsize::new(0)); let injected_len2 = injected_len.clone(); + let uploaded_len = Arc::new(AtomicUsize::new(0)); let append_chunk_path = format!("{}_index", prefix); let upload_chunk_path = format!("{}_chunk", prefix); let is_fixed_chunk_size = prefix == "fixed"; let (upload_queue, upload_result) = - Self::append_chunk_queue(h2.clone(), wid, append_chunk_path); + Self::append_chunk_queue(h2.clone(), wid, append_chunk_path, uploaded_len.clone()); let start_time = std::time::Instant::now(); let index_csum = Arc::new(Mutex::new(Some(openssl::sha::Sha256::new()))); let index_csum_2 = index_csum.clone(); + let progress_handle = if archive.ends_with(".img") + || archive.ends_with(".pxar") + || archive.ends_with(".ppxar") + { + Some(tokio::spawn(async move { + loop { + tokio::time::sleep(tokio::time::Duration::from_secs(60)).await; + + let size = stream_len3.load(Ordering::SeqCst); + let size_uploaded = uploaded_len.load(Ordering::SeqCst); + let elapsed = start_time.elapsed(); + + log::info!( + " processed {} in {}, uploaded {}", + HumanByte::from(size), + TimeSpan::from(elapsed), + HumanByte::from(size_uploaded), + ); + } + })) + } else { + None + }; + stream .inject_reused_chunks(injections, stream_len.clone()) .and_then(move |chunk_info| match chunk_info { @@ -776,7 +818,13 @@ impl BackupWriter { Either::Left(h2.send_request(request, upload_data).and_then( move |response| async move { upload_queue - .send((new_info, Some(response))) + .send(( + new_info, + Some(ChunkUploadResponse { + future: response, + size: chunk_info.chunk_len as usize, + }), + )) .await .map_err(|err| { format_err!("failed to send to upload queue: {}", err) @@ -806,6 +854,10 @@ impl BackupWriter { let mut guard = index_csum_2.lock().unwrap(); let csum = guard.take().unwrap().finish(); + if let Some(handle) = progress_handle { + handle.abort(); + } + futures::future::ok(UploadStats { chunk_count, chunk_reused,