diff --git a/pbs-client/src/pxar_backup_stream.rs b/pbs-client/src/pxar_backup_stream.rs index 2bfb5cf2..1303e850 100644 --- a/pbs-client/src/pxar_backup_stream.rs +++ b/pbs-client/src/pxar_backup_stream.rs @@ -11,6 +11,7 @@ use futures::stream::Stream; use nix::dir::Dir; use nix::fcntl::OFlag; use nix::sys::stat::Mode; +use tokio::sync::Notify; use proxmox_async::blocking::TokioWriterAdapter; use proxmox_io::StdChannelWriter; @@ -31,6 +32,8 @@ pub struct PxarBackupStream { pub suggested_boundaries: Option>, handle: Option, error: Arc>>, + finished: bool, + archiver_finished_notification: Arc, } impl Drop for PxarBackupStream { @@ -80,6 +83,10 @@ impl PxarBackupStream { let error = Arc::new(Mutex::new(None)); let error2 = Arc::clone(&error); + let stream_notifier = Arc::new(Notify::new()); + let stream_notification_receiver = stream_notifier.clone(); + let payload_stream_notifier = Arc::new(Notify::new()); + let payload_stream_notification_receiver = payload_stream_notifier.clone(); let handler = async move { if let Err(err) = crate::pxar::create_archive( dir, @@ -101,6 +108,10 @@ impl PxarBackupStream { let mut error = error2.lock().unwrap(); *error = Some(err); } + + // Notify upload streams that archiver is finished (with or without error) + stream_notifier.notify_one(); + payload_stream_notifier.notify_one(); }; let (handle, registration) = AbortHandle::new_pair(); @@ -112,6 +123,8 @@ impl PxarBackupStream { suggested_boundaries: None, handle: Some(handle.clone()), error: Arc::clone(&error), + finished: false, + archiver_finished_notification: stream_notification_receiver, }; let backup_payload_stream = payload_rx.map(|rx| Self { @@ -119,6 +132,8 @@ impl PxarBackupStream { suggested_boundaries: suggested_boundaries_rx, handle: Some(handle), error, + finished: false, + archiver_finished_notification: payload_stream_notification_receiver, }); Ok((backup_stream, backup_payload_stream)) @@ -141,18 +156,31 @@ impl Stream for PxarBackupStream { type Item = Result, Error>; fn poll_next(self: Pin<&mut Self>, _cx: &mut Context) -> Poll> { + let this = self.get_mut(); + if this.finished { + // Channel has already been finished and eventual errors propagated, + // early return to avoid blocking on further archiver finished notifications + // by subsequent polls. + return Poll::Ready(None); + } { // limit lock scope - let mut error = self.error.lock().unwrap(); + let mut error = this.error.lock().unwrap(); if let Some(err) = error.take() { return Poll::Ready(Some(Err(err))); } } - match proxmox_async::runtime::block_in_place(|| self.rx.as_ref().unwrap().recv()) { + match proxmox_async::runtime::block_in_place(|| this.rx.as_ref().unwrap().recv()) { Ok(data) => Poll::Ready(Some(data)), Err(_) => { - let mut error = self.error.lock().unwrap(); + // Wait for archiver to finish + proxmox_async::runtime::block_on(this.archiver_finished_notification.notified()); + // Never block for archiver finished notification on subsequent calls. + // Eventual error will already have been propagated. + this.finished = true; + + let mut error = this.error.lock().unwrap(); if let Some(err) = error.take() { return Poll::Ready(Some(Err(err))); }