api: switch from task_log! macro to tracing

Import `proxmox-log` and replace all `task_log!` (and `task_warn!`,
`task_error!`) invocations with the corresponding tracing calls
(`info!`, `warn!`, etc.). Remove worker references where they are no
longer necessary.
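
The substitution is mechanical at each call site; a representative
before/after, lifted from the prune hunk below and shown in isolation
(surrounding code elided):

    // before: the worker handle is threaded into every log call
    task_log!(worker, "retention options: {}", opts.join(" "));

    // after: the tracing macro emits through the subscriber that
    // proxmox-log sets up, which routes events to the task log
    info!("retention options: {}", opts.join(" "));

As a result, closures that only used `worker` for logging now bind it
as `_worker`, and helpers that took a worker argument solely for
logging (e.g. `request_validation`, `write_media_label`) drop that
parameter.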

Reviewed-by: Lukas Wagner <l.wagner@proxmox.com>
Tested-by: Lukas Wagner <l.wagner@proxmox.com>
Signed-off-by: Gabriel Goller <g.goller@proxmox.com>
Gabriel Goller 2024-07-09 16:20:13 +02:00 committed by Wolfgang Bumiller
parent 7e2486e800
commit 3b2ade778f
14 changed files with 256 additions and 479 deletions


@@ -13,6 +13,7 @@ use hyper::{header, Body, Response, StatusCode};
 use serde::Deserialize;
 use serde_json::{json, Value};
 use tokio_stream::wrappers::ReceiverStream;
+use tracing::{info, warn};

 use proxmox_async::blocking::WrappedReaderStream;
 use proxmox_async::{io::AsyncChannelWriter, stream::AsyncReaderStream};
@@ -26,7 +27,6 @@ use proxmox_sortable_macro::sortable;
 use proxmox_sys::fs::{
     file_read_firstline, file_read_optional_string, replace_file, CreateOptions,
 };
-use proxmox_sys::{task_log, task_warn};
 use proxmox_time::CalendarEvent;

 use pxar::accessor::aio::Accessor;
@@ -909,9 +909,9 @@ pub fn verify(
         )?
     };
     if !failed_dirs.is_empty() {
-        task_log!(worker, "Failed to verify the following snapshots/groups:");
+        info!("Failed to verify the following snapshots/groups:");
         for dir in failed_dirs {
-            task_log!(worker, "\t{}", dir);
+            info!("\t{dir}");
         }
         bail!("verification failed - please check the log for details");
     }
@@ -1031,9 +1031,9 @@ pub fn prune(
         return Ok(json!(prune_result));
     }

-    let prune_group = move |worker: Arc<WorkerTask>| {
+    let prune_group = move |_worker: Arc<WorkerTask>| {
         if keep_all {
-            task_log!(worker, "No prune selection - keeping all files.");
+            info!("No prune selection - keeping all files.");
         } else {
             let mut opts = Vec::new();
             if !ns.is_root() {
@@ -1041,9 +1041,8 @@ pub fn prune(
             }
             crate::server::cli_keep_options(&mut opts, &keep_options);

-            task_log!(worker, "retention options: {}", opts.join(" "));
-            task_log!(
-                worker,
+            info!("retention options: {}", opts.join(" "));
+            info!(
                 "Starting prune on {} group \"{}\"",
                 print_store_and_ns(&store, &ns),
                 group.group(),
@@ -1060,7 +1059,7 @@ pub fn prune(
             let msg = format!("{}/{}/{timestamp} {mark}", group.ty, group.id);

-            task_log!(worker, "{msg}");
+            info!("{msg}");

             prune_result.push(PruneResult {
                 backup_type: group.ty,
@@ -1073,8 +1072,7 @@ pub fn prune(
             if !keep {
                 if let Err(err) = backup_dir.destroy(false) {
-                    task_warn!(
-                        worker,
+                    warn!(
                         "failed to remove dir {:?}: {}",
                         backup_dir.relative_path(),
                         err,
@@ -1098,7 +1096,7 @@ pub fn prune(
         )?;
         Ok(json!(upid))
     } else {
-        let worker = WorkerTask::new("prune", Some(worker_id), auth_id.to_string(), true)?;
+        let (worker, _) = WorkerTask::new("prune", Some(worker_id), auth_id.to_string(), true)?;
         let result = prune_group(worker.clone());
         worker.log_result(&Ok(()));
         Ok(json!(result))
@@ -1161,9 +1159,7 @@ pub fn prune_datastore(
         Some(worker_id),
         auth_id.to_string(),
         to_stdout,
-        move |worker| {
-            crate::server::prune_datastore(worker, auth_id, prune_options, datastore, dry_run)
-        },
+        move |_worker| crate::server::prune_datastore(auth_id, prune_options, datastore, dry_run),
     )?;

     Ok(upid_str)


@@ -9,12 +9,12 @@ use hex::FromHex;
 use lazy_static::lazy_static;
 use serde::{Deserialize, Serialize};
 use serde_json::{json, Value};
+use tracing::{info, warn};

 use proxmox_router::{
     http_bail, list_subdirs_api_method, Permission, Router, RpcEnvironment, SubdirMap,
 };
 use proxmox_schema::{api, param_bail};
-use proxmox_sys::{task_log, task_warn};

 use proxmox_acme::types::AccountData as AcmeAccountData;
 use proxmox_acme::Account;
@@ -240,10 +240,10 @@ fn register_account(
         Some(name.to_string()),
         auth_id.to_string(),
         true,
-        move |worker| async move {
+        move |_worker| async move {
             let mut client = AcmeClient::new(directory);

-            task_log!(worker, "Registering ACME account '{}'...", &name);
+            info!("Registering ACME account '{}'...", &name);

             let account = do_register_account(
                 &mut client,
@@ -255,11 +255,7 @@ fn register_account(
             )
             .await?;

-            task_log!(
-                worker,
-                "Registration successful, account URL: {}",
-                account.location
-            );
+            info!("Registration successful, account URL: {}", account.location);

             Ok(())
         },
@@ -354,7 +350,7 @@ pub fn deactivate_account(
         Some(name.to_string()),
         auth_id.to_string(),
         true,
-        move |worker| async move {
+        move |_worker| async move {
             match AcmeClient::load(&name)
                 .await?
                 .update_account(&json!({"status": "deactivated"}))
@@ -363,12 +359,7 @@ pub fn deactivate_account(
                 Ok(_account) => (),
                 Err(err) if !force => return Err(err),
                 Err(err) => {
-                    task_warn!(
-                        worker,
-                        "error deactivating account {}, proceedeing anyway - {}",
-                        name,
-                        err,
-                    );
+                    warn!("error deactivating account {name}, proceeding anyway - {err}");
                 }
             }
             crate::config::acme::mark_account_deactivated(&name)?;


@@ -4,11 +4,11 @@ use ::serde::{Deserialize, Serialize};
 use anyhow::Error;
 use hex::FromHex;
 use serde_json::Value;
+use tracing::warn;

 use proxmox_router::{http_bail, Permission, Router, RpcEnvironment, RpcEnvironmentType};
 use proxmox_schema::{api, param_bail, ApiType};
 use proxmox_section_config::SectionConfigData;
-use proxmox_sys::{task_warn, WorkerTaskContext};
 use proxmox_uuid::Uuid;

 use pbs_api_types::{
@@ -70,7 +70,6 @@ pub(crate) fn do_create_datastore(
     _lock: BackupLockGuard,
     mut config: SectionConfigData,
     datastore: DataStoreConfig,
-    worker: Option<&dyn WorkerTaskContext>,
 ) -> Result<(), Error> {
     let path: PathBuf = datastore.path.clone().into();
@@ -84,7 +83,6 @@ pub(crate) fn do_create_datastore(
         path,
         backup_user.uid,
         backup_user.gid,
-        worker,
         tuning.sync_level.unwrap_or_default(),
     )?;
@@ -155,11 +153,11 @@ pub fn create_datastore(
         Some(config.name.to_string()),
         auth_id.to_string(),
         to_stdout,
-        move |worker| {
-            do_create_datastore(lock, section_config, config, Some(&worker))?;
+        move |_worker| {
+            do_create_datastore(lock, section_config, config)?;

             if let Some(prune_job_config) = prune_job_config {
-                do_create_prune_job(prune_job_config, Some(&worker))
+                do_create_prune_job(prune_job_config)
             } else {
                 Ok(())
             }
@@ -528,8 +526,8 @@ pub async fn delete_datastore(
         Some(name.clone()),
         auth_id.to_string(),
         to_stdout,
-        move |worker| {
-            pbs_datastore::DataStore::destroy(&name, destroy_data, &worker)?;
+        move |_worker| {
+            pbs_datastore::DataStore::destroy(&name, destroy_data)?;

             // ignore errors
             let _ = jobstate::remove_state_file("prune", &name);
@@ -538,7 +536,7 @@ pub async fn delete_datastore(
             if let Err(err) =
                 proxmox_async::runtime::block_on(crate::server::notify_datastore_removed())
             {
-                task_warn!(worker, "failed to notify after datastore removal: {err}");
+                warn!("failed to notify after datastore removal: {err}");
             }

             Ok(())


@@ -1,7 +1,5 @@
 use anyhow::Error;
 use hex::FromHex;
-use proxmox_sys::task_log;
-use proxmox_sys::WorkerTaskContext;
 use serde::{Deserialize, Serialize};
 use serde_json::Value;
@@ -15,6 +13,7 @@ use pbs_api_types::{
 use pbs_config::prune;
 use pbs_config::CachedUserInfo;
+use tracing::info;

 #[api(
     input: {
@@ -58,10 +57,7 @@ pub fn list_prune_jobs(
     Ok(list)
 }

-pub fn do_create_prune_job(
-    config: PruneJobConfig,
-    worker: Option<&dyn WorkerTaskContext>,
-) -> Result<(), Error> {
+pub fn do_create_prune_job(config: PruneJobConfig) -> Result<(), Error> {
     let _lock = prune::lock_config()?;

     let (mut section_config, _digest) = prune::config()?;
@@ -76,9 +72,7 @@ pub fn do_create_prune_job(
     crate::server::jobstate::create_state_file("prunejob", &config.id)?;

-    if let Some(worker) = worker {
-        task_log!(worker, "Prune job created: {}", config.id);
-    }
+    info!("Prune job created: {}", config.id);

     Ok(())
 }
@@ -108,7 +102,7 @@ pub fn create_prune_job(
     user_info.check_privs(&auth_id, &config.acl_path(), PRIV_DATASTORE_MODIFY, true)?;

-    do_create_prune_job(config, None)
+    do_create_prune_job(config)
 }

 #[api(


@@ -1,4 +1,5 @@
 use anyhow::{bail, Error};
+use tracing::{info, warn};

 use proxmox_config_digest::ConfigDigest;
 use proxmox_router::{


@@ -5,16 +5,17 @@ use anyhow::{bail, format_err, Error};
 use openssl::pkey::PKey;
 use openssl::x509::X509;
 use serde::{Deserialize, Serialize};
+use tracing::info;

 use proxmox_router::list_subdirs_api_method;
 use proxmox_router::SubdirMap;
 use proxmox_router::{Permission, Router, RpcEnvironment};
 use proxmox_schema::api;
-use proxmox_sys::{task_log, task_warn};

 use pbs_api_types::{NODE_SCHEMA, PRIV_SYS_MODIFY};
 use pbs_buildcfg::configdir;
 use pbs_tools::cert;
+use tracing::warn;

 use crate::acme::AcmeClient;
 use crate::api2::types::AcmeDomain;
@@ -301,10 +302,7 @@ async fn order_certificate(
     };

     if domains.is_empty() {
-        task_log!(
-            worker,
-            "No domains configured to be ordered from an ACME server."
-        );
+        info!("No domains configured to be ordered from an ACME server.");
         return Ok(None);
     }
@@ -312,11 +310,11 @@ async fn order_certificate(

     let mut acme = node_config.acme_client().await?;

-    task_log!(worker, "Placing ACME order");
+    info!("Placing ACME order");

     let order = acme
         .new_order(domains.iter().map(|d| d.domain.to_ascii_lowercase()))
         .await?;

-    task_log!(worker, "Order URL: {}", order.location);
+    info!("Order URL: {}", order.location);

     let identifiers: Vec<String> = order
         .data
@@ -328,7 +326,7 @@ async fn order_certificate(
         .collect();

     for auth_url in &order.data.authorizations {
-        task_log!(worker, "Getting authorization details from '{}'", auth_url);
+        info!("Getting authorization details from '{auth_url}'");
         let mut auth = acme.get_authorization(auth_url).await?;

         let domain = match &mut auth.identifier {
@@ -336,43 +334,37 @@ async fn order_certificate(
         };

         if auth.status == Status::Valid {
-            task_log!(worker, "{} is already validated!", domain);
+            info!("{domain} is already validated!");
             continue;
         }

-        task_log!(worker, "The validation for {} is pending", domain);
+        info!("The validation for {domain} is pending");
         let domain_config: &AcmeDomain = get_domain_config(&domain)?;
         let plugin_id = domain_config.plugin.as_deref().unwrap_or("standalone");
         let mut plugin_cfg =
             crate::acme::get_acme_plugin(&plugins, plugin_id)?.ok_or_else(|| {
-                format_err!("plugin '{}' for domain '{}' not found!", plugin_id, domain)
+                format_err!("plugin '{plugin_id}' for domain '{domain}' not found!")
             })?;

-        task_log!(worker, "Setting up validation plugin");
+        info!("Setting up validation plugin");
         let validation_url = plugin_cfg
             .setup(&mut acme, &auth, domain_config, Arc::clone(&worker))
             .await?;

-        let result = request_validation(&worker, &mut acme, auth_url, validation_url).await;
+        let result = request_validation(&mut acme, auth_url, validation_url).await;

         if let Err(err) = plugin_cfg
             .teardown(&mut acme, &auth, domain_config, Arc::clone(&worker))
             .await
         {
-            task_warn!(
-                worker,
-                "Failed to teardown plugin '{}' for domain '{}' - {}",
-                plugin_id,
-                domain,
-                err
-            );
+            warn!("Failed to teardown plugin '{plugin_id}' for domain '{domain}' - {err}");
         }

         result?;
     }

-    task_log!(worker, "All domains validated");
-    task_log!(worker, "Creating CSR");
+    info!("All domains validated");
+    info!("Creating CSR");

     let csr = proxmox_acme::util::Csr::generate(&identifiers, &Default::default())?;
     let mut finalize_error_cnt = 0u8;
@@ -385,7 +377,7 @@ async fn order_certificate(
         match order.status {
             Status::Pending => {
-                task_log!(worker, "still pending, trying to finalize anyway");
+                info!("still pending, trying to finalize anyway");
                 let finalize = order
                     .finalize
                     .as_deref()
@@ -400,7 +392,7 @@ async fn order_certificate(
                 tokio::time::sleep(Duration::from_secs(5)).await;
             }
             Status::Ready => {
-                task_log!(worker, "order is ready, finalizing");
+                info!("order is ready, finalizing");
                 let finalize = order
                     .finalize
                     .as_deref()
@@ -409,18 +401,18 @@ async fn order_certificate(
                 tokio::time::sleep(Duration::from_secs(5)).await;
             }
             Status::Processing => {
-                task_log!(worker, "still processing, trying again in 30 seconds");
+                info!("still processing, trying again in 30 seconds");
                 tokio::time::sleep(Duration::from_secs(30)).await;
             }
             Status::Valid => {
-                task_log!(worker, "valid");
+                info!("valid");
                 break;
             }
             other => bail!("order status: {:?}", other),
         }
     }

-    task_log!(worker, "Downloading certificate");
+    info!("Downloading certificate");
     let certificate = acme
         .get_certificate(
             order
@@ -437,15 +429,14 @@ async fn order_certificate(
 }

 async fn request_validation(
-    worker: &WorkerTask,
     acme: &mut AcmeClient,
     auth_url: &str,
     validation_url: &str,
 ) -> Result<(), Error> {
-    task_log!(worker, "Triggering validation");
+    info!("Triggering validation");
     acme.request_challenge_validation(validation_url).await?;

-    task_log!(worker, "Sleeping for 5 seconds");
+    info!("Sleeping for 5 seconds");
     tokio::time::sleep(Duration::from_secs(5)).await;

     loop {
@@ -454,10 +445,7 @@ async fn request_validation(
         let auth = acme.get_authorization(auth_url).await?;
         match auth.status {
             Status::Pending => {
-                task_log!(
-                    worker,
-                    "Status is still 'pending', trying again in 10 seconds"
-                );
+                info!("Status is still 'pending', trying again in 10 seconds");
                 tokio::time::sleep(Duration::from_secs(10)).await;
             }
             Status::Valid => return Ok(()),
@@ -582,15 +570,12 @@ pub fn revoke_acme_cert(rpcenv: &mut dyn RpcEnvironment) -> Result<String, Error
         None,
         auth_id,
         true,
-        move |worker| async move {
-            task_log!(worker, "Loading ACME account");
+        move |_worker| async move {
+            info!("Loading ACME account");
             let mut acme = node_config.acme_client().await?;

-            task_log!(worker, "Revoking old certificate");
+            info!("Revoking old certificate");
             acme.revoke_certificate(cert_pem.as_bytes(), None).await?;

-            task_log!(
-                worker,
-                "Deleting certificate and regenerating a self-signed one"
-            );
+            info!("Deleting certificate and regenerating a self-signed one");
             delete_custom_certificate().await?;

             Ok(())
         },


@@ -2,11 +2,11 @@ use ::serde::{Deserialize, Serialize};
 use anyhow::{bail, Error};
 use serde_json::json;
 use std::os::linux::fs::MetadataExt;
+use tracing::info;

 use proxmox_router::{Permission, Router, RpcEnvironment, RpcEnvironmentType};
 use proxmox_schema::api;
 use proxmox_section_config::SectionConfigData;
-use proxmox_sys::task_log;

 use pbs_api_types::{
     DataStoreConfig, BLOCKDEVICE_NAME_SCHEMA, DATASTORE_SCHEMA, NODE_SCHEMA, PRIV_SYS_AUDIT,
@@ -179,8 +179,8 @@ pub fn create_datastore_disk(
         Some(name.clone()),
         auth_id,
         to_stdout,
-        move |worker| {
-            task_log!(worker, "create datastore '{}' on disk {}", name, disk);
+        move |_worker| {
+            info!("create datastore '{name}' on disk {disk}");

             let add_datastore = add_datastore.unwrap_or(false);
             let filesystem = filesystem.unwrap_or(FileSystemType::Ext4);
@@ -213,12 +213,7 @@ pub fn create_datastore_disk(
                 bail!("datastore '{}' already exists.", datastore.name);
             }

-            crate::api2::config::datastore::do_create_datastore(
-                lock,
-                config,
-                datastore,
-                Some(&worker),
-            )?;
+            crate::api2::config::datastore::do_create_datastore(lock, config, datastore)?;
         }

         Ok(())


@@ -6,7 +6,7 @@ use proxmox_router::{
 };
 use proxmox_schema::api;
 use proxmox_sortable_macro::sortable;
-use proxmox_sys::task_log;
+use tracing::info;

 use pbs_api_types::{
     BLOCKDEVICE_DISK_AND_PARTITION_NAME_SCHEMA, BLOCKDEVICE_NAME_SCHEMA, NODE_SCHEMA,
@@ -164,8 +164,8 @@ pub fn initialize_disk(
         Some(disk.clone()),
         auth_id,
         to_stdout,
-        move |worker| {
-            task_log!(worker, "initialize disk {}", disk);
+        move |_worker| {
+            info!("initialize disk {disk}");

             let disk_manager = DiskManage::new();
             let disk_info = disk_manager.disk_by_name(&disk)?;
@@ -209,13 +209,13 @@ pub fn wipe_disk(disk: String, rpcenv: &mut dyn RpcEnvironment) -> Result<Value,
         Some(disk.clone()),
         auth_id,
         to_stdout,
-        move |worker| {
-            task_log!(worker, "wipe disk {}", disk);
+        move |_worker| {
+            info!("wipe disk {disk}");

             let disk_manager = DiskManage::new();
             let disk_info = disk_manager.partition_by_name(&disk)?;

-            wipe_blockdev(&disk_info, worker)?;
+            wipe_blockdev(&disk_info)?;

             Ok(())
         },


@@ -1,9 +1,9 @@
 use anyhow::{bail, Error};
 use serde_json::{json, Value};
+use tracing::{error, info};

 use proxmox_router::{Permission, Router, RpcEnvironment, RpcEnvironmentType};
 use proxmox_schema::api;
-use proxmox_sys::{task_error, task_log};

 use pbs_api_types::{
     DataStoreConfig, ZfsCompressionType, ZfsRaidLevel, ZpoolListItem, DATASTORE_SCHEMA,
@@ -228,14 +228,8 @@ pub fn create_zpool(
         Some(name.clone()),
         auth_id,
         to_stdout,
-        move |worker| {
-            task_log!(
-                worker,
-                "create {:?} zpool '{}' on devices '{}'",
-                raidlevel,
-                name,
-                devices_text
-            );
+        move |_worker| {
+            info!("create {raidlevel:?} zpool '{name}' on devices '{devices_text}'");

             let mut command = std::process::Command::new("zpool");
             command.args([
@@ -275,12 +269,12 @@ pub fn create_zpool(
                 }
             }

-            task_log!(worker, "# {:?}", command);
+            info!("# {command:?}");

             match proxmox_sys::command::run_command(command, None) {
-                Ok(output) => task_log!(worker, "{output}"),
+                Ok(output) => info!("{output}"),
                 Err(err) => {
-                    task_error!(worker, "{err}");
+                    error!("{err}");
                     bail!("Error during 'zpool create', see task log for more details");
                 }
             };
@@ -299,11 +293,11 @@ pub fn create_zpool(
                 command.arg(&format!("compression={}", compression));
             }
             command.args(["relatime=on", &name]);
-            task_log!(worker, "# {:?}", command);
+            info!("# {command:?}");

             match proxmox_sys::command::run_command(command, None) {
-                Ok(output) => task_log!(worker, "{output}"),
+                Ok(output) => info!("{output}"),
                 Err(err) => {
-                    task_error!(worker, "{err}");
+                    error!("{err}");
                     bail!("Error during 'zfs set', see task log for more details");
                 }
             };
@@ -319,12 +313,7 @@ pub fn create_zpool(
                     bail!("datastore '{}' already exists.", datastore.name);
                 }

-                crate::api2::config::datastore::do_create_datastore(
-                    lock,
-                    config,
-                    datastore,
-                    Some(&worker),
-                )?;
+                crate::api2::config::datastore::do_create_datastore(lock, config, datastore)?;
             }

             Ok(())


@@ -25,6 +25,7 @@ use proxmox_sortable_macro::sortable;
 use proxmox_sys::fd::fd_change_cloexec;

 use pbs_api_types::{NODE_SCHEMA, PRIV_SYS_CONSOLE};
+use tracing::{info, warn};

 use crate::auth::{private_auth_keyring, public_auth_keyring};
 use crate::tools;
@@ -181,20 +182,18 @@ async fn termproxy(cmd: Option<String>, rpcenv: &mut dyn RpcEnvironment) -> Resu
     let stdout = child.stdout.take().expect("no child stdout handle");
     let stderr = child.stderr.take().expect("no child stderr handle");

-    let worker_stdout = worker.clone();
     let stdout_fut = async move {
         let mut reader = BufReader::new(stdout).lines();
         while let Some(line) = reader.next_line().await? {
-            worker_stdout.log_message(line);
+            info!(line);
         }
         Ok::<(), Error>(())
     };

-    let worker_stderr = worker.clone();
     let stderr_fut = async move {
         let mut reader = BufReader::new(stderr).lines();
         while let Some(line) = reader.next_line().await? {
-            worker_stderr.log_warning(line);
+            warn!(line);
         }
         Ok::<(), Error>(())
     };
@@ -226,9 +225,9 @@ async fn termproxy(cmd: Option<String>, rpcenv: &mut dyn RpcEnvironment) -> Resu
             }

             if let Err(err) = child.kill().await {
-                worker.log_warning(format!("error killing termproxy: {}", err));
+                warn!("error killing termproxy: {err}");
             } else if let Err(err) = child.wait().await {
-                worker.log_warning(format!("error awaiting termproxy: {}", err));
+                warn!("error awaiting termproxy: {err}");
             }
         }


@@ -1,10 +1,10 @@
 //! Sync datastore from remote server
 use anyhow::{bail, format_err, Error};
 use futures::{future::FutureExt, select};
+use tracing::info;

 use proxmox_router::{Permission, Router, RpcEnvironment};
 use proxmox_schema::api;
-use proxmox_sys::task_log;

 use pbs_api_types::{
     Authid, BackupNamespace, GroupFilter, RateLimitConfig, SyncJobConfig, DATASTORE_SCHEMA,
@@ -128,12 +128,12 @@ pub fn do_sync_job(
     let worker_future = async move {
         let pull_params = PullParameters::try_from(&sync_job)?;

-        task_log!(worker, "Starting datastore sync job '{}'", job_id);
+        info!("Starting datastore sync job '{job_id}'");
         if let Some(event_str) = schedule {
-            task_log!(worker, "task triggered by schedule '{}'", event_str);
+            info!("task triggered by schedule '{event_str}'");
         }

-        task_log!(
-            worker,
+        info!(
             "sync datastore '{}' from '{}{}'",
             sync_job.store,
             sync_job
@@ -143,33 +143,29 @@ pub fn do_sync_job(
             sync_job.remote_store,
         );

-        let pull_stats = pull_store(&worker, pull_params).await?;
+        let pull_stats = pull_store(pull_params).await?;

         if pull_stats.bytes != 0 {
             let amount = HumanByte::from(pull_stats.bytes);
             let rate = HumanByte::new_binary(
                 pull_stats.bytes as f64 / pull_stats.elapsed.as_secs_f64(),
             );
-            task_log!(
-                worker,
+            info!(
                 "Summary: sync job pulled {amount} in {} chunks (average rate: {rate}/s)",
                 pull_stats.chunk_count,
             );
         } else {
-            task_log!(worker, "Summary: sync job found no new data to pull");
+            info!("Summary: sync job found no new data to pull");
         }

         if let Some(removed) = pull_stats.removed {
-            task_log!(
-                worker,
-                "Summary: removed vanished: snapshots: {}, groups: {}, namespaces: {}",
-                removed.snapshots,
-                removed.groups,
-                removed.namespaces,
-            );
+            info!(
+                "Summary: removed vanished: snapshots: {}, groups: {}, namespaces: {}",
+                removed.snapshots, removed.groups, removed.namespaces,
+            );
         }

-        task_log!(worker, "sync job '{}' end", &job_id);
+        info!("sync job '{}' end", &job_id);

         Ok(())
     };
@@ -315,21 +311,18 @@ async fn pull(
         auth_id.to_string(),
         true,
         move |worker| async move {
-            task_log!(
-                worker,
-                "pull datastore '{}' from '{}/{}'",
-                store,
-                remote.as_deref().unwrap_or("-"),
-                remote_store,
-            );
+            info!(
+                "pull datastore '{store}' from '{}/{remote_store}'",
+                remote.as_deref().unwrap_or("-"),
+            );

-            let pull_future = pull_store(&worker, pull_params);
+            let pull_future = pull_store(pull_params);
             (select! {
                 success = pull_future.fuse() => success,
                 abort = worker.abort_future().map(|_| Err(format_err!("pull aborted"))) => abort,
             })?;

-            task_log!(worker, "pull datastore '{}' end", store);
+            info!("pull datastore '{store}' end");

             Ok(())
         },


@@ -2,11 +2,12 @@ use std::sync::{Arc, Mutex};

 use anyhow::{bail, format_err, Error};
 use serde_json::Value;
+use tracing::{info, warn};

 use proxmox_lang::try_block;
 use proxmox_router::{Permission, Router, RpcEnvironment, RpcEnvironmentType};
 use proxmox_schema::api;
-use proxmox_sys::{task_log, task_warn, WorkerTaskContext};
+use proxmox_sys::WorkerTaskContext;

 use pbs_api_types::{
     print_ns_and_snapshot, print_store_and_ns, Authid, MediaPoolConfig, Operation,
@@ -176,7 +177,7 @@ pub fn do_tape_backup_job(
     let job_result = try_block!({
         if schedule.is_some() {
             // for scheduled tape backup jobs, we wait indefinitely for the lock
-            task_log!(worker, "waiting for drive lock...");
+            info!("waiting for drive lock...");
             loop {
                 worker.check_abort()?;
                 match lock_tape_device(&drive_config, &setup.drive) {
@@ -191,9 +192,9 @@ pub fn do_tape_backup_job(
         }
         set_tape_device_state(&setup.drive, &worker.upid().to_string())?;

-        task_log!(worker, "Starting tape backup job '{}'", job_id);
+        info!("Starting tape backup job '{job_id}'");
         if let Some(event_str) = schedule {
-            task_log!(worker, "task triggered by schedule '{}'", event_str);
+            info!("task triggered by schedule '{event_str}'");
         }

         backup_worker(
@@ -369,7 +370,7 @@ fn backup_worker(
 ) -> Result<(), Error> {
     let start = std::time::Instant::now();

-    task_log!(worker, "update media online status");
+    info!("update media online status");
     let changer_name = update_media_online_status(&setup.drive)?;

     let root_namespace = setup.ns.clone().unwrap_or_default();
@@ -381,7 +382,6 @@ fn backup_worker(
     let mut pool_writer = PoolWriter::new(
         pool,
         &setup.drive,
-        worker,
         notification_mode,
         force_media_set,
         ns_magic,
@@ -405,11 +405,9 @@ fn backup_worker(
         None => group_list,
     };

-    task_log!(
-        worker,
-        "found {} groups (out of {} total)",
-        group_list.len(),
-        group_count_full
-    );
+    info!(
+        "found {} groups (out of {group_count_full} total)",
+        group_list.len()
+    );

     let mut progress = StoreProgress::new(group_list.len() as u64);
@@ -417,10 +415,7 @@ fn backup_worker(
     let latest_only = setup.latest_only.unwrap_or(false);

     if latest_only {
-        task_log!(
-            worker,
-            "latest-only: true (only considering latest snapshots)"
-        );
+        info!("latest-only: true (only considering latest snapshots)");
     }

     let datastore_name = datastore.name();
@@ -443,8 +438,7 @@ fn backup_worker(
             .collect();

         if snapshot_list.is_empty() {
-            task_log!(
-                worker,
+            info!(
                 "{}, group {} was empty",
                 print_store_and_ns(datastore_name, group.backup_ns()),
                 group.group()
@@ -464,7 +458,7 @@ fn backup_worker(
                     info.backup_dir.backup_ns(),
                     info.backup_dir.as_ref(),
                 ) {
-                    task_log!(worker, "skip snapshot {}", rel_path);
+                    info!("skip snapshot {rel_path}");
                     continue;
                 }
@@ -477,7 +471,7 @@ fn backup_worker(
                     SnapshotBackupResult::Ignored => {}
                 }
                 progress.done_snapshots = 1;
-                task_log!(worker, "percentage done: {}", progress);
+                info!("percentage done: {progress}");
             }
         } else {
             progress.group_snapshots = snapshot_list.len() as u64;
@@ -490,7 +484,7 @@ fn backup_worker(
                     info.backup_dir.backup_ns(),
                     info.backup_dir.as_ref(),
                 ) {
-                    task_log!(worker, "skip snapshot {}", rel_path);
+                    info!("skip snapshot {rel_path}");
                     continue;
                 }
@@ -503,7 +497,7 @@ fn backup_worker(
                     SnapshotBackupResult::Ignored => {}
                 }
                 progress.done_snapshots = snapshot_number as u64 + 1;
-                task_log!(worker, "percentage done: {}", progress);
+                info!("percentage done: {progress}");
             }
         }
     }
@@ -511,18 +505,15 @@ fn backup_worker(
     pool_writer.commit()?;

     if need_catalog {
-        task_log!(worker, "append media catalog");
+        info!("append media catalog");

         let uuid = pool_writer.load_writable_media(worker)?;
-        let done = pool_writer.append_catalog_archive(worker)?;
+        let done = pool_writer.append_catalog_archive()?;
         if !done {
-            task_log!(
-                worker,
-                "catalog does not fit on tape, writing to next volume"
-            );
+            info!("catalog does not fit on tape, writing to next volume");
             pool_writer.set_media_status_full(&uuid)?;
             pool_writer.load_writable_media(worker)?;
-            let done = pool_writer.append_catalog_archive(worker)?;
+            let done = pool_writer.append_catalog_archive()?;
             if !done {
                 bail!("write_catalog_archive failed on second media");
             }
@@ -530,9 +521,9 @@ fn backup_worker(
     }

     if setup.export_media_set.unwrap_or(false) {
-        pool_writer.export_media_set(worker)?;
+        pool_writer.export_media_set()?;
     } else if setup.eject_media.unwrap_or(false) {
-        pool_writer.eject_media(worker)?;
+        pool_writer.eject_media()?;
     }

     if errors {
@@ -542,7 +533,7 @@ fn backup_worker(
     summary.used_tapes = match pool_writer.get_used_media_labels() {
         Ok(tapes) => Some(tapes),
         Err(err) => {
-            task_warn!(worker, "could not collect list of used tapes: {err}");
+            warn!("could not collect list of used tapes: {err}");
             None
         }
     };
@@ -576,7 +567,7 @@ fn backup_snapshot(
     snapshot: BackupDir,
 ) -> Result<SnapshotBackupResult, Error> {
     let snapshot_path = snapshot.relative_path();
-    task_log!(worker, "backup snapshot {:?}", snapshot_path);
+    info!("backup snapshot {snapshot_path:?}");

     let snapshot_reader = match snapshot.locked_reader() {
         Ok(reader) => reader,
@@ -584,15 +575,10 @@ fn backup_snapshot(
             if !snapshot.full_path().exists() {
                 // we got an error and the dir does not exist,
                 // it probably just vanished, so continue
-                task_log!(worker, "snapshot {:?} vanished, skipping", snapshot_path);
+                info!("snapshot {snapshot_path:?} vanished, skipping");
                 return Ok(SnapshotBackupResult::Ignored);
             }
-            task_warn!(
-                worker,
-                "failed opening snapshot {:?}: {}",
-                snapshot_path,
-                err
-            );
+            warn!("failed opening snapshot {snapshot_path:?}: {err}");
             return Ok(SnapshotBackupResult::Error);
         }
     };
@@ -638,7 +624,7 @@ fn backup_snapshot(
         let snapshot_reader = snapshot_reader.lock().unwrap();

-        let (done, _bytes) = pool_writer.append_snapshot_archive(worker, &snapshot_reader)?;
+        let (done, _bytes) = pool_writer.append_snapshot_archive(&snapshot_reader)?;

         if !done {
             // does not fit on tape, so we try on next volume
@@ -647,19 +633,14 @@ fn backup_snapshot(
         worker.check_abort()?;

         pool_writer.load_writable_media(worker)?;
-        let (done, _bytes) = pool_writer.append_snapshot_archive(worker, &snapshot_reader)?;
+        let (done, _bytes) = pool_writer.append_snapshot_archive(&snapshot_reader)?;

         if !done {
             bail!("write_snapshot_archive failed on second media");
         }
     }

-    task_log!(
-        worker,
-        "end backup {}:{:?}",
-        datastore.name(),
-        snapshot_path
-    );
+    info!("end backup {}:{snapshot_path:?}", datastore.name());

     Ok(SnapshotBackupResult::Success)
 }


@@ -5,6 +5,7 @@ use std::sync::Arc;
 use anyhow::{bail, format_err, Error};
 use pbs_tape::sg_tape::SgTape;
 use serde_json::Value;
+use tracing::{info, warn};

 use proxmox_router::{
     list_subdirs_api_method, Permission, Router, RpcEnvironment, RpcEnvironmentType, SubdirMap,
@@ -12,7 +13,6 @@ use proxmox_router::{
 use proxmox_schema::api;
 use proxmox_section_config::SectionConfigData;
 use proxmox_sortable_macro::sortable;
-use proxmox_sys::{task_log, task_warn};
 use proxmox_uuid::Uuid;

 use pbs_api_types::{
@@ -131,13 +131,8 @@ pub fn load_media(
         drive.clone(),
         "load-media",
         Some(job_id),
-        move |worker, config| {
-            task_log!(
-                worker,
-                "loading media '{}' into drive '{}'",
-                label_text,
-                drive
-            );
+        move |_worker, config| {
+            info!("loading media '{label_text}' into drive '{drive}'");
             let (mut changer, _) = required_media_changer(&config, &drive)?;
             changer.load_media(&label_text)?;
             Ok(())
@@ -250,8 +245,8 @@ pub fn unload(
         drive.clone(),
         "unload-media",
         Some(drive.clone()),
-        move |worker, config| {
-            task_log!(worker, "unloading media from drive '{}'", drive);
+        move |_worker, config| {
+            info!("unloading media from drive '{drive}'");

             let (mut changer, _) = required_media_changer(&config, &drive)?;
             changer.unload_media(target_slot)?;
@@ -299,9 +294,9 @@ pub fn format_media(
         drive.clone(),
         "format-media",
         Some(drive.clone()),
-        move |worker, config| {
+        move |_worker, config| {
             if let Some(ref label) = label_text {
-                task_log!(worker, "try to load media '{}'", label);
+                info!("try to load media '{label}'");
                 if let Some((mut changer, _)) = media_changer(&config, &drive)? {
                     changer.load_media(label)?;
                 }
@@ -315,11 +310,8 @@ pub fn format_media(
                 let mut handle = LtoTapeHandle::new(file)?;
                 if let Ok(status) = handle.get_drive_and_media_status() {
                     if status.density >= TapeDensity::LTO9 {
-                        task_log!(worker, "Slow formatting LTO9+ media.");
-                        task_log!(
-                            worker,
-                            "This can take a very long time due to media optimization."
-                        );
+                        info!("Slow formatting LTO9+ media.");
+                        info!("This can take a very long time due to media optimization.");
                     }
                 }
             }
@@ -330,15 +322,15 @@ pub fn format_media(
                         bail!("expected label '{}', found unrelated data", label);
                     }
                     /* assume drive contains no or unrelated data */
-                    task_log!(worker, "unable to read media label: {}", err);
-                    task_log!(worker, "format anyways");
+                    info!("unable to read media label: {err}");
+                    info!("format anyways");
                     handle.format_media(fast.unwrap_or(true))?;
                 }
                 Ok((None, _)) => {
                     if let Some(label) = label_text {
                         bail!("expected label '{}', found empty tape", label);
                     }
-                    task_log!(worker, "found empty media - format anyways");
+                    info!("found empty media - format anyways");
                     handle.format_media(fast.unwrap_or(true))?;
                 }
                 Ok((Some(media_id), _key_config)) => {
@@ -352,11 +344,9 @@ pub fn format_media(
                         }
                     }

-                    task_log!(
-                        worker,
+                    info!(
                         "found media '{}' with uuid '{}'",
-                        media_id.label.label_text,
-                        media_id.label.uuid,
+                        media_id.label.label_text, media_id.label.uuid,
                     );

                     let mut inventory = Inventory::new(TAPE_STATUS_DIR);
@@ -504,7 +494,7 @@ pub fn label_media(
         drive.clone(),
         "label-media",
         Some(drive.clone()),
-        move |worker, config| {
+        move |_worker, config| {
             let mut drive = open_drive(&config, &drive)?;

             drive.rewind()?;
@@ -526,7 +516,7 @@ pub fn label_media(
                 pool: pool.clone(),
             };

-            write_media_label(worker, &mut drive, label, pool)
+            write_media_label(&mut drive, label, pool)
         },
     )?;
@@ -534,7 +524,6 @@ pub fn label_media(
 }

 fn write_media_label(
-    worker: Arc<WorkerTask>,
     drive: &mut Box<dyn TapeDriver>,
     label: MediaLabel,
     pool: Option<String>,
@@ -549,18 +538,9 @@ fn write_media_label(
     }
     drive.label_tape(&label)?;

     if let Some(ref pool) = pool {
-        task_log!(
-            worker,
-            "Label media '{}' for pool '{}'",
-            label.label_text,
-            pool
-        );
+        info!("Label media '{}' for pool '{pool}'", label.label_text);
     } else {
-        task_log!(
-            worker,
-            "Label media '{}' (no pool assignment)",
-            label.label_text
-        );
+        info!("Label media '{}' (no pool assignment)", label.label_text);
     }

     let media_id = MediaId {
@@ -749,10 +729,10 @@ pub fn clean_drive(drive: String, rpcenv: &mut dyn RpcEnvironment) -> Result<Val
         drive.clone(),
         "clean-drive",
         Some(drive.clone()),
-        move |worker, config| {
+        move |_worker, config| {
             let (mut changer, _changer_name) = required_media_changer(&config, &drive)?;

-            task_log!(worker, "Starting drive clean");
+            info!("Starting drive clean");

             changer.clean_drive()?;
@@ -763,7 +743,7 @@ pub fn clean_drive(drive: String, rpcenv: &mut dyn RpcEnvironment) -> Result<Val
                 // test for critical tape alert flags
                 if let Ok(alert_flags) = handle.tape_alert_flags() {
                     if !alert_flags.is_empty() {
-                        task_log!(worker, "TapeAlertFlags: {:?}", alert_flags);
+                        info!("TapeAlertFlags: {alert_flags:?}");
                         if tape_alert_flags_critical(alert_flags) {
                             bail!("found critical tape alert flags: {:?}", alert_flags);
                         }
@@ -772,13 +752,13 @@ pub fn clean_drive(drive: String, rpcenv: &mut dyn RpcEnvironment) -> Result<Val

                 // test wearout (max. 50 mounts)
                 if let Ok(volume_stats) = handle.volume_statistics() {
-                    task_log!(worker, "Volume mounts: {}", volume_stats.volume_mounts);
+                    info!("Volume mounts: {}", volume_stats.volume_mounts);
                     let wearout = volume_stats.volume_mounts * 2; // (*100.0/50.0);
-                    task_log!(worker, "Cleaning tape wearout: {}%", wearout);
+                    info!("Cleaning tape wearout: {wearout}%");
                 }
             }

-            task_log!(worker, "Drive cleaned successfully");
+            info!("Drive cleaned successfully");

             Ok(())
         },
@@ -910,12 +890,12 @@ pub fn update_inventory(
         drive.clone(),
         "inventory-update",
         Some(drive.clone()),
-        move |worker, config| {
+        move |_worker, config| {
             let (mut changer, changer_name) = required_media_changer(&config, &drive)?;

             let label_text_list = changer.online_media_label_texts()?;
             if label_text_list.is_empty() {
-                task_log!(worker, "changer device does not list any media labels");
+                info!("changer device does not list any media labels");
             }

             let mut inventory = Inventory::load(TAPE_STATUS_DIR)?;
@@ -924,7 +904,7 @@ pub fn update_inventory(
             for label_text in label_text_list.iter() {
                 if label_text.starts_with("CLN") {
-                    task_log!(worker, "skip cleaning unit '{}'", label_text);
+                    info!("skip cleaning unit '{label_text}'");
                     continue;
                 }
@@ -936,12 +916,12 @@ pub fn update_inventory(
                         if !catalog
                             || MediaCatalog::exists(TAPE_STATUS_DIR, &media_id.label.uuid)
                         {
-                            task_log!(worker, "media '{}' already inventoried", label_text);
+                            info!("media '{label_text}' already inventoried");
                            continue;
                        }
                    }
                    Err(err) => {
-                        task_warn!(worker, "error getting media by unique label: {err}");
+                        warn!("error getting media by unique label: {err}");
                        // we can't be sure which uuid it is
                        continue;
                    }
@@ -950,37 +930,28 @@ pub fn update_inventory(
                 }

                 if let Err(err) = changer.load_media(&label_text) {
-                    task_warn!(worker, "unable to load media '{}' - {}", label_text, err);
+                    warn!("unable to load media '{label_text}' - {err}");
                     continue;
                 }

                 let mut drive = open_drive(&config, &drive)?;
                 match drive.read_label() {
                     Err(err) => {
-                        task_warn!(
-                            worker,
-                            "unable to read label form media '{}' - {}",
-                            label_text,
-                            err
-                        );
+                        warn!("unable to read label form media '{label_text}' - {err}");
                     }
                     Ok((None, _)) => {
-                        task_log!(worker, "media '{}' is empty", label_text);
+                        info!("media '{label_text}' is empty");
                     }
                     Ok((Some(media_id), _key_config)) => {
                         if label_text != media_id.label.label_text {
-                            task_warn!(
-                                worker,
-                                "label text mismatch ({} != {})",
-                                label_text,
+                            warn!(
+                                "label text mismatch ({label_text} != {})",
                                 media_id.label.label_text
                             );
                             continue;
                         }

-                        task_log!(
-                            worker,
-                            "inventorize media '{}' with uuid '{}'",
-                            label_text,
+                        info!(
+                            "inventorize media '{label_text}' with uuid '{}'",
                             media_id.label.uuid
                         );
@@ -1002,15 +973,11 @@ pub fn update_inventory(
                         if catalog {
                             let media_set = inventory.compute_media_set_members(&set.uuid)?;
                             if let Err(err) = fast_catalog_restore(
-                                &worker,
                                 &mut drive,
                                 &media_set,
                                 &media_id.label.uuid,
                             ) {
-                                task_warn!(
-                                    worker,
-                                    "could not restore catalog for {label_text}: {err}"
-                                );
+                                warn!("could not restore catalog for {label_text}: {err}");
                             }
                         }
                     } else {
@@ -1066,14 +1033,13 @@ pub fn barcode_label_media(
         drive.clone(),
         "barcode-label-media",
         Some(drive.clone()),
-        move |worker, config| barcode_label_media_worker(worker, drive, &config, pool),
+        move |_worker, config| barcode_label_media_worker(drive, &config, pool),
     )?;

     Ok(upid_str.into())
 }

 fn barcode_label_media_worker(
-    worker: Arc<WorkerTask>,
     drive: String,
     drive_config: &SectionConfigData,
     pool: Option<String>,
@@ -1106,24 +1072,20 @@ fn barcode_label_media_worker(
         inventory.reload()?;
         match inventory.find_media_by_label_text(&label_text) {
             Ok(Some(_)) => {
-                task_log!(
-                    worker,
-                    "media '{}' already inventoried (already labeled)",
-                    label_text
-                );
+                info!("media '{label_text}' already inventoried (already labeled)");
                 continue;
             }
             Err(err) => {
-                task_warn!(worker, "error getting media by unique label: {err}",);
+                warn!("error getting media by unique label: {err}",);
                 continue;
             }
             Ok(None) => {} // ok to label
         }

-        task_log!(worker, "checking/loading media '{}'", label_text);
+        info!("checking/loading media '{label_text}'");

         if let Err(err) = changer.load_media(&label_text) {
-            task_warn!(worker, "unable to load media '{}' - {}", label_text, err);
+            warn!("unable to load media '{label_text}' - {err}");
             continue;
         }
@@ -1132,21 +1094,13 @@ fn barcode_label_media_worker(
         match drive.read_next_file() {
             Ok(_reader) => {
-                task_log!(
-                    worker,
-                    "media '{}' is not empty (format it first)",
-                    label_text
-                );
+                info!("media '{label_text}' is not empty (format it first)");
                 continue;
             }
             Err(BlockReadError::EndOfFile) => { /* EOF mark at BOT, assume tape is empty */ }
             Err(BlockReadError::EndOfStream) => { /* tape is empty */ }
             Err(_err) => {
-                task_warn!(
-                    worker,
-                    "media '{}' read error (maybe not empty - format it first)",
-                    label_text
-                );
+                warn!("media '{label_text}' read error (maybe not empty - format it first)");
                 continue;
             }
         }
@@ -1159,7 +1113,7 @@ fn barcode_label_media_worker(
             pool: pool.clone(),
         };

-        write_media_label(worker.clone(), &mut drive, label, pool.clone())?
+        write_media_label(&mut drive, label, pool.clone())?
     }

     Ok(())
@@ -1318,14 +1272,12 @@ pub fn catalog_media(
             let media_id = match drive.read_label()? {
                 (Some(media_id), key_config) => {
-                    task_log!(
-                        worker,
+                    info!(
                         "found media label: {}",
                         serde_json::to_string_pretty(&serde_json::to_value(&media_id)?)?
                     );
                     if key_config.is_some() {
-                        task_log!(
-                            worker,
+                        info!(
                             "encryption key config: {}",
                             serde_json::to_string_pretty(&serde_json::to_value(&key_config)?)?
                         );
@@ -1339,7 +1291,7 @@ pub fn catalog_media(
             let (_media_set_lock, media_set_uuid) = match media_id.media_set_label {
                 None => {
-                    task_log!(worker, "media is empty");
+                    info!("media is empty");
                     let _pool_lock = if let Some(pool) = media_id.pool() {
                         lock_media_pool(TAPE_STATUS_DIR, &pool)?
                     } else {
@@ -1352,7 +1304,7 @@ pub fn catalog_media(
                 Some(ref set) => {
                     if set.unassigned() {
                         // media is empty
-                        task_log!(worker, "media is empty");
+                        info!("media is empty");
                         let _lock = lock_unassigned_media_pool(TAPE_STATUS_DIR)?;
                         MediaCatalog::destroy(TAPE_STATUS_DIR, &media_id.label.uuid)?;
                         inventory.store(media_id.clone(), false)?;
@@ -1377,14 +1329,14 @@ pub fn catalog_media(
             if !scan {
                 let media_set = inventory.compute_media_set_members(media_set_uuid)?;

-                if fast_catalog_restore(&worker, &mut drive, &media_set, &media_id.label.uuid)? {
+                if fast_catalog_restore(&mut drive, &media_set, &media_id.label.uuid)? {
                     return Ok(());
                 }

-                task_log!(worker, "no catalog found");
+                info!("no catalog found");
             }

-            task_log!(worker, "scanning entire media to reconstruct catalog");
+            info!("scanning entire media to reconstruct catalog");

             drive.rewind()?;
             drive.read_label()?; // skip over labels - we already read them above


@@ -6,6 +6,7 @@ use std::sync::Arc;
 
 use anyhow::{bail, format_err, Error};
 use serde_json::Value;
+use tracing::{info, warn};
 
 use proxmox_human_byte::HumanByte;
 use proxmox_io::ReadExt;
@@ -13,7 +14,7 @@ use proxmox_router::{Permission, Router, RpcEnvironment, RpcEnvironmentType};
 use proxmox_schema::{api, ApiType};
 use proxmox_section_config::SectionConfigData;
 use proxmox_sys::fs::{replace_file, CreateOptions};
-use proxmox_sys::{task_log, task_warn, WorkerTaskContext};
+use proxmox_sys::WorkerTaskContext;
 use proxmox_uuid::Uuid;
 
 use pbs_api_types::{
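
Note that `WorkerTaskContext` stays imported even though the logging macros go: the worker is still passed around wherever task control is needed (for example `worker.check_abort()?` in `scan_chunk_archive` further down), only the logging no longer depends on it. A reduced sketch of that split, with a simplified signature and the `hex` crate used as in the surrounding code:

    use anyhow::Error;
    use proxmox_sys::WorkerTaskContext;
    use tracing::info;

    // Control flow still goes through the worker, log output through tracing.
    fn scan_digests<W: WorkerTaskContext>(worker: &W, digests: &[[u8; 32]]) -> Result<(), Error> {
        for digest in digests {
            worker.check_abort()?; // abort handling keeps the worker parameter
            info!("Found chunk: {}", hex::encode(digest)); // logging does not
        }
        Ok(())
    }
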
@@ -403,12 +404,12 @@ pub fn restore(
     let restore_owner = owner.as_ref().unwrap_or(&auth_id);
 
-    task_log!(worker, "Mediaset '{media_set}'");
-    task_log!(worker, "Pool: {pool}");
+    info!("Mediaset '{media_set}'");
+    info!("Pool: {pool}");
 
     let res = if snapshots.is_some() || namespaces {
         restore_list_worker(
-            worker.clone(),
+            worker,
             snapshots.unwrap_or_default(),
             inventory,
             media_set_uuid,
@@ -422,7 +423,7 @@ pub fn restore(
         )
     } else {
         restore_full_worker(
-            worker.clone(),
+            worker,
             inventory,
             media_set_uuid,
             drive_config,
@@ -434,10 +435,10 @@ pub fn restore(
         )
     };
     if res.is_ok() {
-        task_log!(worker, "Restore mediaset '{media_set}' done");
+        info!("Restore mediaset '{media_set}' done");
     }
     if let Err(err) = set_tape_device_state(&drive, "") {
-        task_log!(worker, "could not unset drive state for {drive}: {err}");
+        info!("could not unset drive state for {drive}: {err}");
     }
 
     res
@@ -488,7 +489,7 @@ fn restore_full_worker(
     }
 
     if let Some(fingerprint) = encryption_key_fingerprint {
-        task_log!(worker, "Encryption key fingerprint: {fingerprint}");
+        info!("Encryption key fingerprint: {fingerprint}");
     }
 
     let used_datastores = store_map.used_datastores();
@@ -497,13 +498,9 @@ fn restore_full_worker(
         .map(|(t, _)| String::from(t.name()))
         .collect::<Vec<String>>()
         .join(", ");
-    task_log!(worker, "Datastore(s): {datastore_list}",);
-    task_log!(worker, "Drive: {drive_name}");
-    log_required_tapes(
-        &worker,
-        &inventory,
-        media_id_list.iter().map(|id| &id.label.uuid),
-    );
+    info!("Datastore(s): {datastore_list}",);
+    info!("Drive: {drive_name}");
+    log_required_tapes(&inventory, media_id_list.iter().map(|id| &id.label.uuid));
 
     let mut datastore_locks = Vec::new();
     for (target, _) in used_datastores.values() {
@@ -533,7 +530,6 @@ fn restore_full_worker(
 
 #[allow(clippy::too_many_arguments)]
 fn check_snapshot_restorable(
-    worker: &WorkerTask,
     store_map: &DataStoreMap,
     store: &str,
     snapshot: &str,
@@ -574,7 +570,7 @@ fn check_snapshot_restorable(
                 auth_id,
                 Some(restore_owner),
             ) {
-                task_warn!(worker, "cannot restore {store}:{snapshot} to {ns}: '{err}'");
+                warn!("cannot restore {store}:{snapshot} to {ns}: '{err}'");
                 continue;
             }
@@ -582,8 +578,7 @@ fn check_snapshot_restorable(
             if let Ok(owner) = datastore.get_owner(&ns, dir.as_ref()) {
                 if restore_owner != &owner {
                     // only the owner is allowed to create additional snapshots
-                    task_warn!(
-                        worker,
+                    warn!(
                         "restore of '{snapshot}' to {ns} failed, owner check failed ({restore_owner} \
                         != {owner})",
                     );
@@ -594,10 +589,7 @@ fn check_snapshot_restorable(
             have_some_permissions = true;
 
             if datastore.snapshot_path(&ns, dir).exists() {
-                task_warn!(
-                    worker,
-                    "found snapshot {snapshot} on target datastore/namespace, skipping...",
-                );
+                warn!("found snapshot {snapshot} on target datastore/namespace, skipping...",);
                 continue;
             }
             can_restore_some = true;
@@ -610,11 +602,7 @@ fn check_snapshot_restorable(
     Ok(can_restore_some)
 }
 
-fn log_required_tapes<'a>(
-    worker: &WorkerTask,
-    inventory: &Inventory,
-    list: impl Iterator<Item = &'a Uuid>,
-) {
+fn log_required_tapes<'a>(inventory: &Inventory, list: impl Iterator<Item = &'a Uuid>) {
     let mut tape_list = list
         .map(|uuid| {
             inventory
@@ -626,7 +614,7 @@ fn log_required_tapes(
         })
        .collect::<Vec<&str>>();
     tape_list.sort_unstable();
-    task_log!(worker, "Required media list: {}", tape_list.join(";"));
+    info!("Required media list: {}", tape_list.join(";"));
 }
 
 #[allow(clippy::too_many_arguments)]
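
Dropping the logger from helper signatures is what shrinks most of these hunks: `log_required_tapes` now takes only the data it uses, and every caller loses the `&worker` argument. A reduced, self-contained sketch of the new helper shape (plain label strings standing in for the `Inventory` lookup):

    use tracing::info;

    fn log_required_tapes<'a>(labels: impl Iterator<Item = &'a str>) {
        let mut tape_list: Vec<&str> = labels.collect();
        tape_list.sort_unstable();
        info!("Required media list: {}", tape_list.join(";"));
    }

    // e.g. log_required_tapes(["tape2", "tape1"].into_iter());
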
@@ -658,14 +646,13 @@ fn restore_list_worker(
             let (ns, dir) = match parse_ns_and_snapshot(snapshot) {
                 Ok((ns, dir)) if store_map.has_full_mapping(store, &ns) => (ns, dir),
                 Err(err) => {
-                    task_warn!(worker, "couldn't parse snapshot {snapshot} - {err}");
+                    warn!("couldn't parse snapshot {snapshot} - {err}");
                     continue;
                 }
                 _ => continue,
             };
             let snapshot = print_ns_and_snapshot(&ns, &dir);
             match check_snapshot_restorable(
-                &worker,
                 &store_map,
                 store,
                 &snapshot,
@@ -679,7 +666,7 @@ fn restore_list_worker(
                 Ok(true) => restorable.push((store.to_string(), snapshot.to_string(), ns, dir)),
                 Ok(false) => {}
                 Err(err) => {
-                    task_warn!(worker, "{err}");
+                    warn!("{err}");
                     skipped.push(format!("{store}:{snapshot}"));
                 }
             }
@@ -697,7 +684,6 @@ fn restore_list_worker(
                     match parse_ns_and_snapshot(snapshot) {
                         Ok((ns, dir)) => {
                             match check_snapshot_restorable(
-                                &worker,
                                 &store_map,
                                 store,
                                 snapshot,
@@ -713,14 +699,14 @@ fn restore_list_worker(
                             }
                             Ok(false) => None,
                             Err(err) => {
-                                task_warn!(worker, "{err}");
+                                warn!("{err}");
                                 skipped.push(format!("{store}:{snapshot}"));
                                 None
                             }
                         }
                     }
                     Err(err) => {
-                        task_warn!(worker, "could not restore {store_snapshot}: {err}");
+                        warn!("could not restore {store_snapshot}: {err}");
                         skipped.push(store_snapshot);
                         None
                     }
@@ -738,10 +724,7 @@ fn restore_list_worker(
             let media_id = inventory.lookup_media(media_uuid).unwrap();
             (media_id, file_num)
         } else {
-            task_warn!(
-                worker,
-                "did not find snapshot '{store}:{snapshot}' in media set",
-            );
+            warn!("did not find snapshot '{store}:{snapshot}' in media set",);
             skipped.push(format!("{store}:{snapshot}"));
             continue;
         };
@@ -754,26 +737,25 @@ fn restore_list_worker(
             .or_default();
         file_list.push(file_num);
 
-        task_log!(
-            worker,
+        info!(
             "found snapshot {snapshot} on {}: file {file_num}",
             media_id.label.label_text,
         );
     }
 
     if snapshot_file_hash.is_empty() {
-        task_log!(worker, "nothing to restore, skipping remaining phases...");
+        info!("nothing to restore, skipping remaining phases...");
         if !skipped.is_empty() {
-            task_log!(worker, "skipped the following snapshots:");
+            info!("skipped the following snapshots:");
             for snap in skipped {
-                task_log!(worker, " {snap}");
+                info!(" {snap}");
             }
         }
         return Ok(());
     }
 
-    task_log!(worker, "Phase 1: temporarily restore snapshots to temp dir");
-    log_required_tapes(&worker, &inventory, snapshot_file_hash.keys());
+    info!("Phase 1: temporarily restore snapshots to temp dir");
+    log_required_tapes(&inventory, snapshot_file_hash.keys());
     let mut datastore_chunk_map: HashMap<String, HashSet<[u8; 32]>> = HashMap::new();
     let mut tmp_paths = Vec::new();
     for (media_uuid, file_list) in snapshot_file_hash.iter_mut() {
@@ -824,10 +806,10 @@ fn restore_list_worker(
     drop(catalog);
 
     if !media_file_chunk_map.is_empty() {
-        task_log!(worker, "Phase 2: restore chunks to datastores");
-        log_required_tapes(&worker, &inventory, media_file_chunk_map.keys());
+        info!("Phase 2: restore chunks to datastores");
+        log_required_tapes(&inventory, media_file_chunk_map.keys());
     } else {
-        task_log!(worker, "All chunks are already present, skip phase 2...");
+        info!("All chunks are already present, skip phase 2...");
     }
 
     for (media_uuid, file_chunk_map) in media_file_chunk_map.iter_mut() {
@@ -842,10 +824,7 @@ fn restore_list_worker(
         restore_file_chunk_map(worker.clone(), &mut drive, &store_map, file_chunk_map)?;
     }
 
-    task_log!(
-        worker,
-        "Phase 3: copy snapshots from temp dir to datastores"
-    );
+    info!("Phase 3: copy snapshots from temp dir to datastores");
 
     let mut errors = false;
     for (source_datastore, snapshot, source_ns, backup_dir) in snapshots.into_iter() {
         if let Err(err) = proxmox_lang::try_block!({
@@ -902,20 +881,14 @@ fn restore_list_worker(
                     Ok(())
                 }) {
-                    task_warn!(
-                        worker,
-                        "could not restore {source_datastore}:{snapshot}: '{err}'"
-                    );
+                    warn!("could not restore {source_datastore}:{snapshot}: '{err}'");
                     skipped.push(format!("{source_datastore}:{snapshot}"));
                 }
             }
-            task_log!(worker, "Restore snapshot '{}' done", snapshot);
+            info!("Restore snapshot '{snapshot}' done");
             Ok::<_, Error>(())
         }) {
-            task_warn!(
-                worker,
-                "could not copy {source_datastore}:{snapshot}: {err}"
-            );
+            warn!("could not copy {source_datastore}:{snapshot}: {err}");
             errors = true;
         }
     }
@@ -925,7 +898,7 @@ fn restore_list_worker(
             std::fs::remove_dir_all(&tmp_path)
                 .map_err(|err| format_err!("remove_dir_all failed - {err}"))
         }) {
-            task_warn!(worker, "could not clean up temp dir {tmp_path:?}: {err}");
+            warn!("could not clean up temp dir {tmp_path:?}: {err}");
            errors = true;
        };
    }
@@ -934,19 +907,16 @@ fn restore_list_worker(
             bail!("errors during copy occurred");
         }
 
         if !skipped.is_empty() {
-            task_log!(worker, "(partially) skipped the following snapshots:");
+            info!("(partially) skipped the following snapshots:");
             for snap in skipped {
-                task_log!(worker, " {snap}");
+                info!(" {snap}");
             }
         }
         Ok(())
     });
 
     if res.is_err() {
-        task_warn!(
-            worker,
-            "Error during restore, partially restored snapshots will NOT be cleaned up"
-        );
+        warn!("Error during restore, partially restored snapshots will NOT be cleaned up");
     }
 
     for (datastore, _) in store_map.used_datastores().values() {
@@ -954,7 +924,7 @@ fn restore_list_worker(
         match std::fs::remove_dir_all(tmp_path) {
             Ok(()) => {}
             Err(err) if err.kind() == std::io::ErrorKind::NotFound => {}
-            Err(err) => task_warn!(worker, "error cleaning up: {}", err),
+            Err(err) => warn!("error cleaning up: {err}"),
         }
     }
@@ -1037,13 +1007,10 @@ fn restore_snapshots_to_tmpdir(
         for file_num in file_list {
             let current_file_number = drive.current_file_number()?;
             if current_file_number != *file_num {
-                task_log!(
-                    worker,
-                    "was at file {current_file_number}, moving to {file_num}"
-                );
+                info!("was at file {current_file_number}, moving to {file_num}");
                 drive.move_to_file(*file_num)?;
                 let current_file_number = drive.current_file_number()?;
-                task_log!(worker, "now at file {}", current_file_number);
+                info!("now at file {current_file_number}");
             }
             let mut reader = drive.read_next_file()?;
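
Both `restore_snapshots_to_tmpdir` and `restore_file_chunk_map` share this repositioning idiom: seek only when the drive is not already at the wanted file, then read the position back and log it, so the task log records the actual tape movement. A self-contained sketch with a stand-in trait for the two `TapeDriver` methods used above (the `u64` file-number type is an assumption):

    use anyhow::Error;
    use tracing::info;

    // Minimal stand-in for the TapeDriver methods used in this hunk.
    trait TapeSeek {
        fn current_file_number(&mut self) -> Result<u64, Error>;
        fn move_to_file(&mut self, file_num: u64) -> Result<(), Error>;
    }

    fn seek_to_file(drive: &mut dyn TapeSeek, file_num: u64) -> Result<(), Error> {
        let current_file_number = drive.current_file_number()?;
        if current_file_number != file_num {
            info!("was at file {current_file_number}, moving to {file_num}");
            drive.move_to_file(file_num)?;
            let current_file_number = drive.current_file_number()?;
            info!("now at file {current_file_number}");
        }
        Ok(())
    }
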
@@ -1065,10 +1032,7 @@ fn restore_snapshots_to_tmpdir(
                 let source_datastore = archive_header.store;
                 let snapshot = archive_header.snapshot;
 
-                task_log!(
-                    worker,
-                    "File {file_num}: snapshot archive {source_datastore}:{snapshot}",
-                );
+                info!("File {file_num}: snapshot archive {source_datastore}:{snapshot}",);
 
                 let mut decoder =
                     pxar::decoder::sync::Decoder::from_std(pxar::PxarVariant::Unified(reader))?;
@@ -1076,10 +1040,7 @@ fn restore_snapshots_to_tmpdir(
                 let target_datastore = match store_map.target_store(&source_datastore) {
                     Some(datastore) => datastore,
                     None => {
-                        task_warn!(
-                            worker,
-                            "could not find target datastore for {source_datastore}:{snapshot}",
-                        );
+                        warn!("could not find target datastore for {source_datastore}:{snapshot}",);
                         continue;
                     }
                 };
@@ -1131,10 +1092,10 @@ fn restore_file_chunk_map(
     for (nr, chunk_map) in file_chunk_map.iter_mut() {
         let current_file_number = drive.current_file_number()?;
         if current_file_number != *nr {
-            task_log!(worker, "was at file {current_file_number}, moving to {nr}");
+            info!("was at file {current_file_number}, moving to {nr}");
             drive.move_to_file(*nr)?;
             let current_file_number = drive.current_file_number()?;
-            task_log!(worker, "now at file {}", current_file_number);
+            info!("now at file {current_file_number}");
         }
         let mut reader = drive.read_next_file()?;
         let header: MediaContentHeader = unsafe { reader.read_le_value()? };
@@ -1151,10 +1112,7 @@ fn restore_file_chunk_map(
                 let source_datastore = archive_header.store;
 
-                task_log!(
-                    worker,
-                    "File {nr}: chunk archive for datastore '{source_datastore}'",
-                );
+                info!("File {nr}: chunk archive for datastore '{source_datastore}'",);
 
                 let datastore = store_map.target_store(&source_datastore).ok_or_else(|| {
                     format_err!("unexpected chunk archive for store: {source_datastore}")
@@ -1166,7 +1124,7 @@ fn restore_file_chunk_map(
                     datastore.clone(),
                     chunk_map,
                 )?;
-                task_log!(worker, "restored {count} chunks");
+                info!("restored {count} chunks");
             }
             _ => bail!("unexpected content magic {:?}", header.content_magic),
         }
@@ -1226,8 +1184,7 @@ fn restore_partial_chunk_archive<'a>(
     let elapsed = start_time.elapsed()?.as_secs_f64();
     let bytes = bytes.load(std::sync::atomic::Ordering::SeqCst) as f64;
-    task_log!(
-        worker,
+    info!(
         "restored {} ({:.2}/s)",
         HumanByte::new_decimal(bytes),
         HumanByte::new_decimal(bytes / elapsed),
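
The throughput line divides the byte counter accumulated by the writer pool by the elapsed wall time, and `HumanByte::new_decimal` (from the `proxmox-human-byte` import above) formats both the total and the rate; roughly, 1_500_000_000 bytes in 60 s comes out as "restored 1.5 GB (25 MB/s)", with the exact rendering up to `HumanByte`. A self-contained sketch of the same computation:

    use anyhow::Error;
    use proxmox_human_byte::HumanByte;
    use std::time::SystemTime;
    use tracing::info;

    // Bytes are accumulated as u64 while the writers run, then converted
    // to f64 for the rate, exactly as in the hunk above.
    fn log_restore_rate(start_time: SystemTime, total_bytes: u64) -> Result<(), Error> {
        let elapsed = start_time.elapsed()?.as_secs_f64();
        let bytes = total_bytes as f64;
        info!(
            "restored {} ({:.2}/s)",
            HumanByte::new_decimal(bytes),
            HumanByte::new_decimal(bytes / elapsed),
        );
        Ok(())
    }
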
@@ -1311,15 +1268,11 @@ pub fn restore_media(
         let current_file_number = drive.current_file_number()?;
         let reader = match drive.read_next_file() {
             Err(BlockReadError::EndOfFile) => {
-                task_log!(
-                    worker,
-                    "skip unexpected filemark at pos {}",
-                    current_file_number
-                );
+                info!("skip unexpected filemark at pos {current_file_number}");
                 continue;
             }
             Err(BlockReadError::EndOfStream) => {
-                task_log!(worker, "detected EOT after {} files", current_file_number);
+                info!("detected EOT after {current_file_number} files");
                 break;
             }
             Err(BlockReadError::Error(err)) => {
@@ -1383,13 +1336,7 @@ fn restore_archive<'a>(
             let datastore_name = archive_header.store;
             let snapshot = archive_header.snapshot;
 
-            task_log!(
-                worker,
-                "File {}: snapshot archive {}:{}",
-                current_file_number,
-                datastore_name,
-                snapshot
-            );
+            info!("File {current_file_number}: snapshot archive {datastore_name}:{snapshot}");
 
             let (backup_ns, backup_dir) = parse_ns_and_snapshot(&snapshot)?;
@@ -1423,16 +1370,16 @@ fn restore_archive<'a>(
                 path.push(rel_path);
 
                 if is_new {
-                    task_log!(worker, "restore snapshot {}", backup_dir);
+                    info!("restore snapshot {backup_dir}");
 
-                    match restore_snapshot_archive(worker.clone(), reader, &path) {
+                    match restore_snapshot_archive(worker, reader, &path) {
                         Err(err) => {
                             std::fs::remove_dir_all(&path)?;
                             bail!("restore snapshot {} failed - {}", backup_dir, err);
                         }
                         Ok(false) => {
                             std::fs::remove_dir_all(&path)?;
-                            task_log!(worker, "skip incomplete snapshot {}", backup_dir);
+                            info!("skip incomplete snapshot {backup_dir}");
                         }
                         Ok(true) => {
                             catalog.register_snapshot(
@@ -1448,7 +1395,7 @@ fn restore_archive<'a>(
                     return Ok(());
                 }
             } else {
-                task_log!(worker, "skipping...");
+                info!("skipping...");
             }
         }
@@ -1475,12 +1422,7 @@ fn restore_archive<'a>(
             let source_datastore = archive_header.store;
 
-            task_log!(
-                worker,
-                "File {}: chunk archive for datastore '{}'",
-                current_file_number,
-                source_datastore
-            );
+            info!("File {current_file_number}: chunk archive for datastore '{source_datastore}'");
 
             let datastore = target
                 .as_ref()
                 .and_then(|t| t.0.target_store(&source_datastore));
@@ -1497,15 +1439,9 @@ fn restore_archive<'a>(
                 .or_default();
 
             let chunks = if let Some(datastore) = datastore {
-                restore_chunk_archive(
-                    worker.clone(),
-                    reader,
-                    datastore,
-                    checked_chunks,
-                    verbose,
-                )?
+                restore_chunk_archive(worker, reader, datastore, checked_chunks, verbose)?
             } else {
-                scan_chunk_archive(worker.clone(), reader, verbose)?
+                scan_chunk_archive(worker, reader, verbose)?
             };
 
             if let Some(chunks) = chunks {
@@ -1515,12 +1451,12 @@ fn restore_archive<'a>(
                     &source_datastore,
                     &chunks[..],
                 )?;
-                task_log!(worker, "register {} chunks", chunks.len());
+                info!("register {} chunks", chunks.len());
                 catalog.commit_if_large()?;
             }
             return Ok(());
         } else if target.is_some() {
-            task_log!(worker, "skipping...");
+            info!("skipping...");
         }
 
         reader.skip_data()?; // read all data
@@ -1531,10 +1467,8 @@ fn restore_archive<'a>(
             let archive_header: CatalogArchiveHeader = serde_json::from_slice(&header_data)
                 .map_err(|err| format_err!("unable to parse catalog archive header - {}", err))?;
 
-            task_log!(
-                worker,
-                "File {}: skip catalog '{}'",
-                current_file_number,
+            info!(
+                "File {current_file_number}: skip catalog '{}'",
                 archive_header.uuid
             );
@@ -1570,7 +1504,7 @@ fn scan_chunk_archive<'a>(
 
             // check if this is an aborted stream without end marker
             if let Ok(false) = reader.has_end_marker() {
-                task_log!(worker, "missing stream end marker");
+                info!("missing stream end marker");
                 return Ok(None);
             }
@@ -1582,7 +1516,7 @@ fn scan_chunk_archive<'a>(
         worker.check_abort()?;
 
         if verbose {
-            task_log!(worker, "Found chunk: {}", hex::encode(digest));
+            info!("Found chunk: {}", hex::encode(digest));
         }
 
         chunks.push(digest);
@@ -1606,8 +1540,6 @@ fn restore_chunk_archive<'a>(
     let bytes = Arc::new(std::sync::atomic::AtomicU64::new(0));
     let bytes2 = bytes.clone();
 
-    let worker2 = worker.clone();
-
     let writer_pool = ParallelHandler::new(
         "tape restore chunk writer",
         4,
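
A small but telling cleanup: `worker2` existed only so the `ParallelHandler` closure could log from its writer threads; with tracing, the closure captures nothing extra, since `info!` resolves its destination through the installed subscriber. A loose sketch with a plain thread standing in for `ParallelHandler`, assuming the log context is propagated to such threads (which the proxmox-log infrastructure this commit relies on is meant to handle):

    use tracing::info;

    fn spawn_writer() -> std::thread::JoinHandle<()> {
        // No worker handle is moved into the closure anymore.
        std::thread::spawn(|| {
            info!("Insert chunk: {}", hex::encode([0u8; 32]));
        })
    }
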
@@ -1615,7 +1547,7 @@ fn restore_chunk_archive<'a>(
             let chunk_exists = datastore.cond_touch_chunk(&digest, false)?;
             if !chunk_exists {
                 if verbose {
-                    task_log!(worker2, "Insert chunk: {}", hex::encode(digest));
+                    info!("Insert chunk: {}", hex::encode(digest));
                 }
                 bytes2.fetch_add(chunk.raw_size(), std::sync::atomic::Ordering::SeqCst);
                 // println!("verify and write {}", hex::encode(&digest));
@@ -1626,7 +1558,7 @@ fn restore_chunk_archive<'a>(
                 datastore.insert_chunk(&chunk, &digest)?;
             } else if verbose {
-                task_log!(worker2, "Found existing chunk: {}", hex::encode(digest));
+                info!("Found existing chunk: {}", hex::encode(digest));
             }
             Ok(())
         },
@@ -1648,7 +1580,7 @@ fn restore_chunk_archive<'a>(
 
             // check if this is an aborted stream without end marker
             if let Ok(false) = reader.has_end_marker() {
-                task_log!(worker, "missing stream end marker");
+                info!("missing stream end marker");
                 return Ok(None);
             }
@@ -1672,8 +1604,7 @@ fn restore_chunk_archive<'a>(
     let elapsed = start_time.elapsed()?.as_secs_f64();
     let bytes = bytes.load(std::sync::atomic::Ordering::SeqCst) as f64;
-    task_log!(
-        worker,
+    info!(
         "restored {} ({:.2}/s)",
         HumanByte::new_decimal(bytes),
         HumanByte::new_decimal(bytes / elapsed),
@@ -1818,7 +1749,6 @@ fn try_restore_snapshot_archive<R: pxar::decoder::SeqRead>(
 
 /// Try to restore media catalogs (form catalog_archives)
 pub fn fast_catalog_restore(
-    worker: &WorkerTask,
     drive: &mut Box<dyn TapeDriver>,
     media_set: &MediaSet,
     uuid: &Uuid, // current media Uuid
@@ -1839,14 +1769,11 @@ pub fn fast_catalog_restore(
         // limit reader scope
         let mut reader = match drive.read_next_file() {
             Err(BlockReadError::EndOfFile) => {
-                task_log!(
-                    worker,
-                    "skip unexpected filemark at pos {current_file_number}"
-                );
+                info!("skip unexpected filemark at pos {current_file_number}");
                 continue;
             }
             Err(BlockReadError::EndOfStream) => {
-                task_log!(worker, "detected EOT after {current_file_number} files");
+                info!("detected EOT after {current_file_number} files");
                 break;
             }
             Err(BlockReadError::Error(err)) => {
@@ -1863,7 +1790,7 @@ pub fn fast_catalog_restore(
             if header.content_magic == PROXMOX_BACKUP_CATALOG_ARCHIVE_MAGIC_1_0
                 || header.content_magic == PROXMOX_BACKUP_CATALOG_ARCHIVE_MAGIC_1_1
             {
-                task_log!(worker, "found catalog at pos {}", current_file_number);
+                info!("found catalog at pos {current_file_number}");
 
                 let header_data = reader.read_exact_allocated(header.size as usize)?;
@@ -1873,11 +1800,7 @@ pub fn fast_catalog_restore(
                 })?;
 
                 if &archive_header.media_set_uuid != media_set.uuid() {
-                    task_log!(
-                        worker,
-                        "skipping unrelated catalog at pos {}",
-                        current_file_number
-                    );
+                    info!("skipping unrelated catalog at pos {current_file_number}");
                     reader.skip_data()?; // read all data
                     continue;
                 }
@@ -1890,11 +1813,7 @@ pub fn fast_catalog_restore(
                 });
 
                 if !wanted {
-                    task_log!(
-                        worker,
-                        "skip catalog because media '{}' not inventarized",
-                        catalog_uuid
-                    );
+                    info!("skip catalog because media '{catalog_uuid}' not inventarized");
                     reader.skip_data()?; // read all data
                     continue;
                 }
@@ -1904,11 +1823,7 @@ pub fn fast_catalog_restore(
                 } else {
                     // only restore if catalog does not exist
                     if MediaCatalog::exists(TAPE_STATUS_DIR, catalog_uuid) {
-                        task_log!(
-                            worker,
-                            "catalog for media '{}' already exists",
-                            catalog_uuid
-                        );
+                        info!("catalog for media '{catalog_uuid}' already exists");
                         reader.skip_data()?; // read all data
                         continue;
                     }
@@ -1924,19 +1839,11 @@ pub fn fast_catalog_restore(
                 match MediaCatalog::parse_catalog_header(&mut file)? {
                     (true, Some(media_uuid), Some(media_set_uuid)) => {
                         if &media_uuid != catalog_uuid {
-                            task_log!(
-                                worker,
-                                "catalog uuid mismatch at pos {}",
-                                current_file_number
-                            );
+                            info!("catalog uuid mismatch at pos {current_file_number}");
                             continue;
                         }
                         if media_set_uuid != archive_header.media_set_uuid {
-                            task_log!(
-                                worker,
-                                "catalog media_set mismatch at pos {}",
-                                current_file_number
-                            );
+                            info!("catalog media_set mismatch at pos {current_file_number}");
                             continue;
                         }
@@ -1947,18 +1854,14 @@ pub fn fast_catalog_restore(
                         )?;
 
                         if catalog_uuid == uuid {
-                            task_log!(worker, "successfully restored catalog");
+                            info!("successfully restored catalog");
                             found_catalog = true
                         } else {
-                            task_log!(
-                                worker,
-                                "successfully restored related catalog {}",
-                                media_uuid
-                            );
+                            info!("successfully restored related catalog {media_uuid}");
                         }
                     }
                     _ => {
-                        task_warn!(worker, "got incomplete catalog header - skip file");
+                        warn!("got incomplete catalog header - skip file");
                         continue;
                     }
                 }
@@ -1972,7 +1875,7 @@ pub fn fast_catalog_restore(
             }
             moved_to_eom = true;
 
-            task_log!(worker, "searching for catalog at EOT (moving to EOT)");
+            info!("searching for catalog at EOT (moving to EOT)");
             drive.move_to_last_file()?;
             let new_file_number = drive.current_file_number()?;