From 1d746a2c02495e768aad4ec94e95d844780365b7 Mon Sep 17 00:00:00 2001 From: Christian Ebner Date: Fri, 11 Oct 2024 11:33:55 +0200 Subject: [PATCH] partial fix #5560: client: periodically show backup progress Spawn a new tokio task which about every minute displays the cumulative progress of the backup for pxar, ppxar or img archive streams. Catalog and metadata archive streams are excluded from the output for better readability, and because the catalog upload lives for the whole upload time, leading to possible temporal misalignments in the output. The actual payload data is written via the other streams anyway. Add accounting for uploaded chunks, to distinguish from chunks queued for upload, but not actually uploaded yet. Example output in the backup task log: ``` ... INFO: processed 2.471 GiB in 1m, uploaded 2.439 GiB INFO: processed 4.963 GiB in 2m, uploaded 4.929 GiB INFO: processed 7.349 GiB in 3m, uploaded 7.284 GiB ... ``` This partially fixes issue 5560: https://bugzilla.proxmox.com/show_bug.cgi?id=5560 Signed-off-by: Christian Ebner Signed-off-by: Thomas Lamprecht --- pbs-client/src/backup_writer.rs | 74 ++++++++++++++++++++++++++++----- 1 file changed, 63 insertions(+), 11 deletions(-) diff --git a/pbs-client/src/backup_writer.rs b/pbs-client/src/backup_writer.rs index d63c09b5..4d2e3d08 100644 --- a/pbs-client/src/backup_writer.rs +++ b/pbs-client/src/backup_writer.rs @@ -21,6 +21,7 @@ use pbs_datastore::{CATALOG_NAME, PROXMOX_BACKUP_PROTOCOL_ID_V1}; use pbs_tools::crypt_config::CryptConfig; use proxmox_human_byte::HumanByte; +use proxmox_time::TimeSpan; use super::inject_reused_chunks::{InjectChunks, InjectReusedChunks, InjectedChunksInfo}; use super::merge_known_chunks::{MergeKnownChunks, MergedChunkInfo}; @@ -65,7 +66,12 @@ struct UploadStats { csum: [u8; 32], } -type UploadQueueSender = mpsc::Sender<(MergedChunkInfo, Option)>; +struct ChunkUploadResponse { + future: h2::client::ResponseFuture, + size: usize, +} + +type UploadQueueSender = mpsc::Sender<(MergedChunkInfo, Option)>; type UploadResultReceiver = oneshot::Receiver>; impl BackupWriter { @@ -332,6 +338,12 @@ impl BackupWriter { .as_u64() .unwrap(); + let archive = if log::log_enabled!(log::Level::Debug) { + archive_name + } else { + pbs_tools::format::strip_server_file_extension(archive_name) + }; + let upload_stats = Self::upload_chunk_info_stream( self.h2.clone(), wid, @@ -345,16 +357,12 @@ impl BackupWriter { }, options.compress, injections, + archive, ) .await?; let size_dirty = upload_stats.size - upload_stats.size_reused; let size: HumanByte = upload_stats.size.into(); - let archive = if log::log_enabled!(log::Level::Debug) { - archive_name - } else { - pbs_tools::format::strip_server_file_extension(archive_name) - }; if upload_stats.chunk_injected > 0 { log::info!( @@ -462,6 +470,7 @@ impl BackupWriter { h2: H2Client, wid: u64, path: String, + uploaded: Arc, ) -> (UploadQueueSender, UploadResultReceiver) { let (verify_queue_tx, verify_queue_rx) = mpsc::channel(64); let (verify_result_tx, verify_result_rx) = oneshot::channel(); @@ -470,15 +479,21 @@ impl BackupWriter { tokio::spawn( ReceiverStream::new(verify_queue_rx) .map(Ok::<_, Error>) - .and_then(move |(merged_chunk_info, response): (MergedChunkInfo, Option)| { + .and_then(move |(merged_chunk_info, response): (MergedChunkInfo, Option)| { match (response, merged_chunk_info) { (Some(response), MergedChunkInfo::Known(list)) => { Either::Left( response + .future .map_err(Error::from) .and_then(H2Client::h2api_response) - .and_then(move |_result| { - future::ok(MergedChunkInfo::Known(list)) + .and_then({ + let uploaded = uploaded.clone(); + move |_result| { + // account for uploaded bytes for progress output + uploaded.fetch_add(response.size, Ordering::SeqCst); + future::ok(MergedChunkInfo::Known(list)) + } }) ) } @@ -636,6 +651,7 @@ impl BackupWriter { crypt_config: Option>, compress: bool, injections: Option>, + archive: &str, ) -> impl Future> { let total_chunks = Arc::new(AtomicUsize::new(0)); let total_chunks2 = total_chunks.clone(); @@ -646,25 +662,51 @@ impl BackupWriter { let stream_len = Arc::new(AtomicUsize::new(0)); let stream_len2 = stream_len.clone(); + let stream_len3 = stream_len.clone(); let compressed_stream_len = Arc::new(AtomicU64::new(0)); let compressed_stream_len2 = compressed_stream_len.clone(); let reused_len = Arc::new(AtomicUsize::new(0)); let reused_len2 = reused_len.clone(); let injected_len = Arc::new(AtomicUsize::new(0)); let injected_len2 = injected_len.clone(); + let uploaded_len = Arc::new(AtomicUsize::new(0)); let append_chunk_path = format!("{}_index", prefix); let upload_chunk_path = format!("{}_chunk", prefix); let is_fixed_chunk_size = prefix == "fixed"; let (upload_queue, upload_result) = - Self::append_chunk_queue(h2.clone(), wid, append_chunk_path); + Self::append_chunk_queue(h2.clone(), wid, append_chunk_path, uploaded_len.clone()); let start_time = std::time::Instant::now(); let index_csum = Arc::new(Mutex::new(Some(openssl::sha::Sha256::new()))); let index_csum_2 = index_csum.clone(); + let progress_handle = if archive.ends_with(".img") + || archive.ends_with(".pxar") + || archive.ends_with(".ppxar") + { + Some(tokio::spawn(async move { + loop { + tokio::time::sleep(tokio::time::Duration::from_secs(60)).await; + + let size = stream_len3.load(Ordering::SeqCst); + let size_uploaded = uploaded_len.load(Ordering::SeqCst); + let elapsed = start_time.elapsed(); + + log::info!( + " processed {} in {}, uploaded {}", + HumanByte::from(size), + TimeSpan::from(elapsed), + HumanByte::from(size_uploaded), + ); + } + })) + } else { + None + }; + stream .inject_reused_chunks(injections, stream_len.clone()) .and_then(move |chunk_info| match chunk_info { @@ -776,7 +818,13 @@ impl BackupWriter { Either::Left(h2.send_request(request, upload_data).and_then( move |response| async move { upload_queue - .send((new_info, Some(response))) + .send(( + new_info, + Some(ChunkUploadResponse { + future: response, + size: chunk_info.chunk_len as usize, + }), + )) .await .map_err(|err| { format_err!("failed to send to upload queue: {}", err) @@ -806,6 +854,10 @@ impl BackupWriter { let mut guard = index_csum_2.lock().unwrap(); let csum = guard.take().unwrap().finish(); + if let Some(handle) = progress_handle { + handle.abort(); + } + futures::future::ok(UploadStats { chunk_count, chunk_reused,