From 076f36ec4e5efbfef8288cdab88d84ee3e789f79 Mon Sep 17 00:00:00 2001
From: Hannes Laimer
Date: Tue, 21 Nov 2023 15:31:52 +0100
Subject: [PATCH] pull: add support for pulling from local datastore

Signed-off-by: Hannes Laimer
Reviewed-by: Lukas Wagner
Tested-by: Lukas Wagner
Tested-by: Gabriel Goller
---
 src/server/pull.rs | 143 +++++++++++++++++++++++++++++++++++++++++++--
 1 file changed, 138 insertions(+), 5 deletions(-)

diff --git a/src/server/pull.rs b/src/server/pull.rs
index ff3e6d0a..1403c7a7 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -1,8 +1,8 @@
 //! Sync datastore from remote server
 use std::collections::{HashMap, HashSet};
-use std::io::Seek;
-use std::path::Path;
+use std::io::{Seek, Write};
+use std::path::{Path, PathBuf};
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::{Arc, Mutex};
 use std::time::SystemTime;
 
@@ -29,10 +29,12 @@ use pbs_datastore::manifest::{
     archive_type, ArchiveType, BackupManifest, FileInfo, CLIENT_LOG_BLOB_NAME, MANIFEST_BLOB_NAME,
 };
 use pbs_datastore::read_chunk::AsyncReadChunk;
-use pbs_datastore::{check_backup_owner, DataStore, StoreProgress};
+use pbs_datastore::{
+    check_backup_owner, DataStore, ListNamespacesRecursive, LocalChunkReader, StoreProgress,
+};
 use pbs_tools::sha::sha256;
 
-use crate::backup::{check_ns_modification_privs, check_ns_privs};
+use crate::backup::{check_ns_modification_privs, check_ns_privs, ListAccessibleBackupGroups};
 use crate::tools::parallel_handler::ParallelHandler;
 
 struct RemoteReader {
@@ -40,6 +42,12 @@ struct RemoteReader {
     backup_reader: Arc<BackupReader>,
     dir: BackupDir,
 }
 
+struct LocalReader {
+    _dir_lock: Arc<Mutex<proxmox_sys::fs::DirLockGuard>>,
+    path: PathBuf,
+    datastore: Arc<DataStore>,
+}
+
 pub(crate) struct PullTarget {
     store: Arc<DataStore>,
     ns: BackupNamespace,
@@ -51,6 +59,11 @@ pub(crate) struct RemoteSource {
     client: HttpClient,
 }
 
+pub(crate) struct LocalSource {
+    store: Arc<DataStore>,
+    ns: BackupNamespace,
+}
+
 #[async_trait::async_trait]
 /// `PullSource` is a trait that provides an interface for pulling data/information from a source.
 /// The trait includes methods for listing namespaces, groups, and backup directories,
@@ -234,6 +247,81 @@ impl PullSource for RemoteSource {
     }
 }
 
+#[async_trait::async_trait]
+impl PullSource for LocalSource {
+    async fn list_namespaces(
+        &self,
+        max_depth: &mut Option<usize>,
+        _worker: &WorkerTask,
+    ) -> Result<Vec<BackupNamespace>, Error> {
+        ListNamespacesRecursive::new_max_depth(
+            self.store.clone(),
+            self.ns.clone(),
+            max_depth.unwrap_or(MAX_NAMESPACE_DEPTH),
+        )?
+        .collect()
+    }
+
+    async fn list_groups(
+        &self,
+        namespace: &BackupNamespace,
+        owner: &Authid,
+    ) -> Result<Vec<BackupGroup>, Error> {
+        Ok(ListAccessibleBackupGroups::new_with_privs(
+            &self.store,
+            namespace.clone(),
+            0,
+            None,
+            None,
+            Some(owner),
+        )?
+        .filter_map(Result::ok)
+        .map(|backup_group| backup_group.group().clone())
+        .collect::<Vec<BackupGroup>>())
+    }
+
+    async fn list_backup_dirs(
+        &self,
+        namespace: &BackupNamespace,
+        group: &BackupGroup,
+        _worker: &WorkerTask,
+    ) -> Result<Vec<BackupDir>, Error> {
+        Ok(self
+            .store
+            .backup_group(namespace.clone(), group.clone())
+            .iter_snapshots()?
+            .filter_map(Result::ok)
+            .map(|snapshot| snapshot.dir().to_owned())
+            .collect::<Vec<BackupDir>>())
+    }
+
+    fn get_ns(&self) -> BackupNamespace {
+        self.ns.clone()
+    }
+
+    fn print_store_and_ns(&self) -> String {
+        print_store_and_ns(self.store.name(), &self.ns)
+    }
+
+    async fn reader(
+        &self,
+        ns: &BackupNamespace,
+        dir: &BackupDir,
+    ) -> Result<Arc<dyn PullReader>, Error> {
+        let dir = self.store.backup_dir(ns.clone(), dir.clone())?;
+        let dir_lock = proxmox_sys::fs::lock_dir_noblock_shared(
+            &dir.full_path(),
+            "snapshot",
+            "locked by another operation",
+        )?;
+        Ok(Arc::new(LocalReader {
+            _dir_lock: Arc::new(Mutex::new(dir_lock)),
+            path: dir.full_path(),
+            datastore: dir.datastore().clone(),
+        }))
+    }
+}
+
 #[async_trait::async_trait]
 /// `PullReader` is a trait that provides an interface for reading data from a source.
 /// The trait includes methods for getting a chunk reader, loading a file, downloading client log, and checking whether chunk sync should be skipped.
@@ -343,6 +431,48 @@ impl PullReader for RemoteReader {
     }
 }
 
+#[async_trait::async_trait]
+impl PullReader for LocalReader {
+    fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc<dyn AsyncReadChunk> {
+        Arc::new(LocalChunkReader::new(
+            self.datastore.clone(),
+            None,
+            crypt_mode,
+        ))
+    }
+
+    async fn load_file_into(
+        &self,
+        filename: &str,
+        into: &Path,
+        _worker: &WorkerTask,
+    ) -> Result<Option<DataBlob>, Error> {
+        let mut tmp_file = std::fs::OpenOptions::new()
+            .write(true)
+            .create(true)
+            .truncate(true)
+            .read(true)
+            .open(into)?;
+        let mut from_path = self.path.clone();
+        from_path.push(filename);
+        tmp_file.write_all(std::fs::read(from_path)?.as_slice())?;
+        tmp_file.rewind()?;
+        Ok(DataBlob::load_from_reader(&mut tmp_file).ok())
+    }
+
+    async fn try_download_client_log(
+        &self,
+        _to_path: &Path,
+        _worker: &WorkerTask,
+    ) -> Result<(), Error> {
+        Ok(())
+    }
+
+    fn skip_chunk_sync(&self, target_store_name: &str) -> bool {
+        self.datastore.name() == target_store_name
+    }
+}
+
 /// Parameters for a pull operation.
 pub(crate) struct PullParameters {
     /// Where data is pulled from
@@ -399,7 +529,10 @@ impl PullParameters {
                 client,
             })
         } else {
-            bail!("local sync not implemented yet")
+            Arc::new(LocalSource {
+                store: DataStore::lookup_datastore(remote_store, Some(Operation::Read))?,
+                ns: remote_ns,
+            })
         };
         let target = PullTarget {
             store: DataStore::lookup_datastore(store, Some(Operation::Write))?,