diff --git a/src/bin/proxmox-backup-proxy.rs b/src/bin/proxmox-backup-proxy.rs
index 2e56a435..7b1b547f 100644
--- a/src/bin/proxmox-backup-proxy.rs
+++ b/src/bin/proxmox-backup-proxy.rs
@@ -175,6 +175,7 @@ async fn schedule_tasks() -> Result<(), Error> {
 
     schedule_datastore_garbage_collection().await;
     schedule_datastore_prune().await;
+    schedule_datastore_sync_jobs().await;
 
     Ok(())
 }
@@ -299,7 +300,7 @@
         if let Err(err) = WorkerTask::new_thread(
             worker_type,
             Some(store.clone()),
-            "root@pam",
+            "backup@pam",
             false,
             move |worker| {
                 worker.log(format!("starting garbage collection on store {}", store));
@@ -333,7 +334,7 @@ async fn schedule_datastore_prune() {
         let datastore = match DataStore::lookup_datastore(&store) {
             Ok(datastore) => datastore,
             Err(err) => {
-                eprintln!("lookup_datastore failed - {}", err);
+                eprintln!("lookup_datastore '{}' failed - {}", store, err);
                 continue;
             }
         };
@@ -341,7 +342,7 @@
         let store_config: DataStoreConfig = match serde_json::from_value(store_config) {
             Ok(c) => c,
             Err(err) => {
-                eprintln!("datastore config from_value failed - {}", err);
+                eprintln!("datastore '{}' config from_value failed - {}", store, err);
                 continue;
             }
         };
@@ -407,10 +408,11 @@
         if let Err(err) = WorkerTask::new_thread(
             worker_type,
             Some(store.clone()),
-            "root@pam",
+            "backup@pam",
             false,
             move |worker| {
                 worker.log(format!("Starting datastore prune on store \"{}\"", store));
+                worker.log(format!("task triggered by schedule '{}'", event_str));
                 worker.log(format!("retention options: {}", prune_options.cli_options_string()));
 
                 let base_path = datastore.base_path();
@@ -444,3 +446,136 @@ async fn schedule_datastore_prune() {
         }
     }
 }
+
+async fn schedule_datastore_sync_jobs() {
+
+    use proxmox_backup::{
+        backup::DataStore,
+        client::{ HttpClient, HttpClientOptions, BackupRepository },
+        server::{ WorkerTask },
+        config::{ sync::{self, SyncJobConfig}, remote::{self, Remote} },
+        tools::systemd::time::{ parse_calendar_event, compute_next_event },
+    };
+
+    let config = match sync::config() {
+        Err(err) => {
+            eprintln!("unable to read sync job config - {}", err);
+            return;
+        }
+        Ok((config, _digest)) => config,
+    };
+
+    let remote_config = match remote::config() {
+        Err(err) => {
+            eprintln!("unable to read remote config - {}", err);
+            return;
+        }
+        Ok((config, _digest)) => config,
+    };
+
+    for (job_id, (_, job_config)) in config.sections {
+        let job_config: SyncJobConfig = match serde_json::from_value(job_config) {
+            Ok(c) => c,
+            Err(err) => {
+                eprintln!("sync job config from_value failed - {}", err);
+                continue;
+            }
+        };
+
+        let event_str = match job_config.schedule {
+            Some(ref event_str) => event_str.clone(),
+            None => continue,
+        };
+
+        let event = match parse_calendar_event(&event_str) {
+            Ok(event) => event,
+            Err(err) => {
+                eprintln!("unable to parse schedule '{}' - {}", event_str, err);
+                continue;
+            }
+        };
+
+        //fixme: if last_sync_job_still_running { continue; }
+
+        let worker_type = "sync";
+
+        let last = match lookup_last_worker(worker_type, &job_config.store) {
+            Ok(Some(upid)) => upid.starttime,
+            Ok(None) => 0,
+            Err(err) => {
+                eprintln!("lookup_last_worker failed: {}", err);
+                continue;
+            }
+        };
+
+        let next = match compute_next_event(&event, last, false) {
+            Ok(next) => next,
+            Err(err) => {
+                eprintln!("compute_next_event for '{}' failed - {}", event_str, err);
+                continue;
+            }
+        };
+
+        let now = match SystemTime::now().duration_since(UNIX_EPOCH) {
+            Ok(epoch_now) => epoch_now.as_secs() as i64,
+            Err(err) => {
+                eprintln!("query system time failed - {}", err);
+                continue;
+            }
+        };
+        if next > now { continue; }
+
+        let job_id2 = job_id.clone();
+
+        let tgt_store = match DataStore::lookup_datastore(&job_config.store) {
+            Ok(datastore) => datastore,
+            Err(err) => {
+                eprintln!("lookup_datastore '{}' failed - {}", job_config.store, err);
+                continue;
+            }
+        };
+
+        let remote: Remote = match remote_config.lookup("remote", &job_config.remote) {
+            Ok(remote) => remote,
+            Err(err) => {
+                eprintln!("remote_config lookup failed: {}", err);
+                continue;
+            }
+        };
+
+        let username = String::from("backup@pam");
+
+        let delete = job_config.remove_vanished.unwrap_or(true);
+
+        if let Err(err) = WorkerTask::spawn(
+            worker_type,
+            Some(job_config.store.clone()),
+            &username.clone(),
+            false,
+            move |worker| async move {
+                worker.log(format!("Starting datastore sync job '{}'", job_id));
+                worker.log(format!("task triggered by schedule '{}'", event_str));
+                worker.log(format!("Sync datastore '{}' from '{}/{}'",
+                    job_config.store, job_config.remote, job_config.remote_store));
+
+                let options = HttpClientOptions::new()
+                    .password(Some(remote.password.clone()))
+                    .fingerprint(remote.fingerprint.clone());
+
+                let client = HttpClient::new(&remote.host, &remote.userid, options)?;
+                let _auth_info = client.login() // make sure we can auth
+                    .await
+                    .map_err(|err| format_err!("remote connection to '{}' failed - {}", remote.host, err))?;
+
+                let src_repo = BackupRepository::new(Some(remote.userid), Some(remote.host), job_config.remote_store);
+
+                proxmox_backup::api2::pull::pull_store(&worker, &client, &src_repo, tgt_store, delete, username).await?;
+
+                Ok(())
+            }
+        ) {
+            eprintln!("unable to start datastore sync job {} - {}", job_id2, err);
+        }
+    }
+}
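
The loop above iterates the sections returned by sync::config(), each of which
deserializes into the SyncJobConfig consumed here. For orientation, such an
entry would look roughly like the sketch below. The job id and values are
invented, and the kebab-case key spelling assumes the usual Proxmox
section-config convention; only the field set (store, remote, remote-store,
remove-vanished, schedule) is taken from the patch itself.

    sync: sync-job1
        store local-store
        remote my-remote
        remote-store remote-store1
        remove-vanished true
        schedule daily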
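
lookup_last_worker() is called above but is not part of this patch; it is
defined elsewhere in proxmox-backup-proxy.rs. As a rough sketch of what such
a helper has to do, assuming a task-archive reader along the lines of
server::read_task_list() (that API, the UPID field names, and the Error alias
are assumptions here, not taken from the patch): scan the known tasks and
return the UPID of the most recent worker with the given type and id.

    // Sketch only - the real helper lives elsewhere in this file.
    use proxmox_backup::server;

    fn lookup_last_worker(worker_type: &str, worker_id: &str) -> Result<Option<server::UPID>, Error> {
        let list = server::read_task_list()?;

        let mut last: Option<&server::UPID> = None;

        for entry in list.iter() {
            if entry.upid.worker_type == worker_type {
                if let Some(ref id) = entry.upid.worker_id {
                    if id == worker_id {
                        // keep the task with the latest start time
                        match last {
                            Some(upid) if upid.starttime >= entry.upid.starttime => {}
                            _ => last = Some(&entry.upid),
                        }
                    }
                }
            }
        }

        Ok(last.cloned())
    }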
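
Finally, the time gate: compute_next_event() yields the first scheduled time
after the last run (epoch seconds), and the job is spawned only once the wall
clock passes it. A minimal, std-only restatement of that check (the function
name is illustrative, not from the patch):

    use std::time::{SystemTime, UNIX_EPOCH};

    /// Returns true once the scheduled time `next` has been reached.
    fn job_is_due(next: i64) -> bool {
        match SystemTime::now().duration_since(UNIX_EPOCH) {
            Ok(now) => next <= now.as_secs() as i64,
            // clock error: skip this round, like the `continue` above
            Err(_) => false,
        }
    }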