diff --git a/src/server/mod.rs b/src/server/mod.rs
index 2e40bde3..7c14ed4b 100644
--- a/src/server/mod.rs
+++ b/src/server/mod.rs
@@ -36,6 +36,7 @@ pub mod auth;
 pub mod metric_collection;
 pub(crate) mod pull;
+pub(crate) mod push;
 pub(crate) mod sync;
 
 pub(crate) async fn reload_proxy_certificate() -> Result<(), Error> {
diff --git a/src/server/push.rs b/src/server/push.rs
new file mode 100644
index 00000000..1a478b5e
--- /dev/null
+++ b/src/server/push.rs
@@ -0,0 +1,992 @@
+//! Sync datastore by pushing contents to remote server
+
+use std::collections::HashSet;
+use std::sync::{Arc, Mutex};
+
+use anyhow::{bail, format_err, Error};
+use futures::stream::{self, StreamExt, TryStreamExt};
+use tokio::sync::mpsc;
+use tokio_stream::wrappers::ReceiverStream;
+use tracing::{info, warn};
+
+use pbs_api_types::{
+    print_store_and_ns, ApiVersion, ApiVersionInfo, Authid, BackupDir, BackupGroup,
+    BackupGroupDeleteStats, BackupNamespace, GroupFilter, GroupListItem, NamespaceListItem,
+    Operation, RateLimitConfig, Remote, SnapshotListItem, PRIV_DATASTORE_BACKUP,
+    PRIV_DATASTORE_READ, PRIV_REMOTE_DATASTORE_BACKUP, PRIV_REMOTE_DATASTORE_MODIFY,
+    PRIV_REMOTE_DATASTORE_PRUNE,
+};
+use pbs_client::{BackupRepository, BackupWriter, HttpClient, MergedChunkInfo, UploadOptions};
+use pbs_config::CachedUserInfo;
+use pbs_datastore::data_blob::ChunkInfo;
+use pbs_datastore::dynamic_index::DynamicIndexReader;
+use pbs_datastore::fixed_index::FixedIndexReader;
+use pbs_datastore::index::IndexFile;
+use pbs_datastore::manifest::{ArchiveType, CLIENT_LOG_BLOB_NAME, MANIFEST_BLOB_NAME};
+use pbs_datastore::read_chunk::AsyncReadChunk;
+use pbs_datastore::{DataStore, StoreProgress};
+
+use super::sync::{
+    check_namespace_depth_limit, LocalSource, RemovedVanishedStats, SkipInfo, SkipReason,
+    SyncSource, SyncStats,
+};
+use crate::api2::config::remote;
+
+/// Target for backups to be pushed to
+pub(crate) struct PushTarget {
+    // Remote as found in remote.cfg
+    remote: Remote,
+    // Target repository on remote
+    repo: BackupRepository,
+    // Target namespace on remote
+    ns: BackupNamespace,
+    // Http client to connect to remote
+    client: HttpClient,
+    // Remote target api supports prune delete stats
+    supports_prune_delete_stats: bool,
+}
+
+impl PushTarget {
+    fn remote_user(&self) -> Authid {
+        self.remote.config.auth_id.clone()
+    }
+}
+
+/// Parameters for a push operation
+pub(crate) struct PushParameters {
+    /// Source of backups to be pushed to remote
+    source: Arc<LocalSource>,
+    /// Target for backups to be pushed to
+    target: PushTarget,
+    /// User used for permission checks on the source side, including potentially filtering
+    /// visible namespaces and backup groups.
+    local_user: Authid,
+    /// Whether to remove groups and namespaces which exist locally, but not on the remote end
+    remove_vanished: bool,
+    /// How many levels of sub-namespaces to push (0 == no recursion, None == maximum recursion)
+    max_depth: Option<usize>,
+    /// Filters for reducing the push scope
+    group_filter: Vec<GroupFilter>,
+    /// How many snapshots should be transferred at most (taking the newest N snapshots)
+    transfer_last: Option<usize>,
+}
+
+impl PushParameters {
+    /// Creates a new instance of `PushParameters`.
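+    ///
+    /// Queries the remote api version to ensure the target supports push
+    /// (minimum api version 2.2) and probes for the optional
+    /// `prune-delete-stats` feature.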
+    #[allow(clippy::too_many_arguments)]
+    pub(crate) async fn new(
+        store: &str,
+        ns: BackupNamespace,
+        remote_id: &str,
+        remote_store: &str,
+        remote_ns: BackupNamespace,
+        local_user: Authid,
+        remove_vanished: Option<bool>,
+        max_depth: Option<usize>,
+        group_filter: Option<Vec<GroupFilter>>,
+        limit: RateLimitConfig,
+        transfer_last: Option<usize>,
+    ) -> Result<Self, Error> {
+        if let Some(max_depth) = max_depth {
+            ns.check_max_depth(max_depth)?;
+            remote_ns.check_max_depth(max_depth)?;
+        };
+        let remove_vanished = remove_vanished.unwrap_or(false);
+
+        let source = Arc::new(LocalSource {
+            store: DataStore::lookup_datastore(store, Some(Operation::Read))?,
+            ns,
+        });
+
+        let (remote_config, _digest) = pbs_config::remote::config()?;
+        let remote: Remote = remote_config.lookup("remote", remote_id)?;
+
+        let repo = BackupRepository::new(
+            Some(remote.config.auth_id.clone()),
+            Some(remote.config.host.clone()),
+            remote.config.port,
+            remote_store.to_string(),
+        );
+
+        let client = remote::remote_client_config(&remote, Some(limit))?;
+
+        let mut result = client.get("api2/json/version", None).await?;
+        let data = result["data"].take();
+        let version_info: ApiVersionInfo = serde_json::from_value(data)?;
+        let api_version = ApiVersion::try_from(version_info)?;
+        let supports_prune_delete_stats = api_version.supports_feature("prune-delete-stats");
+        if api_version.major < 2 || (api_version.major == 2 && api_version.minor < 2) {
+            bail!("unsupported remote api version, minimum v2.2 required");
+        }
+
+        let target = PushTarget {
+            remote,
+            repo,
+            ns: remote_ns,
+            client,
+            supports_prune_delete_stats,
+        };
+        let group_filter = group_filter.unwrap_or_default();
+
+        Ok(Self {
+            source,
+            target,
+            local_user,
+            remove_vanished,
+            max_depth,
+            group_filter,
+            transfer_last,
+        })
+    }
+
+    // Map the given namespace from source to target by adapting the prefix
+    fn map_to_target(&self, namespace: &BackupNamespace) -> Result<BackupNamespace, Error> {
+        namespace.map_prefix(&self.source.ns, &self.target.ns)
+    }
+}
+
+// Check if the job user given in the push parameters has the provided privs on the remote
+// datastore namespace
+fn check_ns_remote_datastore_privs(
+    params: &PushParameters,
+    target_namespace: &BackupNamespace,
+    privs: u64,
+) -> Result<(), Error> {
+    let user_info = CachedUserInfo::new()?;
+    let acl_path =
+        target_namespace.remote_acl_path(&params.target.remote.name, params.target.repo.store());
+
+    user_info.check_privs(&params.local_user, &acl_path, privs, false)?;
+
+    Ok(())
+}
+
+// Fetch the list of namespaces found on target
+async fn fetch_target_namespaces(params: &PushParameters) -> Result<Vec<BackupNamespace>, Error> {
+    let api_path = format!(
+        "api2/json/admin/datastore/{store}/namespace",
+        store = params.target.repo.store(),
+    );
+    let mut result = params.target.client.get(&api_path, None).await?;
+    let namespaces: Vec<NamespaceListItem> = serde_json::from_value(result["data"].take())?;
+    let mut namespaces: Vec<BackupNamespace> = namespaces
+        .into_iter()
+        .map(|namespace| namespace.ns)
+        .collect();
+    namespaces.sort_unstable_by_key(|a| a.name_len());
+
+    Ok(namespaces)
+}
+
+// Remove the provided namespace from the target
+async fn remove_target_namespace(
+    params: &PushParameters,
+    target_namespace: &BackupNamespace,
+) -> Result<BackupGroupDeleteStats, Error> {
+    if target_namespace.is_root() {
+        bail!("cannot remove root namespace from target");
+    }
+
+    check_ns_remote_datastore_privs(params, target_namespace, PRIV_REMOTE_DATASTORE_MODIFY)
+        .map_err(|err| format_err!("Pruning remote datastore contents not allowed - {err}"))?;
+
+    let api_path = format!(
+        "api2/json/admin/datastore/{store}/namespace",
+        store = params.target.repo.store(),
+    );
+
+    let mut args = serde_json::json!({
+        "ns": target_namespace.name(),
+        "delete-groups": true,
+    });
+
+    if params.target.supports_prune_delete_stats {
+        args["error-on-protected"] = serde_json::to_value(false)?;
+    }
+
+    let mut result = params.target.client.delete(&api_path, Some(args)).await?;
+
+    if params.target.supports_prune_delete_stats {
+        let data = result["data"].take();
+        serde_json::from_value(data).map_err(|err| {
+            format_err!("removing target namespace {target_namespace} failed - {err}")
+        })
+    } else {
+        Ok(BackupGroupDeleteStats::default())
+    }
+}
+
+// Fetch the list of groups found on target in given namespace
+// Returns sorted list of owned groups and a hashset containing not owned backup groups on target.
+async fn fetch_target_groups(
+    params: &PushParameters,
+    target_namespace: &BackupNamespace,
+) -> Result<(Vec<BackupGroup>, HashSet<BackupGroup>), Error> {
+    let api_path = format!(
+        "api2/json/admin/datastore/{store}/groups",
+        store = params.target.repo.store(),
+    );
+    let args = Some(serde_json::json!({ "ns": target_namespace.name() }));
+
+    let mut result = params.target.client.get(&api_path, args).await?;
+    let groups: Vec<GroupListItem> = serde_json::from_value(result["data"].take())?;
+
+    let (mut owned, not_owned) = groups.iter().fold(
+        (Vec::new(), HashSet::new()),
+        |(mut owned, mut not_owned), group| {
+            if let Some(ref owner) = group.owner {
+                if params.target.remote_user() == *owner {
+                    owned.push(group.backup.clone());
+                    return (owned, not_owned);
+                }
+            }
+            not_owned.insert(group.backup.clone());
+            (owned, not_owned)
+        },
+    );
+
+    owned.sort_unstable();
+
+    Ok((owned, not_owned))
+}
+
+// Remove the provided backup group in given namespace from the target
+async fn remove_target_group(
+    params: &PushParameters,
+    target_namespace: &BackupNamespace,
+    backup_group: &BackupGroup,
+) -> Result<BackupGroupDeleteStats, Error> {
+    check_ns_remote_datastore_privs(params, target_namespace, PRIV_REMOTE_DATASTORE_PRUNE)
+        .map_err(|err| format_err!("Pruning remote datastore contents not allowed - {err}"))?;
+
+    let api_path = format!(
+        "api2/json/admin/datastore/{store}/groups",
+        store = params.target.repo.store(),
+    );
+
+    let mut args = serde_json::json!({
+        "backup-id": backup_group.id,
+        "backup-type": backup_group.ty,
+    });
+
+    if params.target.supports_prune_delete_stats {
+        args["error-on-protected"] = serde_json::to_value(false)?;
+    }
+
+    args["ns"] = serde_json::to_value(target_namespace.name())?;
+
+    let mut result = params.target.client.delete(&api_path, Some(args)).await?;
+
+    if params.target.supports_prune_delete_stats {
+        let data = result["data"].take();
+        serde_json::from_value(data)
+            .map_err(|err| format_err!("removing target group {backup_group} failed - {err}"))
+    } else {
+        Ok(BackupGroupDeleteStats::default())
+    }
+}
+
+// Check if the namespace is already present on the target, create it otherwise
+async fn check_or_create_target_namespace(
+    params: &PushParameters,
+    target_namespaces: &[BackupNamespace],
+    target_namespace: &BackupNamespace,
+) -> Result<(), Error> {
+    if !target_namespace.is_root() && !target_namespaces.contains(target_namespace) {
+        // Namespace not present on target, create namespace.
+        // Sub-namespaces have to be created by creating parent components first.
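+        // Example: pushing into `a/b/c` creates `a`, then `a/b`, then `a/b/c`,
+        // skipping any parent component already present on the target.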
+
+        check_ns_remote_datastore_privs(params, target_namespace, PRIV_REMOTE_DATASTORE_MODIFY)
+            .map_err(|err| format_err!("Creating namespace not allowed - {err}"))?;
+
+        let mut parent = BackupNamespace::root();
+        for component in target_namespace.components() {
+            let current = BackupNamespace::from_parent_ns(&parent, component.to_string())?;
+            // Skip over pre-existing parent namespaces on target
+            if target_namespaces.contains(&current) {
+                parent = current;
+                continue;
+            }
+            let api_path = format!(
+                "api2/json/admin/datastore/{store}/namespace",
+                store = params.target.repo.store(),
+            );
+            let mut args = serde_json::json!({ "name": component.to_string() });
+            if !parent.is_root() {
+                args["parent"] = serde_json::to_value(parent.clone())?;
+            }
+            let target_store_and_ns = print_store_and_ns(params.target.repo.store(), &current);
+            match params.target.client.post(&api_path, Some(args)).await {
+                Ok(_) => info!("Created new namespace on target: {target_store_and_ns}"),
+                Err(err) => bail!(
+                    "Sync into {target_store_and_ns} failed - namespace creation failed: {err}"
+                ),
+            }
+            parent = current;
+        }
+    }
+
+    Ok(())
+}
+
+/// Push contents of source datastore matched by given push parameters to target.
+pub(crate) async fn push_store(mut params: PushParameters) -> Result<SyncStats, Error> {
+    let mut errors = false;
+
+    let user_info = CachedUserInfo::new()?;
+    let store = params.source.get_store().name().to_owned();
+    let auth_id = params.local_user.clone();
+    // Generate list of source namespaces to push to target, limited by max-depth and filtered
+    // by local user access privs.
+    let ns_access_filter = Box::new(move |namespace: &BackupNamespace| {
+        let acl_path = namespace.acl_path(&store);
+        let privs = PRIV_DATASTORE_READ | PRIV_DATASTORE_BACKUP;
+        user_info
+            .check_privs(&auth_id, &acl_path, privs, true)
+            .is_ok()
+    });
+    let mut namespaces = params
+        .source
+        .list_namespaces(&mut params.max_depth, ns_access_filter)
+        .await?;
+
+    check_namespace_depth_limit(&params.source.get_ns(), &params.target.ns, &namespaces)?;
+
+    namespaces.sort_unstable_by_key(|a| a.name_len());
+
+    // Fetch all accessible namespaces already present on the target
+    let target_namespaces = fetch_target_namespaces(&params).await?;
+    // Remember synced namespaces, removing non-synced ones when remove vanished flag is set
+    let mut synced_namespaces = HashSet::with_capacity(namespaces.len());
+
+    let (mut groups, mut snapshots) = (0, 0);
+    let mut stats = SyncStats::default();
+    for namespace in &namespaces {
+        let source_store_and_ns = print_store_and_ns(params.source.store.name(), namespace);
+        let target_namespace = params.map_to_target(namespace)?;
+        let target_store_and_ns =
+            print_store_and_ns(params.target.repo.store(), &target_namespace);
+
+        info!("----");
+        info!("Syncing {source_store_and_ns} into {target_store_and_ns}");
+
+        synced_namespaces.insert(target_namespace.clone());
+
+        if let Err(err) =
+            check_or_create_target_namespace(&params, &target_namespaces, &target_namespace).await
+        {
+            info!("Cannot sync {source_store_and_ns} into {target_store_and_ns} - {err}");
+            errors = true;
+            continue;
+        }
+
+        match push_namespace(namespace, &params).await {
+            Ok((sync_progress, sync_stats, sync_errors)) => {
+                errors |= sync_errors;
+                stats.add(sync_stats);
+
+                if params.max_depth != Some(0) {
+                    groups += sync_progress.done_groups;
+                    snapshots += sync_progress.done_snapshots;
+
+                    let ns = if namespace.is_root() {
+                        "root namespace".into()
+                    } else {
+                        format!("namespace {namespace}")
+                    };
+                    info!(
+                        "Finished syncing {ns}, current progress: {groups} groups, {snapshots} snapshots"
+                    );
{snapshots} snapshots" + ); + } + } + Err(err) => { + errors = true; + info!("Encountered errors while syncing namespace {namespace} - {err}"); + } + } + } + + if params.remove_vanished { + // Attention: Filter out all namespaces which are not sub-namespaces of the sync target + // namespace, or not included in the sync because of the depth limit. + // Without this pre-filtering, all namespaces unrelated to the sync would be removed! + let mut target_sub_namespaces = Vec::new(); + for namespace in &namespaces { + let target_namespace = params.map_to_target(&namespace)?; + let mut sub_namespaces = target_namespaces + .iter() + .filter(|namespace| { + if let Some(depth) = target_namespace.contains(namespace) { + if let Some(max_depth) = params.max_depth { + return depth <= max_depth; + } + return true; + } + false + }) + .collect(); + target_sub_namespaces.append(&mut sub_namespaces); + } + + // Sort by namespace length and revert for sub-namespaces to be removed before parents + target_sub_namespaces.sort_unstable_by_key(|a| a.name_len()); + target_sub_namespaces.reverse(); + + for target_namespace in target_sub_namespaces { + if synced_namespaces.contains(&target_namespace) { + continue; + } + match remove_target_namespace(¶ms, &target_namespace).await { + Ok(delete_stats) => { + stats.add(SyncStats::from(RemovedVanishedStats { + snapshots: delete_stats.removed_snapshots(), + groups: delete_stats.removed_groups(), + namespaces: 1, + })); + if delete_stats.protected_snapshots() > 0 { + warn!( + "kept {protected_count} protected snapshots of namespace '{target_namespace}'", + protected_count = delete_stats.protected_snapshots(), + ); + continue; + } + } + Err(err) => { + warn!("failed to remove vanished namespace {target_namespace} - {err}"); + continue; + } + } + info!("removed vanished namespace {target_namespace}"); + } + + if !params.target.supports_prune_delete_stats { + info!("Older api version on remote detected, delete stats might be incomplete"); + } + } + + if errors { + bail!("sync failed with some errors."); + } + + Ok(stats) +} + +/// Push namespace including all backup groups to target +/// +/// Iterate over all backup groups in the namespace and push them to the target. 
+pub(crate) async fn push_namespace(
+    namespace: &BackupNamespace,
+    params: &PushParameters,
+) -> Result<(StoreProgress, SyncStats, bool), Error> {
+    let target_namespace = params.map_to_target(namespace)?;
+    // Check if user is allowed to perform backups on remote datastore
+    check_ns_remote_datastore_privs(params, &target_namespace, PRIV_REMOTE_DATASTORE_BACKUP)
+        .map_err(|err| format_err!("Pushing to remote not allowed - {err}"))?;
+
+    let mut list: Vec<BackupGroup> = params
+        .source
+        .list_groups(namespace, &params.local_user)
+        .await?;
+
+    list.sort_unstable();
+
+    let total = list.len();
+    let list: Vec<BackupGroup> = list
+        .into_iter()
+        .filter(|group| group.apply_filters(&params.group_filter))
+        .collect();
+
+    info!(
+        "found {filtered} groups to sync (out of {total} total)",
+        filtered = list.len()
+    );
+
+    let mut errors = false;
+    // Remember synced groups, remove others when the remove vanished flag is set
+    let mut synced_groups = HashSet::new();
+    let mut progress = StoreProgress::new(list.len() as u64);
+    let mut stats = SyncStats::default();
+
+    let (owned_target_groups, not_owned_target_groups) =
+        fetch_target_groups(params, &target_namespace).await?;
+
+    for (done, group) in list.into_iter().enumerate() {
+        progress.done_groups = done as u64;
+        progress.done_snapshots = 0;
+        progress.group_snapshots = 0;
+
+        if not_owned_target_groups.contains(&group) {
+            warn!(
+                "group '{group}' not owned by remote user '{}' on target, skip",
+                params.target.remote_user(),
+            );
+            continue;
+        }
+        synced_groups.insert(group.clone());
+
+        match push_group(params, namespace, &group, &mut progress).await {
+            Ok(sync_stats) => stats.add(sync_stats),
+            Err(err) => {
+                warn!("sync group '{group}' failed - {err}");
+                errors = true;
+            }
+        }
+    }
+
+    if params.remove_vanished {
+        // only ever allow to prune owned groups on target
+        for target_group in owned_target_groups {
+            if synced_groups.contains(&target_group) {
+                continue;
+            }
+            if !target_group.apply_filters(&params.group_filter) {
+                continue;
+            }
+
+            info!("delete vanished group '{target_group}'");
+
+            match remove_target_group(params, &target_namespace, &target_group).await {
+                Ok(delete_stats) => {
+                    if delete_stats.protected_snapshots() > 0 {
+                        warn!(
+                            "kept {protected_count} protected snapshots of group '{target_group}'",
+                            protected_count = delete_stats.protected_snapshots(),
+                        );
+                    }
+                    stats.add(SyncStats::from(RemovedVanishedStats {
+                        snapshots: delete_stats.removed_snapshots(),
+                        groups: delete_stats.removed_groups(),
+                        namespaces: 0,
+                    }));
+                }
+                Err(err) => {
+                    warn!("failed to delete vanished group - {err}");
+                    errors = true;
+                    continue;
+                }
+            }
+        }
+    }
+
+    Ok((progress, stats, errors))
+}
+
+async fn fetch_target_snapshots(
+    params: &PushParameters,
+    target_namespace: &BackupNamespace,
+    group: &BackupGroup,
+) -> Result<Vec<SnapshotListItem>, Error> {
+    let api_path = format!(
+        "api2/json/admin/datastore/{store}/snapshots",
+        store = params.target.repo.store(),
+    );
+    let mut args = serde_json::to_value(group)?;
+    if !target_namespace.is_root() {
+        args["ns"] = serde_json::to_value(target_namespace)?;
+    }
+    let mut result = params.target.client.get(&api_path, Some(args)).await?;
+    let snapshots: Vec<SnapshotListItem> = serde_json::from_value(result["data"].take())?;
+
+    Ok(snapshots)
+}
+
+async fn fetch_previous_backup_time(
+    params: &PushParameters,
+    target_namespace: &BackupNamespace,
+    group: &BackupGroup,
+) -> Result<Option<i64>, Error> {
+    let mut snapshots = fetch_target_snapshots(params, target_namespace, group).await?;
+    snapshots.sort_unstable_by(|a, b| a.backup.time.cmp(&b.backup.time));
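+    // after sorting by ascending backup time, the last entry is the most recent snapshot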
+    Ok(snapshots.last().map(|snapshot| snapshot.backup.time))
+}
+
+async fn forget_target_snapshot(
+    params: &PushParameters,
+    target_namespace: &BackupNamespace,
+    snapshot: &BackupDir,
+) -> Result<(), Error> {
+    check_ns_remote_datastore_privs(params, target_namespace, PRIV_REMOTE_DATASTORE_PRUNE)
+        .map_err(|err| format_err!("Pruning remote datastore contents not allowed - {err}"))?;
+
+    let api_path = format!(
+        "api2/json/admin/datastore/{store}/snapshots",
+        store = params.target.repo.store(),
+    );
+    let mut args = serde_json::to_value(snapshot)?;
+    if !target_namespace.is_root() {
+        args["ns"] = serde_json::to_value(target_namespace)?;
+    }
+    params.target.client.delete(&api_path, Some(args)).await?;
+
+    Ok(())
+}
+
+/// Push group including all snapshots to target
+///
+/// Iterate over all snapshots in the group and push them to the target.
+/// The group sync operation consists of the following steps:
+/// - Query snapshots of given group from the source
+/// - Sort snapshots by time
+/// - Apply transfer last cutoff and filters to list
+/// - Iterate the snapshot list and push each snapshot individually
+/// - (Optional): Remove vanished snapshots if `remove_vanished` flag is set
+pub(crate) async fn push_group(
+    params: &PushParameters,
+    namespace: &BackupNamespace,
+    group: &BackupGroup,
+    progress: &mut StoreProgress,
+) -> Result<SyncStats, Error> {
+    let mut already_synced_skip_info = SkipInfo::new(SkipReason::AlreadySynced);
+    let mut transfer_last_skip_info = SkipInfo::new(SkipReason::TransferLast);
+
+    let mut snapshots: Vec<BackupDir> = params.source.list_backup_dirs(namespace, group).await?;
+    snapshots.sort_unstable_by(|a, b| a.time.cmp(&b.time));
+
+    let total_snapshots = snapshots.len();
+    let cutoff = params
+        .transfer_last
+        .map(|count| total_snapshots.saturating_sub(count))
+        .unwrap_or_default();
+
+    let target_namespace = params.map_to_target(namespace)?;
+    let last_snapshot_time = fetch_previous_backup_time(params, &target_namespace, group)
+        .await?
+        .unwrap_or(i64::MIN);
+
+    let mut source_snapshots = HashSet::new();
+    let snapshots: Vec<BackupDir> = snapshots
+        .into_iter()
+        .enumerate()
+        .filter(|&(pos, ref snapshot)| {
+            source_snapshots.insert(snapshot.time);
+            if last_snapshot_time >= snapshot.time {
+                already_synced_skip_info.update(snapshot.time);
+                return false;
+            }
+            if pos < cutoff {
+                transfer_last_skip_info.update(snapshot.time);
+                return false;
+            }
+            true
+        })
+        .map(|(_, dir)| dir)
+        .collect();
+
+    if already_synced_skip_info.count > 0 {
+        info!("{already_synced_skip_info}");
+        already_synced_skip_info.reset();
+    }
+    if transfer_last_skip_info.count > 0 {
+        info!("{transfer_last_skip_info}");
+        transfer_last_skip_info.reset();
+    }
+
+    progress.group_snapshots = snapshots.len() as u64;
+
+    let target_snapshots = fetch_target_snapshots(params, &target_namespace, group).await?;
+    let target_snapshots: Vec<BackupDir> = target_snapshots
+        .into_iter()
+        .map(|snapshot| snapshot.backup)
+        .collect();
+
+    let mut stats = SyncStats::default();
+    let mut fetch_previous_manifest = !target_snapshots.is_empty();
+    for (pos, source_snapshot) in snapshots.into_iter().enumerate() {
+        if target_snapshots.contains(&source_snapshot) {
+            progress.done_snapshots = pos as u64 + 1;
+            info!("percentage done: {progress}");
+            continue;
+        }
+        let result =
+            push_snapshot(params, namespace, &source_snapshot, fetch_previous_manifest).await;
+        fetch_previous_manifest = true;
+
+        progress.done_snapshots = pos as u64 + 1;
+        info!("percentage done: {progress}");
+
+        // stop on error
+        let sync_stats = result?;
+        stats.add(sync_stats);
+    }
+
+    if params.remove_vanished {
+        let target_snapshots = fetch_target_snapshots(params, &target_namespace, group).await?;
+        for snapshot in target_snapshots {
+            if source_snapshots.contains(&snapshot.backup.time) {
+                continue;
+            }
+            if snapshot.protected {
+                info!(
+                    "don't delete vanished snapshot {name} (protected)",
+                    name = snapshot.backup
+                );
+                continue;
+            }
+            if let Err(err) =
+                forget_target_snapshot(params, &target_namespace, &snapshot.backup).await
+            {
+                info!(
+                    "could not delete vanished snapshot {name} - {err}",
+                    name = snapshot.backup
+                );
+                continue;
+            }
+            info!("delete vanished snapshot {name}", name = snapshot.backup);
+            stats.add(SyncStats::from(RemovedVanishedStats {
+                snapshots: 1,
+                groups: 0,
+                namespaces: 0,
+            }));
+        }
+    }
+
+    Ok(stats)
+}
+
+/// Push snapshot to target
+///
+/// Creates a new snapshot on the target and pushes the content of the source snapshot to the
+/// target by creating a new manifest file and connecting to the remote as backup writer client.
+/// Chunks are written by recreating the index by uploading the chunk stream as read from the
+/// source. Data blobs are uploaded as such.
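+///
+/// If the source snapshot has no readable manifest, a warning is logged and the snapshot is
+/// skipped instead of failing the whole group.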
+pub(crate) async fn push_snapshot(
+    params: &PushParameters,
+    namespace: &BackupNamespace,
+    snapshot: &BackupDir,
+    fetch_previous_manifest: bool,
+) -> Result<SyncStats, Error> {
+    let mut stats = SyncStats::default();
+    let target_ns = params.map_to_target(namespace)?;
+    let backup_dir = params
+        .source
+        .store
+        .backup_dir(namespace.clone(), snapshot.clone())?;
+
+    // Reader locks the snapshot
+    let reader = params.source.reader(namespace, snapshot).await?;
+
+    // Does not lock the manifest, but the reader already assures a locked snapshot
+    let source_manifest = match backup_dir.load_manifest() {
+        Ok((manifest, _raw_size)) => manifest,
+        Err(err) => {
+            // No manifest in snapshot or failed to read, warn and skip
+            log::warn!("failed to load manifest - {err}");
+            return Ok(stats);
+        }
+    };
+
+    // Writer instance locks the snapshot on the remote side
+    let backup_writer = BackupWriter::start(
+        &params.target.client,
+        None,
+        params.target.repo.store(),
+        &target_ns,
+        snapshot,
+        false,
+        false,
+    )
+    .await?;
+
+    let mut previous_manifest = None;
+    // Use manifest of previous snapshots in group on target for chunk upload deduplication
+    if fetch_previous_manifest {
+        match backup_writer.download_previous_manifest().await {
+            Ok(manifest) => previous_manifest = Some(Arc::new(manifest)),
+            Err(err) => log::info!("Could not download previous manifest - {err}"),
+        }
+    };
+
+    // Dummy upload options: the actual compression and/or encryption already happened while
+    // the chunks were generated during creation of the backup snapshot, therefore pre-existing
+    // chunks (already compressed and/or encrypted) can be pushed to the target.
+    // Further, these steps are skipped in the backup writer upload stream.
+    //
+    // Therefore, these values do not need to fit the values given in the manifest.
+    // The original manifest is uploaded in the end anyway.
+    //
+    // Compression is set to true so that the uploaded manifest will be compressed.
+    // Encrypt is set to false to assure that above files are not encrypted.
+    let upload_options = UploadOptions {
+        compress: true,
+        encrypt: false,
+        previous_manifest,
+        ..UploadOptions::default()
+    };
+
+    // Avoid double upload penalty by remembering already seen chunks
+    let known_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024 * 1024)));
+
+    for entry in source_manifest.files() {
+        let mut path = backup_dir.full_path();
+        path.push(&entry.filename);
+        if path.try_exists()? {
+            match ArchiveType::from_path(&entry.filename)? {
+                ArchiveType::Blob => {
+                    let file = std::fs::File::open(path.clone())?;
+                    let backup_stats = backup_writer.upload_blob(file, &entry.filename).await?;
+                    stats.add(SyncStats {
+                        chunk_count: backup_stats.chunk_count as usize,
+                        bytes: backup_stats.size as usize,
+                        elapsed: backup_stats.duration,
+                        removed: None,
+                    });
+                }
+                ArchiveType::DynamicIndex => {
+                    if let Some(manifest) = upload_options.previous_manifest.as_ref() {
+                        // Add known chunks, ignore errors since archive might not be present
+                        let _res = backup_writer
+                            .download_previous_dynamic_index(
+                                &entry.filename,
+                                manifest,
+                                known_chunks.clone(),
+                            )
+                            .await;
+                    }
+                    let index = DynamicIndexReader::open(&path)?;
+                    let chunk_reader = reader.chunk_reader(entry.chunk_crypt_mode());
+                    let sync_stats = push_index(
+                        &entry.filename,
+                        index,
+                        chunk_reader,
+                        &backup_writer,
+                        None,
+                        known_chunks.clone(),
+                    )
+                    .await?;
+                    stats.add(sync_stats);
+                }
+                ArchiveType::FixedIndex => {
+                    if let Some(manifest) = upload_options.previous_manifest.as_ref() {
+                        // Add known chunks, ignore errors since archive might not be present
+                        let _res = backup_writer
+                            .download_previous_fixed_index(
+                                &entry.filename,
+                                manifest,
+                                known_chunks.clone(),
+                            )
+                            .await;
+                    }
+                    let index = FixedIndexReader::open(&path)?;
+                    let chunk_reader = reader.chunk_reader(entry.chunk_crypt_mode());
+                    let size = index.index_bytes();
+                    let sync_stats = push_index(
+                        &entry.filename,
+                        index,
+                        chunk_reader,
+                        &backup_writer,
+                        Some(size),
+                        known_chunks.clone(),
+                    )
+                    .await?;
+                    stats.add(sync_stats);
+                }
+            }
+        } else {
+            warn!("{path:?} does not exist, skipped.");
+        }
+    }
+
+    // Fetch client log from source and push to target
+    // this has to be handled individually since the log is never part of the manifest
+    let mut client_log_path = backup_dir.full_path();
+    client_log_path.push(CLIENT_LOG_BLOB_NAME);
+    if client_log_path.is_file() {
+        backup_writer
+            .upload_blob_from_file(
+                &client_log_path,
+                CLIENT_LOG_BLOB_NAME,
+                upload_options.clone(),
+            )
+            .await?;
+    }
+
+    // Rewrite manifest for pushed snapshot, recreating manifest from source on target
+    let manifest_json = serde_json::to_value(source_manifest)?;
+    let manifest_string = serde_json::to_string_pretty(&manifest_json)?;
+    let backup_stats = backup_writer
+        .upload_blob_from_data(
+            manifest_string.into_bytes(),
+            MANIFEST_BLOB_NAME,
+            upload_options,
+        )
+        .await?;
+    backup_writer.finish().await?;
+
+    stats.add(SyncStats {
+        chunk_count: backup_stats.chunk_count as usize,
+        bytes: backup_stats.size as usize,
+        elapsed: backup_stats.duration,
+        removed: None,
+    });
+
+    Ok(stats)
+}
+
+// Read fixed or dynamic index and push to target by uploading via the backup writer instance
+//
+// For fixed indexes, the size must be provided as given by the index reader.
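+//
+// Chunks already recorded in `known_chunks` are never read from the source again; they are
+// forwarded as `MergedChunkInfo::Known` entries so the backup writer only has to register
+// them with the server instead of re-uploading the data.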
+#[allow(clippy::too_many_arguments)]
+async fn push_index<'a>(
+    filename: &'a str,
+    index: impl IndexFile + Send + 'static,
+    chunk_reader: Arc<dyn AsyncReadChunk>,
+    backup_writer: &BackupWriter,
+    size: Option<u64>,
+    known_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
+) -> Result<SyncStats, Error> {
+    let (upload_channel_tx, upload_channel_rx) = mpsc::channel(20);
+    let mut chunk_infos =
+        stream::iter(0..index.index_count()).map(move |pos| index.chunk_info(pos).unwrap());
+
+    tokio::spawn(async move {
+        while let Some(chunk_info) = chunk_infos.next().await {
+            // Avoid reading known chunks, as they are not uploaded by the backup writer anyway
+            let needs_upload = {
+                // Need to limit the scope of the lock, otherwise the async block is not `Send`
+                let mut known_chunks = known_chunks.lock().unwrap();
+                // Check if present and insert, chunk will be read and uploaded below if not present
+                known_chunks.insert(chunk_info.digest)
+            };
+
+            let merged_chunk_info = if needs_upload {
+                chunk_reader
+                    .read_raw_chunk(&chunk_info.digest)
+                    .await
+                    .map(|chunk| {
+                        MergedChunkInfo::New(ChunkInfo {
+                            chunk,
+                            digest: chunk_info.digest,
+                            chunk_len: chunk_info.size(),
+                            offset: chunk_info.range.start,
+                        })
+                    })
+            } else {
+                Ok(MergedChunkInfo::Known(vec![(
+                    // Pass size instead of offset, will be replaced with offset by the backup
+                    // writer
+                    chunk_info.size(),
+                    chunk_info.digest,
+                )]))
+            };
+            let _ = upload_channel_tx.send(merged_chunk_info).await;
+        }
+    });
+
+    let merged_chunk_info_stream = ReceiverStream::new(upload_channel_rx).map_err(Error::from);
+
+    let upload_options = UploadOptions {
+        compress: true,
+        encrypt: false,
+        fixed_size: size,
+        ..UploadOptions::default()
+    };
+
+    let upload_stats = backup_writer
+        .upload_index_chunk_info(filename, merged_chunk_info_stream, upload_options)
+        .await?;
+
+    Ok(SyncStats {
+        chunk_count: upload_stats.chunk_count as usize,
+        bytes: upload_stats.size as usize,
+        elapsed: upload_stats.duration,
+        removed: None,
+    })
+}
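+
+// Example usage (sketch only; the actual caller wiring, e.g. a push sync job or api
+// endpoint, is not part of this module, and the store/remote names are illustrative):
+//
+//     let params = PushParameters::new(
+//         "local-store",               // source datastore name
+//         BackupNamespace::root(),     // source namespace
+//         "remote-pbs",                // remote.cfg entry
+//         "target-store",              // datastore on the remote
+//         BackupNamespace::root(),     // target namespace
+//         auth_id,                     // local user for permission checks
+//         Some(false),                 // remove-vanished
+//         None,                        // max-depth: full recursion
+//         None,                        // no group filters
+//         RateLimitConfig::default(),
+//         None,                        // no transfer-last cutoff
+//     )
+//     .await?;
+//     let stats = push_store(params).await?;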