diff --git a/pbs-api-types/src/jobs.rs b/pbs-api-types/src/jobs.rs
index e8056beb..52520811 100644
--- a/pbs-api-types/src/jobs.rs
+++ b/pbs-api-types/src/jobs.rs
@@ -536,6 +536,10 @@ impl SyncDirection {
     }
 }
 
+pub const RESYNC_CORRUPT_SCHEMA: Schema =
+    BooleanSchema::new("If the verification failed for a local snapshot, try to pull it again.")
+        .schema();
+
 #[api(
     properties: {
         id: {
@@ -590,6 +594,10 @@ impl SyncDirection {
             schema: TRANSFER_LAST_SCHEMA,
             optional: true,
         },
+        "resync-corrupt": {
+            schema: RESYNC_CORRUPT_SCHEMA,
+            optional: true,
+        }
     }
 )]
 #[derive(Serialize, Deserialize, Clone, Updater, PartialEq)]
@@ -623,6 +631,8 @@ pub struct SyncJobConfig {
     pub limit: RateLimitConfig,
     #[serde(skip_serializing_if = "Option::is_none")]
     pub transfer_last: Option<usize>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub resync_corrupt: Option<bool>,
 }
 
 impl SyncJobConfig {
diff --git a/src/api2/config/sync.rs b/src/api2/config/sync.rs
index 78eb7320..7ff6cae0 100644
--- a/src/api2/config/sync.rs
+++ b/src/api2/config/sync.rs
@@ -471,6 +471,9 @@ pub fn update_sync_job(
     if let Some(transfer_last) = update.transfer_last {
         data.transfer_last = Some(transfer_last);
     }
+    if let Some(resync_corrupt) = update.resync_corrupt {
+        data.resync_corrupt = Some(resync_corrupt);
+    }
 
     if update.limit.rate_in.is_some() {
         data.limit.rate_in = update.limit.rate_in;
@@ -629,6 +632,7 @@ acl:1:/remote/remote1/remotestore1:write@pbs:RemoteSyncOperator
             ns: None,
             owner: Some(write_auth_id.clone()),
             comment: None,
+            resync_corrupt: None,
             remove_vanished: None,
             max_depth: None,
             group_filter: None,
diff --git a/src/api2/pull.rs b/src/api2/pull.rs
index d039dab5..d8ed1a73 100644
--- a/src/api2/pull.rs
+++ b/src/api2/pull.rs
@@ -10,7 +10,7 @@ use pbs_api_types::{
     Authid, BackupNamespace, GroupFilter, RateLimitConfig, SyncJobConfig, DATASTORE_SCHEMA,
     GROUP_FILTER_LIST_SCHEMA, NS_MAX_DEPTH_REDUCED_SCHEMA, PRIV_DATASTORE_BACKUP,
     PRIV_DATASTORE_PRUNE, PRIV_REMOTE_READ, REMOTE_ID_SCHEMA, REMOVE_VANISHED_BACKUPS_SCHEMA,
-    TRANSFER_LAST_SCHEMA,
+    RESYNC_CORRUPT_SCHEMA, TRANSFER_LAST_SCHEMA,
 };
 use pbs_config::CachedUserInfo;
 use proxmox_rest_server::WorkerTask;
@@ -87,6 +87,7 @@ impl TryFrom<&SyncJobConfig> for PullParameters {
             sync_job.group_filter.clone(),
             sync_job.limit.clone(),
             sync_job.transfer_last,
+            sync_job.resync_corrupt,
         )
     }
 }
@@ -132,6 +133,10 @@ impl TryFrom<&SyncJobConfig> for PullParameters {
                 schema: TRANSFER_LAST_SCHEMA,
                 optional: true,
             },
+            "resync-corrupt": {
+                schema: RESYNC_CORRUPT_SCHEMA,
+                optional: true,
+            },
         },
     },
     access: {
@@ -156,6 +161,7 @@ async fn pull(
     group_filter: Option<Vec<GroupFilter>>,
     limit: RateLimitConfig,
     transfer_last: Option<usize>,
+    resync_corrupt: Option<bool>,
     rpcenv: &mut dyn RpcEnvironment,
 ) -> Result<String, Error> {
     let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
@@ -193,6 +199,7 @@ async fn pull(
         group_filter,
         limit,
         transfer_last,
+        resync_corrupt,
     )?;
 
     // fixme: set to_stdout to false?
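
A note on the API plumbing above: the new option is purely additive. When it is
absent, skip_serializing_if keeps the key out of the stored job config, so
existing sync.cfg entries are untouched. Below is a quick illustration of the
expected serde round-trip, assuming SyncJobConfig carries serde's kebab-case
renaming (implied by the "resync-corrupt" property name in the #[api] block);
the helper function itself is hypothetical and not part of the patch:

    use pbs_api_types::SyncJobConfig;

    // Hypothetical check of the new field's serialization behavior.
    fn resync_round_trip(job: &mut SyncJobConfig) -> Result<(), serde_json::Error> {
        // Unset: the key is omitted entirely, preserving current behavior.
        job.resync_corrupt = None;
        assert!(serde_json::to_value(&*job)?.get("resync-corrupt").is_none());

        // Set: serialized under the kebab-case key declared in the #[api] block.
        job.resync_corrupt = Some(true);
        let value = serde_json::to_value(&*job)?;
        assert_eq!(value.get("resync-corrupt"), Some(&serde_json::json!(true)));
        Ok(())
    }
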
diff --git a/src/server/pull.rs b/src/server/pull.rs
index 4951ccfb..9abb673a 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -12,8 +12,9 @@ use tracing::info;
 
 use pbs_api_types::{
     print_store_and_ns, ArchiveType, Authid, BackupArchiveName, BackupDir, BackupGroup,
-    BackupNamespace, GroupFilter, Operation, RateLimitConfig, Remote, CLIENT_LOG_BLOB_NAME,
-    MANIFEST_BLOB_NAME, MAX_NAMESPACE_DEPTH, PRIV_DATASTORE_AUDIT, PRIV_DATASTORE_BACKUP,
+    BackupNamespace, GroupFilter, Operation, RateLimitConfig, Remote, VerifyState,
+    CLIENT_LOG_BLOB_NAME, MANIFEST_BLOB_NAME, MAX_NAMESPACE_DEPTH, PRIV_DATASTORE_AUDIT,
+    PRIV_DATASTORE_BACKUP,
 };
 use pbs_client::BackupRepository;
 use pbs_config::CachedUserInfo;
@@ -54,6 +55,8 @@ pub(crate) struct PullParameters {
     group_filter: Vec<GroupFilter>,
     /// How many snapshots should be transferred at most (taking the newest N snapshots)
     transfer_last: Option<usize>,
+    /// Whether to re-sync corrupted snapshots
+    resync_corrupt: bool,
 }
 
 impl PullParameters {
@@ -71,12 +74,14 @@ impl PullParameters {
         group_filter: Option<Vec<GroupFilter>>,
         limit: RateLimitConfig,
         transfer_last: Option<usize>,
+        resync_corrupt: Option<bool>,
     ) -> Result<Self, Error> {
         if let Some(max_depth) = max_depth {
             ns.check_max_depth(max_depth)?;
             remote_ns.check_max_depth(max_depth)?;
         };
         let remove_vanished = remove_vanished.unwrap_or(false);
+        let resync_corrupt = resync_corrupt.unwrap_or(false);
 
         let source: Arc<dyn SyncSource> = if let Some(remote) = remote {
             let (remote_config, _digest) = pbs_config::remote::config()?;
@@ -115,6 +120,7 @@ impl PullParameters {
             max_depth,
             group_filter,
             transfer_last,
+            resync_corrupt,
         })
     }
 }
@@ -322,7 +328,7 @@ async fn pull_single_archive<'a>(
 ///
 /// Pulling a snapshot consists of the following steps:
 /// - (Re)download the manifest
-/// -- if it matches, only download log and treat snapshot as already synced
+/// -- if it matches and is not corrupt, only download log and treat snapshot as already synced
 /// - Iterate over referenced files
 /// -- if file already exists, verify contents
 /// -- if not, pull it from the remote
@@ -331,6 +337,7 @@ async fn pull_snapshot<'a>(
     reader: Arc<dyn SyncSourceReader + 'a>,
     snapshot: &'a pbs_datastore::BackupDir,
     downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
+    corrupt: bool,
 ) -> Result<SyncStats, Error> {
     let mut sync_stats = SyncStats::default();
     let mut manifest_name = snapshot.full_path();
@@ -351,7 +358,7 @@ async fn pull_snapshot<'a>(
         return Ok(sync_stats);
     }
 
-    if manifest_name.exists() {
+    if manifest_name.exists() && !corrupt {
         let manifest_blob = proxmox_lang::try_block!({
             let mut manifest_file = std::fs::File::open(&manifest_name).map_err(|err| {
                 format_err!("unable to open local manifest {manifest_name:?} - {err}")
@@ -380,7 +387,7 @@ async fn pull_snapshot<'a>(
         let mut path = snapshot.full_path();
         path.push(&item.filename);
 
-        if path.exists() {
+        if !corrupt && path.exists() {
             let filename: BackupArchiveName = item.filename.as_str().try_into()?;
             match filename.archive_type() {
                 ArchiveType::DynamicIndex => {
@@ -443,6 +450,7 @@ async fn pull_snapshot_from<'a>(
     reader: Arc<dyn SyncSourceReader + 'a>,
     snapshot: &'a pbs_datastore::BackupDir,
     downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
+    corrupt: bool,
 ) -> Result<SyncStats, Error> {
     let (_path, is_new, _snap_lock) = snapshot
         .datastore()
@@ -451,7 +459,8 @@ async fn pull_snapshot_from<'a>(
     let sync_stats = if is_new {
         info!("sync snapshot {}", snapshot.dir());
 
-        match pull_snapshot(reader, snapshot, downloaded_chunks).await {
+        // this snapshot is new, so it can never be corrupt
+        match pull_snapshot(reader, snapshot, downloaded_chunks, false).await {
             Err(err) => {
                 if let Err(cleanup_err) = snapshot.datastore().remove_backup_dir(
                     snapshot.backup_ns(),
@@ -468,8 +477,12 @@ async fn pull_snapshot_from<'a>(
             }
         }
     } else {
-        info!("re-sync snapshot {}", snapshot.dir());
-        pull_snapshot(reader, snapshot, downloaded_chunks).await?
+        if corrupt {
+            info!("re-sync snapshot {} due to corruption", snapshot.dir());
+        } else {
+            info!("re-sync snapshot {}", snapshot.dir());
+        }
+        pull_snapshot(reader, snapshot, downloaded_chunks, corrupt).await?
     };
 
     Ok(sync_stats)
@@ -523,26 +536,52 @@ async fn pull_group(
         .last_successful_backup(&target_ns, group)?
         .unwrap_or(i64::MIN);
 
-    let list: Vec<BackupDir> = raw_list
+    // Filter remote BackupDirs to include in pull
+    // Also stores if the snapshot is corrupt (verification job failed)
+    let list: Vec<(BackupDir, bool)> = raw_list
         .into_iter()
         .enumerate()
-        .filter(|&(pos, ref dir)| {
+        .filter_map(|(pos, dir)| {
             source_snapshots.insert(dir.time);
+            // If resync_corrupt is set, check if the corresponding local snapshot failed
+            // verification
+            if params.resync_corrupt {
+                let local_dir = params
+                    .target
+                    .store
+                    .backup_dir(target_ns.clone(), dir.clone());
+                if let Ok(local_dir) = local_dir {
+                    match local_dir.verify_state() {
+                        Ok(Some(state)) => {
+                            if state == VerifyState::Failed {
+                                return Some((dir, true));
+                            }
+                        }
+                        Ok(None) => {
+                            // The verify_state item was not found in the manifest, this means the
+                            // snapshot is new.
+                        }
+                        Err(_) => {
+                            // There was an error loading the manifest, probably better if we
+                            // resync.
+                            return Some((dir, true));
+                        }
+                    }
+                }
+            }
             // Note: the snapshot represented by `last_sync_time` might be missing its backup log
             // or post-backup verification state if those were not yet available during the last
             // sync run, always resync it
             if last_sync_time > dir.time {
                 already_synced_skip_info.update(dir.time);
-                return false;
+                return None;
             }
-
             if pos < cutoff && last_sync_time != dir.time {
                 transfer_last_skip_info.update(dir.time);
-                return false;
+                return None;
             }
-            true
+            Some((dir, false))
         })
-        .map(|(_, dir)| dir)
         .collect();
 
     if already_synced_skip_info.count > 0 {
@@ -561,7 +600,7 @@ async fn pull_group(
 
     let mut sync_stats = SyncStats::default();
 
-    for (pos, from_snapshot) in list.into_iter().enumerate() {
+    for (pos, (from_snapshot, corrupt)) in list.into_iter().enumerate() {
         let to_snapshot = params
             .target
             .store
@@ -571,7 +610,8 @@ async fn pull_group(
             .source
             .reader(source_namespace, &from_snapshot)
             .await?;
-        let result = pull_snapshot_from(reader, &to_snapshot, downloaded_chunks.clone()).await;
+        let result =
+            pull_snapshot_from(reader, &to_snapshot, downloaded_chunks.clone(), corrupt).await;
 
         progress.done_snapshots = pos as u64 + 1;
         info!("percentage done: {progress}");
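
The snapshot filter in pull_group() leans on BackupDir::verify_state(), which is
not introduced by this diff (it comes from an earlier patch in the series). Here
is a rough sketch of the semantics the filter relies on, assuming the last
verification result is stored as a SnapshotVerifyState under the "verify_state"
key of the manifest's unprotected section, the way verification jobs record it;
the free function below is an approximation, not the actual implementation:

    use anyhow::Error;
    use pbs_api_types::{SnapshotVerifyState, VerifyState};
    use pbs_datastore::BackupDir;

    /// Sketch: what verify_state() is expected to return for the filter above.
    fn verify_state(snapshot: &BackupDir) -> Result<Option<VerifyState>, Error> {
        // Read the snapshot manifest; an error here means the manifest is
        // unreadable, which the filter above treats as "better resync".
        let (manifest, _raw_size) = snapshot.load_manifest()?;
        let verify = manifest.unprotected["verify_state"].clone();
        if verify.is_null() {
            // Never verified: Ok(None), so the snapshot is not re-pulled.
            return Ok(None);
        }
        let state: SnapshotVerifyState = serde_json::from_value(verify)?;
        Ok(Some(state.state))
    }

With these semantics, only snapshots whose recorded state is VerifyState::Failed
are re-pulled; unverified and successfully verified snapshots keep the existing
skip logic.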