diff --git a/src/api2/admin.rs b/src/api2/admin.rs index 189a2a11..b927ce1e 100644 --- a/src/api2/admin.rs +++ b/src/api2/admin.rs @@ -2,9 +2,11 @@ use proxmox::api::router::{Router, SubdirMap}; use proxmox::list_subdirs_api_method; pub mod datastore; +pub mod sync; const SUBDIRS: SubdirMap = &[ - ("datastore", &datastore::ROUTER) + ("datastore", &datastore::ROUTER), + ("sync", &sync::ROUTER) ]; pub const ROUTER: Router = Router::new() diff --git a/src/api2/admin/sync.rs b/src/api2/admin/sync.rs new file mode 100644 index 00000000..f6c8a720 --- /dev/null +++ b/src/api2/admin/sync.rs @@ -0,0 +1,134 @@ +use anyhow::{Error}; +use serde_json::Value; +use std::time::{SystemTime, UNIX_EPOCH}; +use std::collections::HashMap; + +use proxmox::api::{api, ApiMethod, Router, RpcEnvironment}; +use proxmox::api::router::SubdirMap; +use proxmox::{list_subdirs_api_method, sortable}; + +use crate::api2::types::*; +use crate::api2::pull::{get_pull_parameters}; +use crate::config::sync::{self, SyncJobStatus, SyncJobConfig}; +use crate::server::{self, TaskListInfo, WorkerTask}; +use crate::tools::systemd::time::{ + parse_calendar_event, compute_next_event}; + +#[api( + input: { + properties: {}, + }, + returns: { + description: "List configured jobs and their status.", + type: Array, + items: { type: sync::SyncJobStatus }, + }, +)] +/// List all sync jobs +pub fn list_sync_jobs( + _param: Value, + mut rpcenv: &mut dyn RpcEnvironment, +) -> Result, Error> { + + let (config, digest) = sync::config()?; + + let mut list: Vec = config.convert_to_typed_array("sync")?; + + let mut last_tasks: HashMap = HashMap::new(); + let tasks = server::read_task_list()?; + + for info in tasks.iter() { + let worker_id = match &info.upid.worker_id { + Some(id) => id, + _ => { continue; }, + }; + if let Some(last) = last_tasks.get(worker_id) { + if last.upid.starttime < info.upid.starttime { + last_tasks.insert(worker_id.to_string(), &info); + } + } else { + last_tasks.insert(worker_id.to_string(), &info); + } + } + + let now = match SystemTime::now().duration_since(UNIX_EPOCH) { + Ok(epoch_now) => epoch_now.as_secs() as i64, + _ => 0i64, + }; + + for job in &mut list { + job.next_run = (|| -> Option { + let schedule = job.schedule.as_ref()?; + let event = parse_calendar_event(&schedule).ok()?; + compute_next_event(&event, now, false).ok() + })(); + + if let Some(task) = last_tasks.get(&job.id) { + job.last_run_upid = Some(task.upid_str.clone()); + if let Some((endttime, status)) = &task.state { + job.last_run_state = Some(String::from(status)); + job.last_run_endtime = Some(*endttime); + } + } + } + + rpcenv["digest"] = proxmox::tools::digest_to_hex(&digest).into(); + + Ok(list) +} + +#[api( + input: { + properties: { + id: { + schema: JOB_ID_SCHEMA, + } + } + } +)] +/// Runs the sync jobs manually. +async fn run_sync_job( + id: String, + _info: &ApiMethod, + rpcenv: &mut dyn RpcEnvironment, +) -> Result { + + let (config, _digest) = sync::config()?; + let sync_job: SyncJobConfig = config.lookup("sync", &id)?; + + let username = rpcenv.get_user().unwrap(); + + let delete = sync_job.remove_vanished.unwrap_or(true); + let (client, src_repo, tgt_store) = get_pull_parameters(&sync_job.store, &sync_job.remote, &sync_job.remote_store).await?; + + let upid_str = WorkerTask::spawn("syncjob", Some(id.clone()), &username.clone(), false, move |worker| async move { + + worker.log(format!("sync job '{}' start", &id)); + + crate::client::pull::pull_store(&worker, &client, &src_repo, tgt_store.clone(), delete, String::from("backup@pam")).await?; + + worker.log(format!("sync job '{}' end", &id)); + + Ok(()) + })?; + + Ok(upid_str) +} + +#[sortable] +const SYNC_INFO_SUBDIRS: SubdirMap = &[ + ( + "run", + &Router::new() + .post(&API_METHOD_RUN_SYNC_JOB) + ), +]; + +const SYNC_INFO_ROUTER: Router = Router::new() + .get(&list_subdirs_api_method!(SYNC_INFO_SUBDIRS)) + .subdirs(SYNC_INFO_SUBDIRS); + + +pub const ROUTER: Router = Router::new() + .get(&API_METHOD_LIST_SYNC_JOBS) + .match_all("id", &SYNC_INFO_ROUTER);