From 1ec0d70d09142e24840e7996ccc8b2cb7b95b592 Mon Sep 17 00:00:00 2001 From: Dietmar Maurer Date: Fri, 24 Sep 2021 09:30:00 +0200 Subject: [PATCH] cleanup worker task logging In order to avoid name conflicts with WorkerTaskContext - renamed WorkerTask::log to WorkerTask::log_message Note: Methods have different fuction signatures Also renamed WorkerTask::warn to WorkerTask::log_warning for consistency reasons. Use the task_log!() and task_warn!() macros more often. --- proxmox-rest-server/src/h2service.rs | 11 +++- proxmox-rest-server/src/worker_task.rs | 18 +++--- src/acme/plugin.rs | 6 +- src/api2/admin/datastore.rs | 25 +++++---- src/api2/backup/environment.rs | 6 +- src/api2/config/acme.rs | 13 +++-- src/api2/node/apt.rs | 6 +- src/api2/node/certificates.rs | 47 ++++++++-------- src/api2/node/disks/directory.rs | 3 +- src/api2/node/disks/mod.rs | 3 +- src/api2/node/disks/zfs.rs | 11 ++-- src/api2/node/mod.rs | 8 +-- src/api2/pull.rs | 22 +++++--- src/api2/reader/environment.rs | 4 +- src/api2/tape/drive.rs | 56 +++++++++--------- src/bin/proxmox-backup-proxy.rs | 15 ++--- src/server/gc_job.rs | 5 +- src/server/pull.rs | 78 +++++++++++++------------- src/server/verify_job.rs | 4 +- src/tape/pool_writer/mod.rs | 39 +++++++------ tests/worker-task-abort.rs | 13 +++-- 21 files changed, 210 insertions(+), 183 deletions(-) diff --git a/proxmox-rest-server/src/h2service.rs b/proxmox-rest-server/src/h2service.rs index f5556d1a..fba9714c 100644 --- a/proxmox-rest-server/src/h2service.rs +++ b/proxmox-rest-server/src/h2service.rs @@ -33,7 +33,7 @@ impl H2Service { } pub fn debug>(&self, msg: S) { - if self.debug { self.worker.log(msg); } + if self.debug { self.worker.log_message(msg); } } fn handle_request(&self, req: Request) -> ApiResponseFuture { @@ -77,7 +77,14 @@ impl H2Service { message = &data.0; } - worker.log(format!("{} {}: {} {}: {}", method.as_str(), path, status.as_str(), reason, message)); + worker.log_message(format!( + "{} {}: {} {}: {}", + method.as_str(), + path, + status.as_str(), + reason, + message + )); } } } diff --git a/proxmox-rest-server/src/worker_task.rs b/proxmox-rest-server/src/worker_task.rs index 1b20391a..bea43573 100644 --- a/proxmox-rest-server/src/worker_task.rs +++ b/proxmox-rest-server/src/worker_task.rs @@ -779,7 +779,7 @@ impl WorkerTask { /// Log task result, remove task from running list pub fn log_result(&self, result: &Result<(), Error>) { let state = self.create_state(result); - self.log(state.result_text()); + self.log_message(state.result_text()); WORKER_TASK_LIST.lock().unwrap().remove(&self.upid.task_id); let _ = self.setup.update_active_workers(None); @@ -787,13 +787,13 @@ impl WorkerTask { } /// Log a message. - pub fn log>(&self, msg: S) { + pub fn log_message>(&self, msg: S) { let mut data = self.data.lock().unwrap(); data.logger.log(msg); } /// Log a message as warning. - pub fn warn>(&self, msg: S) { + pub fn log_warning>(&self, msg: S) { let mut data = self.data.lock().unwrap(); data.logger.log(format!("WARN: {}", msg.as_ref())); data.warn_count += 1; @@ -815,7 +815,7 @@ impl WorkerTask { let prev_abort = self.abort_requested.swap(true, Ordering::SeqCst); if !prev_abort { // log abort one time - self.log(format!("received abort request ...")); + self.log_message(format!("received abort request ...")); } // noitify listeners let mut data = self.data.lock().unwrap(); @@ -867,11 +867,11 @@ impl WorkerTaskContext for WorkerTask { fn log(&self, level: log::Level, message: &std::fmt::Arguments) { match level { - log::Level::Error => self.warn(&message.to_string()), - log::Level::Warn => self.warn(&message.to_string()), - log::Level::Info => self.log(&message.to_string()), - log::Level::Debug => self.log(&format!("DEBUG: {}", message)), - log::Level::Trace => self.log(&format!("TRACE: {}", message)), + log::Level::Error => self.log_warning(&message.to_string()), + log::Level::Warn => self.log_warning(&message.to_string()), + log::Level::Info => self.log_message(&message.to_string()), + log::Level::Debug => self.log_message(&format!("DEBUG: {}", message)), + log::Level::Trace => self.log_message(&format!("TRACE: {}", message)), } } } diff --git a/src/acme/plugin.rs b/src/acme/plugin.rs index cb7de082..65eb60d1 100644 --- a/src/acme/plugin.rs +++ b/src/acme/plugin.rs @@ -82,7 +82,7 @@ async fn pipe_to_tasklog( line.clear(); match pipe.read_line(&mut line).await { Ok(0) => return Ok(()), - Ok(_) => task.log(line.as_str()), + Ok(_) => task.log_message(line.as_str()), Err(err) => return Err(err), } } @@ -150,7 +150,7 @@ impl DnsPlugin { Ok(((), (), ())) => (), Err(err) => { if let Err(err) = child.kill().await { - task.log(format!( + task.log_message(format!( "failed to kill '{} {}' command: {}", PROXMOX_ACME_SH_PATH, action, err )); @@ -188,7 +188,7 @@ impl AcmePlugin for DnsPlugin { let validation_delay = self.core.validation_delay.unwrap_or(30) as u64; if validation_delay > 0 { - task.log(format!( + task.log_message(format!( "Sleeping {} seconds to wait for TXT record propagation", validation_delay )); diff --git a/src/api2/admin/datastore.rs b/src/api2/admin/datastore.rs index 154a2e84..a4152905 100644 --- a/src/api2/admin/datastore.rs +++ b/src/api2/admin/datastore.rs @@ -53,6 +53,7 @@ use pbs_datastore::prune::compute_prune_info; use pbs_tools::blocking::WrappedReaderStream; use pbs_tools::stream::{AsyncReaderStream, AsyncChannelWriter}; use pbs_tools::json::{required_integer_param, required_string_param}; +use pbs_tools::{task_log, task_warn}; use pbs_config::CachedUserInfo; use proxmox_rest_server::{WorkerTask, formatter}; @@ -770,9 +771,9 @@ pub fn verify( )? }; if !failed_dirs.is_empty() { - worker.log("Failed to verify the following snapshots/groups:"); + task_log!(worker, "Failed to verify the following snapshots/groups:"); for dir in failed_dirs { - worker.log(format!("\t{}", dir)); + task_log!(worker, "\t{}", dir); } bail!("verification failed - please check the log for details"); } @@ -865,11 +866,11 @@ pub fn prune( let worker = WorkerTask::new("prune", Some(worker_id), auth_id.to_string(), true)?; if keep_all { - worker.log("No prune selection - keeping all files."); + task_log!(worker, "No prune selection - keeping all files."); } else { - worker.log(format!("retention options: {}", pbs_datastore::prune::cli_options_string(&prune_options))); - worker.log(format!("Starting prune on store \"{}\" group \"{}/{}\"", - store, backup_type, backup_id)); + task_log!(worker, "retention options: {}", pbs_datastore::prune::cli_options_string(&prune_options)); + task_log!(worker, "Starting prune on store \"{}\" group \"{}/{}\"", + store, backup_type, backup_id); } for (info, mut keep) in prune_info { @@ -888,7 +889,7 @@ pub fn prune( if keep { "keep" } else { "remove" }, ); - worker.log(msg); + task_log!(worker, "{}", msg); prune_result.push(json!({ "backup-type": group.backup_type(), @@ -899,11 +900,11 @@ pub fn prune( if !(dry_run || keep) { if let Err(err) = datastore.remove_backup_dir(&info.backup_dir, false) { - worker.warn( - format!( - "failed to remove dir {:?}: {}", - info.backup_dir.relative_path(), err - ) + task_warn!( + worker, + "failed to remove dir {:?}: {}", + info.backup_dir.relative_path(), + err, ); } } diff --git a/src/api2/backup/environment.rs b/src/api2/backup/environment.rs index 8112737e..7842df44 100644 --- a/src/api2/backup/environment.rs +++ b/src/api2/backup/environment.rs @@ -528,7 +528,7 @@ impl BackupEnvironment { self.auth_id.to_string(), false, move |worker| { - worker.log("Automatically verifying newly added snapshot"); + worker.log_message("Automatically verifying newly added snapshot"); let verify_worker = crate::backup::VerifyWorker::new(worker.clone(), datastore); @@ -548,11 +548,11 @@ impl BackupEnvironment { } pub fn log>(&self, msg: S) { - self.worker.log(msg); + self.worker.log_message(msg); } pub fn debug>(&self, msg: S) { - if self.debug { self.worker.log(msg); } + if self.debug { self.worker.log_message(msg); } } pub fn format_response(&self, result: Result) -> Response { diff --git a/src/api2/config/acme.rs b/src/api2/config/acme.rs index 8593acac..e8fa0798 100644 --- a/src/api2/config/acme.rs +++ b/src/api2/config/acme.rs @@ -18,6 +18,7 @@ use proxmox_acme_rs::Account; use pbs_api_types::{Authid, PRIV_SYS_MODIFY}; use pbs_tools::ops::ControlFlow; +use pbs_tools::{task_log, task_warn}; use crate::acme::AcmeClient; use crate::api2::types::{AcmeAccountName, AcmeChallengeSchema, KnownAcmeDirectory}; @@ -220,15 +221,16 @@ fn register_account( move |worker| async move { let mut client = AcmeClient::new(directory); - worker.log(format!("Registering ACME account '{}'...", &name)); + task_log!(worker, "Registering ACME account '{}'...", &name); let account = do_register_account(&mut client, &name, tos_url.is_some(), contact, None).await?; - worker.log(format!( + task_log!( + worker, "Registration successful, account URL: {}", account.location - )); + ); Ok(()) }, @@ -331,10 +333,11 @@ pub fn deactivate_account( Ok(_account) => (), Err(err) if !force => return Err(err), Err(err) => { - worker.warn(format!( + task_warn!( + worker, "error deactivating account {}, proceedeing anyway - {}", name, err, - )); + ); } } crate::config::acme::mark_account_deactivated(&name)?; diff --git a/src/api2/node/apt.rs b/src/api2/node/apt.rs index 4fd81592..efc9538b 100644 --- a/src/api2/node/apt.rs +++ b/src/api2/node/apt.rs @@ -89,7 +89,7 @@ fn read_and_update_proxy_config() -> Result, Error> { } fn do_apt_update(worker: &WorkerTask, quiet: bool) -> Result<(), Error> { - if !quiet { worker.log("starting apt-get update") } + if !quiet { worker.log_message("starting apt-get update") } read_and_update_proxy_config()?; @@ -101,7 +101,7 @@ fn do_apt_update(worker: &WorkerTask, quiet: bool) -> Result<(), Error> { .map_err(|err| format_err!("failed to execute {:?} - {}", command, err))?; if !quiet { - worker.log(String::from_utf8(output.stdout)?); + worker.log_message(String::from_utf8(output.stdout)?); } // TODO: improve run_command to allow outputting both, stderr and stdout @@ -110,7 +110,7 @@ fn do_apt_update(worker: &WorkerTask, quiet: bool) -> Result<(), Error> { let msg = String::from_utf8(output.stderr) .map(|m| if m.is_empty() { String::from("no error message") } else { m }) .unwrap_or_else(|_| String::from("non utf8 error message (suppressed)")); - worker.warn(msg); + worker.log_warning(msg); } else { bail!("terminated by signal"); } diff --git a/src/api2/node/certificates.rs b/src/api2/node/certificates.rs index 80733fe9..80419a4f 100644 --- a/src/api2/node/certificates.rs +++ b/src/api2/node/certificates.rs @@ -13,7 +13,7 @@ use proxmox::list_subdirs_api_method; use pbs_api_types::{NODE_SCHEMA, PRIV_SYS_MODIFY}; use pbs_buildcfg::configdir; -use pbs_tools::cert; +use pbs_tools::{task_log, task_warn, cert}; use crate::acme::AcmeClient; use crate::api2::types::AcmeDomain; @@ -303,7 +303,7 @@ async fn order_certificate( }; if domains.is_empty() { - worker.log("No domains configured to be ordered from an ACME server."); + task_log!(worker, "No domains configured to be ordered from an ACME server."); return Ok(None); } @@ -311,11 +311,11 @@ async fn order_certificate( let mut acme = node_config.acme_client().await?; - worker.log("Placing ACME order"); + task_log!(worker, "Placing ACME order"); let order = acme .new_order(domains.iter().map(|d| d.domain.to_ascii_lowercase())) .await?; - worker.log(format!("Order URL: {}", order.location)); + task_log!(worker, "Order URL: {}", order.location); let identifiers: Vec = order .data @@ -327,7 +327,7 @@ async fn order_certificate( .collect(); for auth_url in &order.data.authorizations { - worker.log(format!("Getting authorization details from '{}'", auth_url)); + task_log!(worker, "Getting authorization details from '{}'", auth_url); let mut auth = acme.get_authorization(&auth_url).await?; let domain = match &mut auth.identifier { @@ -335,11 +335,11 @@ async fn order_certificate( }; if auth.status == Status::Valid { - worker.log(format!("{} is already validated!", domain)); + task_log!(worker, "{} is already validated!", domain); continue; } - worker.log(format!("The validation for {} is pending", domain)); + task_log!(worker, "The validation for {} is pending", domain); let domain_config: &AcmeDomain = get_domain_config(&domain)?; let plugin_id = domain_config.plugin.as_deref().unwrap_or("standalone"); let mut plugin_cfg = @@ -347,7 +347,7 @@ async fn order_certificate( format_err!("plugin '{}' for domain '{}' not found!", plugin_id, domain) })?; - worker.log("Setting up validation plugin"); + task_log!(worker, "Setting up validation plugin"); let validation_url = plugin_cfg .setup(&mut acme, &auth, domain_config, Arc::clone(&worker)) .await?; @@ -358,17 +358,18 @@ async fn order_certificate( .teardown(&mut acme, &auth, domain_config, Arc::clone(&worker)) .await { - worker.warn(format!( + task_warn!( + worker, "Failed to teardown plugin '{}' for domain '{}' - {}", plugin_id, domain, err - )); + ); } let _: () = result?; } - worker.log("All domains validated"); - worker.log("Creating CSR"); + task_log!(worker, "All domains validated"); + task_log!(worker, "Creating CSR"); let csr = proxmox_acme_rs::util::Csr::generate(&identifiers, &Default::default())?; let mut finalize_error_cnt = 0u8; @@ -381,7 +382,7 @@ async fn order_certificate( match order.status { Status::Pending => { - worker.log("still pending, trying to finalize anyway"); + task_log!(worker, "still pending, trying to finalize anyway"); let finalize = order .finalize .as_deref() @@ -396,7 +397,7 @@ async fn order_certificate( tokio::time::sleep(Duration::from_secs(5)).await; } Status::Ready => { - worker.log("order is ready, finalizing"); + task_log!(worker, "order is ready, finalizing"); let finalize = order .finalize .as_deref() @@ -405,18 +406,18 @@ async fn order_certificate( tokio::time::sleep(Duration::from_secs(5)).await; } Status::Processing => { - worker.log("still processing, trying again in 30 seconds"); + task_log!(worker, "still processing, trying again in 30 seconds"); tokio::time::sleep(Duration::from_secs(30)).await; } Status::Valid => { - worker.log("valid"); + task_log!(worker, "valid"); break; } other => bail!("order status: {:?}", other), } } - worker.log("Downloading certificate"); + task_log!(worker, "Downloading certificate"); let certificate = acme .get_certificate( order @@ -438,10 +439,10 @@ async fn request_validation( auth_url: &str, validation_url: &str, ) -> Result<(), Error> { - worker.log("Triggering validation"); + task_log!(worker, "Triggering validation"); acme.request_challenge_validation(&validation_url).await?; - worker.log("Sleeping for 5 seconds"); + task_log!(worker, "Sleeping for 5 seconds"); tokio::time::sleep(Duration::from_secs(5)).await; loop { @@ -450,7 +451,7 @@ async fn request_validation( let auth = acme.get_authorization(&auth_url).await?; match auth.status { Status::Pending => { - worker.log("Status is still 'pending', trying again in 10 seconds"); + task_log!(worker, "Status is still 'pending', trying again in 10 seconds"); tokio::time::sleep(Duration::from_secs(10)).await; } Status::Valid => return Ok(()), @@ -567,11 +568,11 @@ pub fn revoke_acme_cert(rpcenv: &mut dyn RpcEnvironment) -> Result, rpcenv: &mut dyn RpcEnvironment) -> Resu let stdout_fut = async move { let mut reader = BufReader::new(stdout).lines(); while let Some(line) = reader.next_line().await? { - worker_stdout.log(line); + worker_stdout.log_message(line); } Ok::<(), Error>(()) }; @@ -192,7 +192,7 @@ async fn termproxy(cmd: Option, rpcenv: &mut dyn RpcEnvironment) -> Resu let stderr_fut = async move { let mut reader = BufReader::new(stderr).lines(); while let Some(line) = reader.next_line().await? { - worker_stderr.warn(line); + worker_stderr.log_warning(line); } Ok::<(), Error>(()) }; @@ -224,9 +224,9 @@ async fn termproxy(cmd: Option, rpcenv: &mut dyn RpcEnvironment) -> Resu } if let Err(err) = child.kill().await { - worker.warn(format!("error killing termproxy: {}", err)); + worker.log_warning(format!("error killing termproxy: {}", err)); } else if let Err(err) = child.wait().await { - worker.warn(format!("error awaiting termproxy: {}", err)); + worker.log_warning(format!("error awaiting termproxy: {}", err)); } } diff --git a/src/api2/pull.rs b/src/api2/pull.rs index 0240098d..8a99a9f6 100644 --- a/src/api2/pull.rs +++ b/src/api2/pull.rs @@ -13,11 +13,12 @@ use pbs_api_types::{ DATASTORE_SCHEMA, REMOTE_ID_SCHEMA, REMOVE_VANISHED_BACKUPS_SCHEMA, PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_PRUNE, PRIV_REMOTE_READ, }; +use pbs_tools::task_log; use proxmox_rest_server::WorkerTask; +use pbs_config::CachedUserInfo; use crate::server::{jobstate::Job, pull::pull_store}; use crate::backup::DataStore; -use pbs_config::CachedUserInfo; pub fn check_pull_privs( auth_id: &Authid, @@ -97,16 +98,21 @@ pub fn do_sync_job( let sync_owner = sync_job.owner.unwrap_or_else(|| Authid::root_auth_id().clone()); let (client, src_repo, tgt_store) = get_pull_parameters(&sync_job.store, &sync_job.remote, &sync_job.remote_store).await?; - worker.log(format!("Starting datastore sync job '{}'", job_id)); + task_log!(worker, "Starting datastore sync job '{}'", job_id); if let Some(event_str) = schedule { - worker.log(format!("task triggered by schedule '{}'", event_str)); + task_log!(worker, "task triggered by schedule '{}'", event_str); } - worker.log(format!("Sync datastore '{}' from '{}/{}'", - sync_job.store, sync_job.remote, sync_job.remote_store)); + task_log!( + worker, + "sync datastore '{}' from '{}/{}'", + sync_job.store, + sync_job.remote, + sync_job.remote_store, + ); pull_store(&worker, &client, &src_repo, tgt_store.clone(), delete, sync_owner).await?; - worker.log(format!("sync job '{}' end", &job_id)); + task_log!(worker, "sync job '{}' end", &job_id); Ok(()) }; @@ -186,7 +192,7 @@ async fn pull ( // fixme: set to_stdout to false? let upid_str = WorkerTask::spawn("sync", Some(store.clone()), auth_id.to_string(), true, move |worker| async move { - worker.log(format!("sync datastore '{}' start", store)); + task_log!(worker, "sync datastore '{}' start", store); let pull_future = pull_store(&worker, &client, &src_repo, tgt_store.clone(), delete, auth_id); let future = select!{ @@ -196,7 +202,7 @@ async fn pull ( let _ = future?; - worker.log(format!("sync datastore '{}' end", store)); + task_log!(worker, "sync datastore '{}' end", store); Ok(()) })?; diff --git a/src/api2/reader/environment.rs b/src/api2/reader/environment.rs index c85ec069..329a5771 100644 --- a/src/api2/reader/environment.rs +++ b/src/api2/reader/environment.rs @@ -52,11 +52,11 @@ impl ReaderEnvironment { } pub fn log>(&self, msg: S) { - self.worker.log(msg); + self.worker.log_message(msg); } pub fn debug>(&self, msg: S) { - if self.debug { self.worker.log(msg); } + if self.debug { self.worker.log_message(msg); } } diff --git a/src/api2/tape/drive.rs b/src/api2/tape/drive.rs index 8227f659..6564d0fe 100644 --- a/src/api2/tape/drive.rs +++ b/src/api2/tape/drive.rs @@ -35,7 +35,7 @@ use pbs_tape::{ sg_tape::tape_alert_flags_critical, linux_list_drives::{lto_tape_device_list, lookup_device_identification, open_lto_tape_device}, }; -use pbs_tools::task_log; +use pbs_tools::{task_log, task_warn}; use proxmox_rest_server::WorkerTask; use crate::{ @@ -548,7 +548,7 @@ fn write_media_label( let media_id = if let Some(ref pool) = pool { // assign media to pool by writing special media set label - worker.log(format!("Label media '{}' for pool '{}'", label.label_text, pool)); + task_log!(worker, "Label media '{}' for pool '{}'", label.label_text, pool); let set = MediaSetLabel::with_data(&pool, [0u8; 16].into(), 0, label.ctime, None); drive.write_media_set_label(&set, None)?; @@ -563,7 +563,7 @@ fn write_media_label( media_id } else { - worker.log(format!("Label media '{}' (no pool assignment)", label.label_text)); + task_log!(worker, "Label media '{}' (no pool assignment)", label.label_text); let media_id = MediaId { label, media_set_label: None }; @@ -771,7 +771,7 @@ pub fn clean_drive( move |worker, config| { let (mut changer, _changer_name) = required_media_changer(&config, &drive)?; - worker.log("Starting drive clean"); + task_log!(worker, "Starting drive clean"); changer.clean_drive()?; @@ -782,7 +782,7 @@ pub fn clean_drive( // test for critical tape alert flags if let Ok(alert_flags) = handle.tape_alert_flags() { if !alert_flags.is_empty() { - worker.log(format!("TapeAlertFlags: {:?}", alert_flags)); + task_log!(worker, "TapeAlertFlags: {:?}", alert_flags); if tape_alert_flags_critical(alert_flags) { bail!("found critical tape alert flags: {:?}", alert_flags); } @@ -791,13 +791,13 @@ pub fn clean_drive( // test wearout (max. 50 mounts) if let Ok(volume_stats) = handle.volume_statistics() { - worker.log(format!("Volume mounts: {}", volume_stats.volume_mounts)); + task_log!(worker, "Volume mounts: {}", volume_stats.volume_mounts); let wearout = volume_stats.volume_mounts * 2; // (*100.0/50.0); - worker.log(format!("Cleaning tape wearout: {}%", wearout)); + task_log!(worker, "Cleaning tape wearout: {}%", wearout); } } - worker.log("Drive cleaned successfully"); + task_log!(worker, "Drive cleaned successfully"); Ok(()) }, @@ -921,7 +921,7 @@ pub fn update_inventory( let label_text_list = changer.online_media_label_texts()?; if label_text_list.is_empty() { - worker.log("changer device does not list any media labels".to_string()); + task_log!(worker, "changer device does not list any media labels"); } let state_path = Path::new(TAPE_STATUS_DIR); @@ -932,36 +932,36 @@ pub fn update_inventory( for label_text in label_text_list.iter() { if label_text.starts_with("CLN") { - worker.log(format!("skip cleaning unit '{}'", label_text)); + task_log!(worker, "skip cleaning unit '{}'", label_text); continue; } let label_text = label_text.to_string(); if !read_all_labels.unwrap_or(false) && inventory.find_media_by_label_text(&label_text).is_some() { - worker.log(format!("media '{}' already inventoried", label_text)); + task_log!(worker, "media '{}' already inventoried", label_text); continue; } if let Err(err) = changer.load_media(&label_text) { - worker.warn(format!("unable to load media '{}' - {}", label_text, err)); + task_warn!(worker, "unable to load media '{}' - {}", label_text, err); continue; } let mut drive = open_drive(&config, &drive)?; match drive.read_label() { Err(err) => { - worker.warn(format!("unable to read label form media '{}' - {}", label_text, err)); + task_warn!(worker, "unable to read label form media '{}' - {}", label_text, err); } Ok((None, _)) => { - worker.log(format!("media '{}' is empty", label_text)); + task_log!(worker, "media '{}' is empty", label_text); } Ok((Some(media_id), _key_config)) => { if label_text != media_id.label.label_text { - worker.warn(format!("label text mismatch ({} != {})", label_text, media_id.label.label_text)); + task_warn!(worker, "label text mismatch ({} != {})", label_text, media_id.label.label_text); continue; } - worker.log(format!("inventorize media '{}' with uuid '{}'", label_text, media_id.label.uuid)); + task_log!(worker, "inventorize media '{}' with uuid '{}'", label_text, media_id.label.uuid); if let Some(MediaSetLabel { ref pool, ref uuid, ..}) = media_id.media_set_label { let _pool_lock = lock_media_pool(state_path, pool)?; @@ -1057,14 +1057,14 @@ fn barcode_label_media_worker( inventory.reload()?; if inventory.find_media_by_label_text(&label_text).is_some() { - worker.log(format!("media '{}' already inventoried (already labeled)", label_text)); + task_log!(worker, "media '{}' already inventoried (already labeled)", label_text); continue; } - worker.log(format!("checking/loading media '{}'", label_text)); + task_log!(worker, "checking/loading media '{}'", label_text); if let Err(err) = changer.load_media(&label_text) { - worker.warn(format!("unable to load media '{}' - {}", label_text, err)); + task_warn!(worker, "unable to load media '{}' - {}", label_text, err); continue; } @@ -1073,13 +1073,13 @@ fn barcode_label_media_worker( match drive.read_next_file() { Ok(_reader) => { - worker.log(format!("media '{}' is not empty (format it first)", label_text)); + task_log!(worker, "media '{}' is not empty (format it first)", label_text); continue; } Err(BlockReadError::EndOfFile) => { /* EOF mark at BOT, assume tape is empty */ }, Err(BlockReadError::EndOfStream) => { /* tape is empty */ }, Err(_err) => { - worker.warn(format!("media '{}' read error (maybe not empty - format it first)", label_text)); + task_warn!(worker, "media '{}' read error (maybe not empty - format it first)", label_text); continue; } } @@ -1249,15 +1249,17 @@ pub fn catalog_media( let media_id = match drive.read_label()? { (Some(media_id), key_config) => { - worker.log(format!( + task_log!( + worker, "found media label: {}", serde_json::to_string_pretty(&serde_json::to_value(&media_id)?)? - )); + ); if key_config.is_some() { - worker.log(format!( + task_log!( + worker, "encryption key config: {}", serde_json::to_string_pretty(&serde_json::to_value(&key_config)?)? - )); + ); } media_id }, @@ -1270,7 +1272,7 @@ pub fn catalog_media( let (_media_set_lock, media_set_uuid) = match media_id.media_set_label { None => { - worker.log("media is empty"); + task_log!(worker, "media is empty"); let _lock = lock_unassigned_media_pool(status_path)?; MediaCatalog::destroy(status_path, &media_id.label.uuid)?; inventory.store(media_id.clone(), false)?; @@ -1278,7 +1280,7 @@ pub fn catalog_media( } Some(ref set) => { if set.uuid.as_ref() == [0u8;16] { // media is empty - worker.log("media is empty"); + task_log!(worker, "media is empty"); let _lock = lock_unassigned_media_pool(status_path)?; MediaCatalog::destroy(status_path, &media_id.label.uuid)?; inventory.store(media_id.clone(), false)?; diff --git a/src/bin/proxmox-backup-proxy.rs b/src/bin/proxmox-backup-proxy.rs index ec4da15b..ef4dd633 100644 --- a/src/bin/proxmox-backup-proxy.rs +++ b/src/bin/proxmox-backup-proxy.rs @@ -20,6 +20,7 @@ use proxmox::sys::linux::socket::set_tcp_keepalive; use proxmox::tools::fs::CreateOptions; use proxmox_rest_server::{rotate_task_log_archive, ApiConfig, RestServer, WorkerTask}; +use pbs_tools::task_log; use proxmox_backup::{ backup::DataStore, @@ -748,16 +749,16 @@ async fn schedule_task_log_rotate() { false, move |worker| { job.start(&worker.upid().to_string())?; - worker.log("starting task log rotation".to_string()); + task_log!(worker, "starting task log rotation"); let result = try_block!({ let max_size = 512 * 1024 - 1; // an entry has ~ 100b, so > 5000 entries/file let max_files = 20; // times twenty files gives > 100000 task entries let has_rotated = rotate_task_log_archive(max_size, true, Some(max_files))?; if has_rotated { - worker.log("task log archive was rotated".to_string()); + task_log!(worker, "task log archive was rotated"); } else { - worker.log("task log archive was not rotated".to_string()); + task_log!(worker, "task log archive was not rotated"); } let max_size = 32 * 1024 * 1024 - 1; @@ -768,9 +769,9 @@ async fn schedule_task_log_rotate() { if logrotate.rotate(max_size, None, Some(max_files))? { println!("rotated access log, telling daemons to re-open log file"); pbs_runtime::block_on(command_reopen_access_logfiles())?; - worker.log("API access log was rotated".to_string()); + task_log!(worker, "API access log was rotated"); } else { - worker.log("API access log was not rotated".to_string()); + task_log!(worker, "API access log was not rotated"); } let mut logrotate = LogRotate::new(pbs_buildcfg::API_AUTH_LOG_FN, true) @@ -779,9 +780,9 @@ async fn schedule_task_log_rotate() { if logrotate.rotate(max_size, None, Some(max_files))? { println!("rotated auth log, telling daemons to re-open log file"); pbs_runtime::block_on(command_reopen_auth_logfiles())?; - worker.log("API authentication log was rotated".to_string()); + task_log!(worker, "API authentication log was rotated"); } else { - worker.log("API authentication log was not rotated".to_string()); + task_log!(worker, "API authentication log was not rotated"); } Ok(()) diff --git a/src/server/gc_job.rs b/src/server/gc_job.rs index 608b5831..183a00f1 100644 --- a/src/server/gc_job.rs +++ b/src/server/gc_job.rs @@ -2,6 +2,7 @@ use std::sync::Arc; use anyhow::Error; use pbs_api_types::Authid; +use pbs_tools::task_log; use proxmox_rest_server::WorkerTask; use crate::{ @@ -31,9 +32,9 @@ pub fn do_garbage_collection_job( move |worker| { job.start(&worker.upid().to_string())?; - worker.log(format!("starting garbage collection on store {}", store)); + task_log!(worker, "starting garbage collection on store {}", store); if let Some(event_str) = schedule { - worker.log(format!("task triggered by schedule '{}'", event_str)); + task_log!(worker, "task triggered by schedule '{}'", event_str); } let result = datastore.garbage_collection(&*worker, worker.upid()); diff --git a/src/server/pull.rs b/src/server/pull.rs index f913ac8a..cb85383b 100644 --- a/src/server/pull.rs +++ b/src/server/pull.rs @@ -89,10 +89,10 @@ async fn pull_index_chunks( target.cond_touch_chunk(&info.digest, false) })?; if chunk_exists { - //worker.log(format!("chunk {} exists {}", pos, proxmox::tools::digest_to_hex(digest))); + //task_log!(worker, "chunk {} exists {}", pos, proxmox::tools::digest_to_hex(digest)); return Ok::<_, Error>(()); } - //worker.log(format!("sync {} chunk {}", pos, proxmox::tools::digest_to_hex(digest))); + //task_log!(worker, "sync {} chunk {}", pos, proxmox::tools::digest_to_hex(digest)); let chunk = chunk_reader.read_raw_chunk(&info.digest).await?; let raw_size = chunk.raw_size() as usize; @@ -118,11 +118,12 @@ async fn pull_index_chunks( let bytes = bytes.load(Ordering::SeqCst); - worker.log(format!( + task_log!( + worker, "downloaded {} bytes ({:.2} MiB/s)", bytes, (bytes as f64) / (1024.0 * 1024.0 * elapsed) - )); + ); Ok(()) } @@ -181,7 +182,8 @@ async fn pull_single_archive( let mut tmp_path = path.clone(); tmp_path.set_extension("tmp"); - worker.log(format!("sync archive {}", archive_name)); + task_log!(worker, "sync archive {}", archive_name); + let mut tmpfile = std::fs::OpenOptions::new() .write(true) .create(true) @@ -256,7 +258,7 @@ async fn try_client_log_download( if let Err(err) = std::fs::rename(&tmp_path, &path) { bail!("Atomic rename file {:?} failed - {}", path, err); } - worker.log(format!("got backup log file {:?}", CLIENT_LOG_BLOB_NAME)); + task_log!(worker, "got backup log file {:?}", CLIENT_LOG_BLOB_NAME); } Ok(()) @@ -287,10 +289,11 @@ async fn pull_snapshot( match err.downcast_ref::() { Some(HttpError { code, message }) => match *code { StatusCode::NOT_FOUND => { - worker.log(format!( + task_log!( + worker, "skipping snapshot {} - vanished since start of sync", snapshot - )); + ); return Ok(()); } _ => { @@ -330,7 +333,7 @@ async fn pull_snapshot( if !client_log_name.exists() { try_client_log_download(worker, reader, &client_log_name).await?; } - worker.log("no data changes"); + task_log!(worker, "no data changes"); let _ = std::fs::remove_file(&tmp_manifest_name); return Ok(()); // nothing changed } @@ -351,7 +354,7 @@ async fn pull_snapshot( match manifest.verify_file(&item.filename, &csum, size) { Ok(_) => continue, Err(err) => { - worker.log(format!("detected changed file {:?} - {}", path, err)); + task_log!(worker, "detected changed file {:?} - {}", path, err); } } } @@ -361,7 +364,7 @@ async fn pull_snapshot( match manifest.verify_file(&item.filename, &csum, size) { Ok(_) => continue, Err(err) => { - worker.log(format!("detected changed file {:?} - {}", path, err)); + task_log!(worker, "detected changed file {:?} - {}", path, err); } } } @@ -371,7 +374,7 @@ async fn pull_snapshot( match manifest.verify_file(&item.filename, &csum, size) { Ok(_) => continue, Err(err) => { - worker.log(format!("detected changed file {:?} - {}", path, err)); + task_log!(worker, "detected changed file {:?} - {}", path, err); } } } @@ -421,7 +424,7 @@ pub async fn pull_snapshot_from( let (_path, is_new, _snap_lock) = tgt_store.create_locked_backup_dir(&snapshot)?; if is_new { - worker.log(format!("sync snapshot {:?}", snapshot.relative_path())); + task_log!(worker, "sync snapshot {:?}", snapshot.relative_path()); if let Err(err) = pull_snapshot( worker, @@ -433,13 +436,13 @@ pub async fn pull_snapshot_from( .await { if let Err(cleanup_err) = tgt_store.remove_backup_dir(&snapshot, true) { - worker.log(format!("cleanup error - {}", cleanup_err)); + task_log!(worker, "cleanup error - {}", cleanup_err); } return Err(err); } - worker.log(format!("sync snapshot {:?} done", snapshot.relative_path())); + task_log!(worker, "sync snapshot {:?} done", snapshot.relative_path()); } else { - worker.log(format!("re-sync snapshot {:?}", snapshot.relative_path())); + task_log!(worker, "re-sync snapshot {:?}", snapshot.relative_path()); pull_snapshot( worker, reader, @@ -448,10 +451,7 @@ pub async fn pull_snapshot_from( downloaded_chunks, ) .await?; - worker.log(format!( - "re-sync snapshot {:?} done", - snapshot.relative_path() - )); + task_log!(worker, "re-sync snapshot {:?} done", snapshot.relative_path()); } Ok(()) @@ -547,10 +547,7 @@ pub async fn pull_group( // in-progress backups can't be synced if item.size.is_none() { - worker.log(format!( - "skipping snapshot {} - in-progress backup", - snapshot - )); + task_log!(worker, "skipping snapshot {} - in-progress backup", snapshot); continue; } @@ -598,7 +595,7 @@ pub async fn pull_group( .await; progress.done_snapshots = pos as u64 + 1; - worker.log(format!("percentage done: {}", progress)); + task_log!(worker, "percentage done: {}", progress); result?; // stop on error } @@ -610,10 +607,7 @@ pub async fn pull_group( if remote_snapshots.contains(&backup_time) { continue; } - worker.log(format!( - "delete vanished snapshot {:?}", - info.backup_dir.relative_path() - )); + task_log!(worker, "delete vanished snapshot {:?}", info.backup_dir.relative_path()); tgt_store.remove_backup_dir(&info.backup_dir, false)?; } } @@ -645,7 +639,7 @@ pub async fn pull_store( let mut list: Vec = serde_json::from_value(result["data"].take())?; - worker.log(format!("found {} groups to sync", list.len())); + task_log!(worker, "found {} groups to sync", list.len()); list.sort_unstable_by(|a, b| { let type_order = a.backup_type.cmp(&b.backup_type); @@ -675,10 +669,11 @@ pub async fn pull_store( let (owner, _lock_guard) = match tgt_store.create_locked_backup_group(&group, &auth_id) { Ok(result) => result, Err(err) => { - worker.log(format!( + task_log!( + worker, "sync group {}/{} failed - group lock failed: {}", item.backup_type, item.backup_id, err - )); + ); errors = true; // do not stop here, instead continue continue; } @@ -687,10 +682,11 @@ pub async fn pull_store( // permission check if auth_id != owner { // only the owner is allowed to create additional snapshots - worker.log(format!( + task_log!( + worker, "sync group {}/{} failed - owner check failed ({} != {})", item.backup_type, item.backup_id, auth_id, owner - )); + ); errors = true; // do not stop here, instead continue } else if let Err(err) = pull_group( worker, @@ -703,10 +699,11 @@ pub async fn pull_store( ) .await { - worker.log(format!( + task_log!( + worker, "sync group {}/{} failed - {}", item.backup_type, item.backup_id, err, - )); + ); errors = true; // do not stop here, instead continue } } @@ -718,20 +715,21 @@ pub async fn pull_store( if new_groups.contains(&local_group) { continue; } - worker.log(format!( + task_log!( + worker, "delete vanished group '{}/{}'", local_group.backup_type(), local_group.backup_id() - )); + ); if let Err(err) = tgt_store.remove_backup_group(&local_group) { - worker.log(err.to_string()); + task_log!(worker, "{}", err.to_string()); errors = true; } } Ok(()) }); if let Err(err) = result { - worker.log(format!("error during cleanup: {}", err)); + task_log!(worker, "error during cleanup: {}", err); errors = true; }; } diff --git a/src/server/verify_job.rs b/src/server/verify_job.rs index 62fa6fa8..e90dcd4d 100644 --- a/src/server/verify_job.rs +++ b/src/server/verify_job.rs @@ -58,9 +58,9 @@ pub fn do_verification_job( let job_result = match result { Ok(ref failed_dirs) if failed_dirs.is_empty() => Ok(()), Ok(ref failed_dirs) => { - worker.log("Failed to verify the following snapshots/groups:"); + task_log!(worker, "Failed to verify the following snapshots/groups:"); for dir in failed_dirs { - worker.log(format!("\t{}", dir)); + task_log!(worker, "\t{}", dir); } Err(format_err!("verification failed - please check the log for details")) diff --git a/src/tape/pool_writer/mod.rs b/src/tape/pool_writer/mod.rs index 2984173f..2f7de2f5 100644 --- a/src/tape/pool_writer/mod.rs +++ b/src/tape/pool_writer/mod.rs @@ -13,7 +13,7 @@ use anyhow::{bail, Error}; use proxmox::tools::Uuid; -use pbs_tools::task_log; +use pbs_tools::{task_log, task_warn}; use pbs_config::tape_encryption_keys::load_key_configs; use pbs_tape::{ TapeWrite, @@ -135,13 +135,13 @@ impl PoolWriter { let (drive_config, _digest) = pbs_config::drive::config()?; if let Some((mut changer, _)) = media_changer(&drive_config, &self.drive_name)? { - worker.log("eject media"); + task_log!(worker, "eject media"); status.drive.eject_media()?; // rewind and eject early, so that unload_media is faster drop(status); // close drive - worker.log("unload media"); + task_log!(worker, "unload media"); changer.unload_media(None)?; //eject and unload } else { - worker.log("standalone drive - ejecting media"); + task_log!(worker, "standalone drive - ejecting media"); status.drive.eject_media()?; } @@ -157,26 +157,26 @@ impl PoolWriter { if let Some((mut changer, _)) = media_changer(&drive_config, &self.drive_name)? { if let Some(ref mut status) = status { - worker.log("eject media"); + task_log!(worker, "eject media"); status.drive.eject_media()?; // rewind and eject early, so that unload_media is faster } drop(status); // close drive - worker.log("unload media"); + task_log!(worker, "unload media"); changer.unload_media(None)?; for media_uuid in self.pool.current_media_list()? { let media = self.pool.lookup_media(media_uuid)?; let label_text = media.label_text(); if let Some(slot) = changer.export_media(label_text)? { - worker.log(format!("exported media '{}' to import/export slot {}", label_text, slot)); + task_log!(worker, "exported media '{}' to import/export slot {}", label_text, slot); } else { - worker.warn(format!("export failed - media '{}' is not online", label_text)); + task_warn!(worker, "export failed - media '{}' is not online", label_text); } } } else if let Some(mut status) = status { - worker.log("standalone drive - ejecting media instead of export"); + task_log!(worker, "standalone drive - ejecting media instead of export"); status.drive.eject_media()?; } @@ -233,7 +233,7 @@ impl PoolWriter { // test for critical tape alert flags if let Ok(alert_flags) = drive.tape_alert_flags() { if !alert_flags.is_empty() { - worker.log(format!("TapeAlertFlags: {:?}", alert_flags)); + task_log!(worker, "TapeAlertFlags: {:?}", alert_flags); if tape_alert_flags_critical(alert_flags) { self.pool.set_media_status_damaged(&media_uuid)?; bail!("aborting due to critical tape alert flags: {:?}", alert_flags); @@ -297,7 +297,7 @@ impl PoolWriter { ) -> Result { if !status.at_eom { - worker.log(String::from("moving to end of media")); + task_log!(worker, "moving to end of media"); status.drive.move_to_eom(true)?; status.at_eom = true; } @@ -499,12 +499,13 @@ impl PoolWriter { status.bytes_written += bytes_written; let elapsed = start_time.elapsed()?.as_secs_f64(); - worker.log(format!( + task_log!( + worker, "wrote {} chunks ({:.2} MB at {:.2} MB/s)", saved_chunks.len(), bytes_written as f64 /1_000_000.0, (bytes_written as f64)/(1_000_000.0*elapsed), - )); + ); let request_sync = status.bytes_written >= COMMIT_BLOCK_SIZE; @@ -571,7 +572,7 @@ fn write_chunk_archive<'a>( } if writer.bytes_written() > max_size { - //worker.log("Chunk Archive max size reached, closing archive".to_string()); + //task_log!(worker, "Chunk Archive max size reached, closing archive"); break; } } @@ -614,7 +615,7 @@ fn update_media_set_label( let new_media = match old_set { None => { - worker.log("writing new media set label".to_string()); + task_log!(worker, "writing new media set label"); drive.write_media_set_label(new_set, key_config.as_ref())?; media_catalog = MediaCatalog::overwrite(status_path, media_id, false)?; true @@ -634,9 +635,11 @@ fn update_media_set_label( false } else { - worker.log( - format!("writing new media set label (overwrite '{}/{}')", - media_set_label.uuid.to_string(), media_set_label.seq_nr) + task_log!( + worker, + "writing new media set label (overwrite '{}/{}')", + media_set_label.uuid.to_string(), + media_set_label.seq_nr, ); drive.write_media_set_label(new_set, key_config.as_ref())?; diff --git a/tests/worker-task-abort.rs b/tests/worker-task-abort.rs index 2481d984..de7c25e3 100644 --- a/tests/worker-task-abort.rs +++ b/tests/worker-task-abort.rs @@ -9,22 +9,23 @@ use proxmox::try_block; use proxmox::tools::fs::CreateOptions; use pbs_api_types::{Authid, UPID}; +use pbs_tools::task_log; -use proxmox_rest_server::{flog, CommandoSocket, WorkerTask}; +use proxmox_rest_server::{CommandoSocket, WorkerTask}; fn garbage_collection(worker: &WorkerTask) -> Result<(), Error> { - worker.log("start garbage collection"); + task_log!(worker, "start garbage collection"); for i in 0..50 { worker.check_abort()?; - flog!(worker, "progress {}", i); + task_log!(worker, "progress {}", i); std::thread::sleep(std::time::Duration::from_millis(10)); } - worker.log("end garbage collection"); + task_log!(worker, "end garbage collection"); Ok(()) } @@ -35,10 +36,10 @@ fn garbage_collection(worker: &WorkerTask) -> Result<(), Error> { fn worker_task_abort() -> Result<(), Error> { let uid = nix::unistd::Uid::current(); let gid = nix::unistd::Gid::current(); - + let file_opts = CreateOptions::new().owner(uid).group(gid); proxmox_rest_server::init_worker_tasks("./target/tasklogtestdir".into(), file_opts.clone())?; - + use std::sync::{Arc, Mutex}; let errmsg: Arc>> = Arc::new(Mutex::new(None));