diff --git a/proxmox-rest-server/Cargo.toml b/proxmox-rest-server/Cargo.toml index b02c20db..afaf40e1 100644 --- a/proxmox-rest-server/Cargo.toml +++ b/proxmox-rest-server/Cargo.toml @@ -15,6 +15,7 @@ lazy_static = "1.4" libc = "0.2" log = "0.4" nix = "0.19.1" +once_cell = "1.3.1" percent-encoding = "2.1" regex = "1.2" serde = { version = "1.0", features = [] } diff --git a/proxmox-rest-server/src/lib.rs b/proxmox-rest-server/src/lib.rs index 2f29f9cd..9acdb3fd 100644 --- a/proxmox-rest-server/src/lib.rs +++ b/proxmox-rest-server/src/lib.rs @@ -1,9 +1,12 @@ use std::os::unix::io::RawFd; use anyhow::{bail, format_err, Error}; +use nix::unistd::Pid; use proxmox::tools::fd::Fd; +use proxmox::sys::linux::procfs::PidStat; use proxmox::api::UserInformation; +use proxmox::tools::fs::CreateOptions; mod compression; pub use compression::*; @@ -29,6 +32,9 @@ pub use api_config::ApiConfig; mod rest; pub use rest::{RestServer, handle_api_request}; +mod worker_task; +pub use worker_task::*; + pub enum AuthError { Generic(Error), NoData, @@ -48,6 +54,40 @@ pub trait ApiAuth { ) -> Result<(String, Box), AuthError>; } +lazy_static::lazy_static!{ + static ref PID: i32 = unsafe { libc::getpid() }; + static ref PSTART: u64 = PidStat::read_from_pid(Pid::from_raw(*PID)).unwrap().starttime; +} + +pub fn pid() -> i32 { + *PID +} + +pub fn pstart() -> u64 { + *PSTART +} + +pub fn write_pid(pid_fn: &str) -> Result<(), Error> { + let pid_str = format!("{}\n", *PID); + proxmox::tools::fs::replace_file(pid_fn, pid_str.as_bytes(), CreateOptions::new()) +} + +pub fn read_pid(pid_fn: &str) -> Result { + let pid = proxmox::tools::fs::file_get_contents(pid_fn)?; + let pid = std::str::from_utf8(&pid)?.trim(); + pid.parse().map_err(|err| format_err!("could not parse pid - {}", err)) +} + +pub fn ctrl_sock_from_pid(pid: i32) -> String { + // Note: The control socket always uses @/run/proxmox-backup/ as prefix + // for historc reason. + format!("\0{}/control-{}.sock", "/run/proxmox-backup", pid) +} + +pub fn our_ctrl_sock() -> String { + ctrl_sock_from_pid(*PID) +} + static mut SHUTDOWN_REQUESTED: bool = false; pub fn request_shutdown() { diff --git a/proxmox-rest-server/src/worker_task.rs b/proxmox-rest-server/src/worker_task.rs new file mode 100644 index 00000000..b6ed6862 --- /dev/null +++ b/proxmox-rest-server/src/worker_task.rs @@ -0,0 +1,903 @@ +use std::collections::{HashMap, VecDeque}; +use std::fs::File; +use std::path::PathBuf; +use std::io::{Read, Write, BufRead, BufReader}; +use std::panic::UnwindSafe; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::{Arc, Mutex}; + +use anyhow::{bail, format_err, Error}; +use futures::*; +use lazy_static::lazy_static; +use serde_json::{json, Value}; +use serde::{Serialize, Deserialize}; +use tokio::sync::oneshot; +use nix::fcntl::OFlag; +use once_cell::sync::OnceCell; + +use proxmox::sys::linux::procfs; +use proxmox::try_block; +use proxmox::tools::fs::{create_path, replace_file, atomic_open_or_create_file, CreateOptions}; +use proxmox::api::upid::UPID; + +use pbs_tools::logrotate::{LogRotate, LogRotateFiles}; + +use crate::{CommandoSocket, FileLogger, FileLogOptions}; + +struct TaskListLockGuard(File); + +struct WorkerTaskSetup { + file_opts: CreateOptions, + taskdir: PathBuf, + task_lock_fn: PathBuf, + active_tasks_fn: PathBuf, + task_index_fn: PathBuf, + task_archive_fn: PathBuf, +} + +static WORKER_TASK_SETUP: OnceCell = OnceCell::new(); + +fn worker_task_setup() -> Result<&'static WorkerTaskSetup, Error> { + WORKER_TASK_SETUP.get() + .ok_or_else(|| format_err!("WorkerTask library is not initialized")) +} + +impl WorkerTaskSetup { + + fn new(basedir: PathBuf, file_opts: CreateOptions) -> Self { + + let mut taskdir = basedir.clone(); + taskdir.push("tasks"); + + let mut task_lock_fn = taskdir.clone(); + task_lock_fn.push(".active.lock"); + + let mut active_tasks_fn = taskdir.clone(); + active_tasks_fn.push("active"); + + let mut task_index_fn = taskdir.clone(); + task_index_fn.push("index"); + + let mut task_archive_fn = taskdir.clone(); + task_archive_fn.push("archive"); + + Self { + file_opts, + taskdir, + task_lock_fn, + active_tasks_fn, + task_index_fn, + task_archive_fn, + } + } + + fn lock_task_list_files(&self, exclusive: bool) -> Result { + let options = self.file_opts.clone() + .perm(nix::sys::stat::Mode::from_bits_truncate(0o660)); + + let timeout = std::time::Duration::new(10, 0); + + let file = proxmox::tools::fs::open_file_locked( + &self.task_lock_fn, + timeout, + exclusive, + options, + )?; + + Ok(TaskListLockGuard(file)) + } + + fn log_path(&self, upid: &UPID) -> std::path::PathBuf { + let mut path = self.taskdir.clone(); + path.push(format!("{:02X}", upid.pstart % 256)); + path.push(upid.to_string()); + path + } + + // atomically read/update the task list, update status of finished tasks + // new_upid is added to the list when specified. + fn update_active_workers(&self, new_upid: Option<&UPID>) -> Result<(), Error> { + + let lock = self.lock_task_list_files(true)?; + + // TODO remove with 1.x + let mut finish_list: Vec = read_task_file_from_path(&self.task_index_fn)?; + let had_index_file = !finish_list.is_empty(); + + // We use filter_map because one negative case wants to *move* the data into `finish_list`, + // clippy doesn't quite catch this! + #[allow(clippy::unnecessary_filter_map)] + let mut active_list: Vec = read_task_file_from_path(&self.active_tasks_fn)? + .into_iter() + .filter_map(|info| { + if info.state.is_some() { + // this can happen when the active file still includes finished tasks + finish_list.push(info); + return None; + } + + if !worker_is_active_local(&info.upid) { + // println!("Detected stopped task '{}'", &info.upid_str); + let now = proxmox::tools::time::epoch_i64(); + let status = upid_read_status(&info.upid).unwrap_or(TaskState::Unknown { endtime: now }); + finish_list.push(TaskListInfo { + upid: info.upid, + upid_str: info.upid_str, + state: Some(status) + }); + return None; + } + + Some(info) + }).collect(); + + if let Some(upid) = new_upid { + active_list.push(TaskListInfo { upid: upid.clone(), upid_str: upid.to_string(), state: None }); + } + + let active_raw = render_task_list(&active_list); + + let options = self.file_opts.clone() + .perm(nix::sys::stat::Mode::from_bits_truncate(0o660)); + + replace_file( + &self.active_tasks_fn, + active_raw.as_bytes(), + options, + )?; + + finish_list.sort_unstable_by(|a, b| { + match (&a.state, &b.state) { + (Some(s1), Some(s2)) => s1.cmp(&s2), + (Some(_), None) => std::cmp::Ordering::Less, + (None, Some(_)) => std::cmp::Ordering::Greater, + _ => a.upid.starttime.cmp(&b.upid.starttime), + } + }); + + if !finish_list.is_empty() { + let options = self.file_opts.clone() + .perm(nix::sys::stat::Mode::from_bits_truncate(0o660)); + + let mut writer = atomic_open_or_create_file( + &self.task_archive_fn, + OFlag::O_APPEND | OFlag::O_RDWR, + &[], + options, + )?; + for info in &finish_list { + writer.write_all(render_task_line(&info).as_bytes())?; + } + } + + // TODO Remove with 1.x + // for compatibility, if we had an INDEX file, we do not need it anymore + if had_index_file { + let _ = nix::unistd::unlink(&self.task_index_fn); + } + + drop(lock); + + Ok(()) + } + + // Create task log directory with correct permissions + fn create_task_log_dirs(&self) -> Result<(), Error> { + + try_block!({ + let dir_opts = self.file_opts.clone() + .perm(nix::sys::stat::Mode::from_bits_truncate(0o755)); + + create_path(&self.taskdir, Some(dir_opts.clone()), Some(dir_opts.clone()))?; + // fixme:??? create_path(pbs_buildcfg::PROXMOX_BACKUP_RUN_DIR, None, Some(opts))?; + Ok(()) + }).map_err(|err: Error| format_err!("unable to create task log dir - {}", err)) + } +} + +/// Initialize the WorkerTask library +pub fn init_worker_tasks(basedir: PathBuf, file_opts: CreateOptions) -> Result<(), Error> { + let setup = WorkerTaskSetup::new(basedir, file_opts); + setup.create_task_log_dirs()?; + WORKER_TASK_SETUP.set(setup) + .map_err(|_| format_err!("init_worker_tasks failed - already initialized")) +} + +/// checks if the Task Archive is bigger that 'size_threshold' bytes, and +/// rotates it if it is +pub fn rotate_task_log_archive(size_threshold: u64, compress: bool, max_files: Option) -> Result { + + let setup = worker_task_setup()?; + + let _lock = setup.lock_task_list_files(true)?; + + let mut logrotate = LogRotate::new(&setup.task_archive_fn, compress) + .ok_or_else(|| format_err!("could not get archive file names"))?; + + logrotate.rotate(size_threshold, None, max_files) +} + + +/// Path to the worker log file +pub fn upid_log_path(upid: &UPID) -> Result { + let setup = worker_task_setup()?; + Ok(setup.log_path(upid)) +} + +/// Read endtime (time of last log line) and exitstatus from task log file +/// If there is not a single line with at valid datetime, we assume the +/// starttime to be the endtime +pub fn upid_read_status(upid: &UPID) -> Result { + + let setup = worker_task_setup()?; + + let mut status = TaskState::Unknown { endtime: upid.starttime }; + + let path = setup.log_path(upid); + + let mut file = File::open(path)?; + + /// speedup - only read tail + use std::io::Seek; + use std::io::SeekFrom; + let _ = file.seek(SeekFrom::End(-8192)); // ignore errors + + let mut data = Vec::with_capacity(8192); + file.read_to_end(&mut data)?; + + // strip newlines at the end of the task logs + while data.last() == Some(&b'\n') { + data.pop(); + } + + let last_line = match data.iter().rposition(|c| *c == b'\n') { + Some(start) if data.len() > (start+1) => &data[start+1..], + Some(_) => &data, // should not happen, since we removed all trailing newlines + None => &data, + }; + + let last_line = std::str::from_utf8(last_line) + .map_err(|err| format_err!("upid_read_status: utf8 parse failed: {}", err))?; + + let mut iter = last_line.splitn(2, ": "); + if let Some(time_str) = iter.next() { + if let Ok(endtime) = proxmox::tools::time::parse_rfc3339(time_str) { + // set the endtime even if we cannot parse the state + status = TaskState::Unknown { endtime }; + if let Some(rest) = iter.next().and_then(|rest| rest.strip_prefix("TASK ")) { + if let Ok(state) = TaskState::from_endtime_and_message(endtime, rest) { + status = state; + } + } + } + } + + Ok(status) +} + +lazy_static! { + static ref WORKER_TASK_LIST: Mutex>> = Mutex::new(HashMap::new()); +} + +/// checks if the task UPID refers to a worker from this process +fn is_local_worker(upid: &UPID) -> bool { + upid.pid == crate::pid() && upid.pstart == crate::pstart() +} + +/// Test if the task is still running +pub async fn worker_is_active(upid: &UPID) -> Result { + if is_local_worker(upid) { + return Ok(WORKER_TASK_LIST.lock().unwrap().contains_key(&upid.task_id)); + } + + if procfs::check_process_running_pstart(upid.pid, upid.pstart).is_none() { + return Ok(false); + } + + let sock = crate::ctrl_sock_from_pid(upid.pid); + let cmd = json!({ + "command": "worker-task-status", + "args": { + "upid": upid.to_string(), + }, + }); + let status = crate::send_command(sock, &cmd).await?; + + if let Some(active) = status.as_bool() { + Ok(active) + } else { + bail!("got unexpected result {:?} (expected bool)", status); + } +} + +/// Test if the task is still running (fast but inaccurate implementation) +/// +/// If the task is spawned from a different process, we simply return if +/// that process is still running. This information is good enough to detect +/// stale tasks... +pub fn worker_is_active_local(upid: &UPID) -> bool { + if is_local_worker(upid) { + WORKER_TASK_LIST.lock().unwrap().contains_key(&upid.task_id) + } else { + procfs::check_process_running_pstart(upid.pid, upid.pstart).is_some() + } +} + +pub fn register_task_control_commands( + commando_sock: &mut CommandoSocket, +) -> Result<(), Error> { + fn get_upid(args: Option<&Value>) -> Result { + let args = if let Some(args) = args { args } else { bail!("missing args") }; + let upid = match args.get("upid") { + Some(Value::String(upid)) => upid.parse::()?, + None => bail!("no upid in args"), + _ => bail!("unable to parse upid"), + }; + if !is_local_worker(&upid) { + bail!("upid does not belong to this process"); + } + Ok(upid) + } + + commando_sock.register_command("worker-task-abort".into(), move |args| { + let upid = get_upid(args)?; + + abort_local_worker(upid); + + Ok(Value::Null) + })?; + commando_sock.register_command("worker-task-status".into(), move |args| { + let upid = get_upid(args)?; + + let active = WORKER_TASK_LIST.lock().unwrap().contains_key(&upid.task_id); + + Ok(active.into()) + })?; + + Ok(()) +} + +pub fn abort_worker_async(upid: UPID) { + tokio::spawn(async move { + if let Err(err) = abort_worker(upid).await { + eprintln!("abort worker failed - {}", err); + } + }); +} + +pub async fn abort_worker(upid: UPID) -> Result<(), Error> { + + let sock = crate::ctrl_sock_from_pid(upid.pid); + let cmd = json!({ + "command": "worker-task-abort", + "args": { + "upid": upid.to_string(), + }, + }); + crate::send_command(sock, &cmd).map_ok(|_| ()).await +} + +fn parse_worker_status_line(line: &str) -> Result<(String, UPID, Option), Error> { + + let data = line.splitn(3, ' ').collect::>(); + + let len = data.len(); + + match len { + 1 => Ok((data[0].to_owned(), data[0].parse::()?, None)), + 3 => { + let endtime = i64::from_str_radix(data[1], 16)?; + let state = TaskState::from_endtime_and_message(endtime, data[2])?; + Ok((data[0].to_owned(), data[0].parse::()?, Some(state))) + } + _ => bail!("wrong number of components"), + } +} + +/// Task State +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] +pub enum TaskState { + /// The Task ended with an undefined state + Unknown { endtime: i64 }, + /// The Task ended and there were no errors or warnings + OK { endtime: i64 }, + /// The Task had 'count' amount of warnings and no errors + Warning { count: u64, endtime: i64 }, + /// The Task ended with the error described in 'message' + Error { message: String, endtime: i64 }, +} + +impl TaskState { + pub fn endtime(&self) -> i64 { + match *self { + TaskState::Unknown { endtime } => endtime, + TaskState::OK { endtime } => endtime, + TaskState::Warning { endtime, .. } => endtime, + TaskState::Error { endtime, .. } => endtime, + } + } + + fn result_text(&self) -> String { + match self { + TaskState::Error { message, .. } => format!("TASK ERROR: {}", message), + other => format!("TASK {}", other), + } + } + + fn from_endtime_and_message(endtime: i64, s: &str) -> Result { + if s == "unknown" { + Ok(TaskState::Unknown { endtime }) + } else if s == "OK" { + Ok(TaskState::OK { endtime }) + } else if let Some(warnings) = s.strip_prefix("WARNINGS: ") { + let count: u64 = warnings.parse()?; + Ok(TaskState::Warning{ count, endtime }) + } else if !s.is_empty() { + let message = if let Some(err) = s.strip_prefix("ERROR: ") { err } else { s }.to_string(); + Ok(TaskState::Error{ message, endtime }) + } else { + bail!("unable to parse Task Status '{}'", s); + } + } +} + +impl std::cmp::PartialOrd for TaskState { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.endtime().cmp(&other.endtime())) + } +} + +impl std::cmp::Ord for TaskState { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.endtime().cmp(&other.endtime()) + } +} + +impl std::fmt::Display for TaskState { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + TaskState::Unknown { .. } => write!(f, "unknown"), + TaskState::OK { .. }=> write!(f, "OK"), + TaskState::Warning { count, .. } => write!(f, "WARNINGS: {}", count), + TaskState::Error { message, .. } => write!(f, "{}", message), + } + } +} + +/// Task details including parsed UPID +/// +/// If there is no `state`, the task is still running. +#[derive(Debug)] +pub struct TaskListInfo { + /// The parsed UPID + pub upid: UPID, + /// UPID string representation + pub upid_str: String, + /// Task `(endtime, status)` if already finished + pub state: Option, // endtime, status +} + +fn render_task_line(info: &TaskListInfo) -> String { + let mut raw = String::new(); + if let Some(status) = &info.state { + raw.push_str(&format!("{} {:08X} {}\n", info.upid_str, status.endtime(), status)); + } else { + raw.push_str(&info.upid_str); + raw.push('\n'); + } + + raw +} + +fn render_task_list(list: &[TaskListInfo]) -> String { + let mut raw = String::new(); + for info in list { + raw.push_str(&render_task_line(&info)); + } + raw +} + +// note this is not locked, caller has to make sure it is +// this will skip (and log) lines that are not valid status lines +fn read_task_file(reader: R) -> Result, Error> +{ + let reader = BufReader::new(reader); + let mut list = Vec::new(); + for line in reader.lines() { + let line = line?; + match parse_worker_status_line(&line) { + Ok((upid_str, upid, state)) => list.push(TaskListInfo { + upid_str, + upid, + state + }), + Err(err) => { + eprintln!("unable to parse worker status '{}' - {}", line, err); + continue; + } + }; + } + + Ok(list) +} + +// note this is not locked, caller has to make sure it is +fn read_task_file_from_path

(path: P) -> Result, Error> +where + P: AsRef + std::fmt::Debug, +{ + let file = match File::open(&path) { + Ok(f) => f, + Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()), + Err(err) => bail!("unable to open task list {:?} - {}", path, err), + }; + + read_task_file(file) +} + +pub struct TaskListInfoIterator { + list: VecDeque, + end: bool, + archive: Option, + lock: Option, +} + +impl TaskListInfoIterator { + pub fn new(active_only: bool) -> Result { + + let setup = worker_task_setup()?; + + let (read_lock, active_list) = { + let lock = setup.lock_task_list_files(false)?; + let active_list = read_task_file_from_path(&setup.active_tasks_fn)?; + + let needs_update = active_list + .iter() + .any(|info| info.state.is_some() || !worker_is_active_local(&info.upid)); + + // TODO remove with 1.x + let index_exists = setup.task_index_fn.is_file(); + + if needs_update || index_exists { + drop(lock); + setup.update_active_workers(None)?; + let lock = setup.lock_task_list_files(false)?; + let active_list = read_task_file_from_path(&setup.active_tasks_fn)?; + (lock, active_list) + } else { + (lock, active_list) + } + }; + + let archive = if active_only { + None + } else { + let logrotate = LogRotate::new(&setup.task_archive_fn, true) + .ok_or_else(|| format_err!("could not get archive file names"))?; + Some(logrotate.files()) + }; + + let lock = if active_only { None } else { Some(read_lock) }; + + Ok(Self { + list: active_list.into(), + end: active_only, + archive, + lock, + }) + } +} + +impl Iterator for TaskListInfoIterator { + type Item = Result; + + fn next(&mut self) -> Option { + loop { + if let Some(element) = self.list.pop_back() { + return Some(Ok(element)); + } else if self.end { + return None; + } else { + if let Some(mut archive) = self.archive.take() { + if let Some(file) = archive.next() { + let list = match read_task_file(file) { + Ok(list) => list, + Err(err) => return Some(Err(err)), + }; + self.list.append(&mut list.into()); + self.archive = Some(archive); + continue; + } + } + + self.end = true; + self.lock.take(); + } + } + } +} + +/// Launch long running worker tasks. +/// +/// A worker task can either be a whole thread, or a simply tokio +/// task/future. Each task can `log()` messages, which are stored +/// persistently to files. Task should poll the `abort_requested` +/// flag, and stop execution when requested. +pub struct WorkerTask { + setup: &'static WorkerTaskSetup, + upid: UPID, + data: Mutex, + abort_requested: AtomicBool, +} + +impl std::fmt::Display for WorkerTask { + + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + self.upid.fmt(f) + } +} + +struct WorkerTaskData { + logger: FileLogger, + progress: f64, // 0..1 + warn_count: u64, + pub abort_listeners: Vec>, +} + +impl WorkerTask { + + pub fn new( + worker_type: &str, + worker_id: Option, + auth_id: String, + to_stdout: bool, + ) -> Result, Error> { + + let setup = worker_task_setup()?; + + let upid = UPID::new(worker_type, worker_id, auth_id)?; + let task_id = upid.task_id; + + let mut path = setup.taskdir.clone(); + + path.push(format!("{:02X}", upid.pstart & 255)); + + let dir_opts = setup.file_opts.clone() + .perm(nix::sys::stat::Mode::from_bits_truncate(0o755)); + + create_path(&path, None, Some(dir_opts))?; + + path.push(upid.to_string()); + + let logger_options = FileLogOptions { + to_stdout, + exclusive: true, + prefix_time: true, + read: true, + file_opts: setup.file_opts.clone(), + ..Default::default() + }; + let logger = FileLogger::new(&path, logger_options)?; + + let worker = Arc::new(Self { + setup, + upid: upid.clone(), + abort_requested: AtomicBool::new(false), + data: Mutex::new(WorkerTaskData { + logger, + progress: 0.0, + warn_count: 0, + abort_listeners: vec![], + }), + }); + + // scope to drop the lock again after inserting + { + let mut hash = WORKER_TASK_LIST.lock().unwrap(); + hash.insert(task_id, worker.clone()); + crate::set_worker_count(hash.len()); + } + + setup.update_active_workers(Some(&upid))?; + + Ok(worker) + } + + /// Spawn a new tokio task/future. + pub fn spawn( + worker_type: &str, + worker_id: Option, + auth_id: String, + to_stdout: bool, + f: F, + ) -> Result + where F: Send + 'static + FnOnce(Arc) -> T, + T: Send + 'static + Future>, + { + let worker = WorkerTask::new(worker_type, worker_id, auth_id, to_stdout)?; + let upid_str = worker.upid.to_string(); + let f = f(worker.clone()); + tokio::spawn(async move { + let result = f.await; + worker.log_result(&result); + }); + + Ok(upid_str) + } + + /// Create a new worker thread. + pub fn new_thread( + worker_type: &str, + worker_id: Option, + auth_id: String, + to_stdout: bool, + f: F, + ) -> Result + where F: Send + UnwindSafe + 'static + FnOnce(Arc) -> Result<(), Error> + { + let worker = WorkerTask::new(worker_type, worker_id, auth_id, to_stdout)?; + let upid_str = worker.upid.to_string(); + + let _child = std::thread::Builder::new().name(upid_str.clone()).spawn(move || { + let worker1 = worker.clone(); + let result = match std::panic::catch_unwind(move || f(worker1)) { + Ok(r) => r, + Err(panic) => { + match panic.downcast::<&str>() { + Ok(panic_msg) => { + Err(format_err!("worker panicked: {}", panic_msg)) + } + Err(_) => { + Err(format_err!("worker panicked: unknown type.")) + } + } + } + }; + + worker.log_result(&result); + }); + + Ok(upid_str) + } + + /// create state from self and a result + pub fn create_state(&self, result: &Result<(), Error>) -> TaskState { + let warn_count = self.data.lock().unwrap().warn_count; + + let endtime = proxmox::tools::time::epoch_i64(); + + if let Err(err) = result { + TaskState::Error { message: err.to_string(), endtime } + } else if warn_count > 0 { + TaskState::Warning { count: warn_count, endtime } + } else { + TaskState::OK { endtime } + } + } + + /// Log task result, remove task from running list + pub fn log_result(&self, result: &Result<(), Error>) { + let state = self.create_state(result); + self.log(state.result_text()); + + WORKER_TASK_LIST.lock().unwrap().remove(&self.upid.task_id); + let _ = self.setup.update_active_workers(None); + crate::set_worker_count(WORKER_TASK_LIST.lock().unwrap().len()); + } + + /// Log a message. + pub fn log>(&self, msg: S) { + let mut data = self.data.lock().unwrap(); + data.logger.log(msg); + } + + /// Log a message as warning. + pub fn warn>(&self, msg: S) { + let mut data = self.data.lock().unwrap(); + data.logger.log(format!("WARN: {}", msg.as_ref())); + data.warn_count += 1; + } + + /// Set progress indicator + pub fn progress(&self, progress: f64) { + if progress >= 0.0 && progress <= 1.0 { + let mut data = self.data.lock().unwrap(); + data.progress = progress; + } else { + // fixme: log!("task '{}': ignoring strange value for progress '{}'", self.upid, progress); + } + } + + /// Request abort + pub fn request_abort(&self) { + eprintln!("set abort flag for worker {}", self.upid); + + let prev_abort = self.abort_requested.swap(true, Ordering::SeqCst); + if !prev_abort { // log abort one time + self.log(format!("received abort request ...")); + } + // noitify listeners + let mut data = self.data.lock().unwrap(); + loop { + match data.abort_listeners.pop() { + None => { break; }, + Some(ch) => { + let _ = ch.send(()); // ignore errors here + }, + } + } + } + + /// Test if abort was requested. + pub fn abort_requested(&self) -> bool { + self.abort_requested.load(Ordering::SeqCst) + } + + /// Fail if abort was requested. + pub fn fail_on_abort(&self) -> Result<(), Error> { + if self.abort_requested() { + bail!("abort requested - aborting task"); + } + Ok(()) + } + + /// Get a future which resolves on task abort + pub fn abort_future(&self) -> oneshot::Receiver<()> { + let (tx, rx) = oneshot::channel::<()>(); + + let mut data = self.data.lock().unwrap(); + if self.abort_requested() { + let _ = tx.send(()); + } else { + data.abort_listeners.push(tx); + } + rx + } + + pub fn upid(&self) -> &UPID { + &self.upid + } +} + +impl pbs_tools::task::TaskState for WorkerTask { + fn check_abort(&self) -> Result<(), Error> { + self.fail_on_abort() + } + + fn log(&self, level: log::Level, message: &std::fmt::Arguments) { + match level { + log::Level::Error => self.warn(&message.to_string()), + log::Level::Warn => self.warn(&message.to_string()), + log::Level::Info => self.log(&message.to_string()), + log::Level::Debug => self.log(&format!("DEBUG: {}", message)), + log::Level::Trace => self.log(&format!("TRACE: {}", message)), + } + } +} + +/// Wait for a locally spanned worker task +/// +/// Note: local workers should print logs to stdout, so there is no +/// need to fetch/display logs. We just wait for the worker to finish. +pub async fn wait_for_local_worker(upid_str: &str) -> Result<(), Error> { + + let upid: UPID = upid_str.parse()?; + + let sleep_duration = core::time::Duration::new(0, 100_000_000); + + loop { + if worker_is_active_local(&upid) { + tokio::time::sleep(sleep_duration).await; + } else { + break; + } + } + Ok(()) +} + +/// Request abort of a local worker (if existing and running) +pub fn abort_local_worker(upid: UPID) { + if let Some(ref worker) = WORKER_TASK_LIST.lock().unwrap().get(&upid.task_id) { + worker.request_abort(); + } +} diff --git a/src/server/h2service.rs b/src/server/h2service.rs index 41d628be..0b51a710 100644 --- a/src/server/h2service.rs +++ b/src/server/h2service.rs @@ -11,11 +11,9 @@ use hyper::{Body, Request, Response, StatusCode}; use proxmox::api::{ApiResponseFuture, HttpError, Router, RpcEnvironment}; use proxmox::http_err; -use proxmox_rest_server::normalize_uri_path; +use proxmox_rest_server::{normalize_uri_path, WorkerTask}; use proxmox_rest_server::formatter::*; -use crate::server::WorkerTask; - /// Hyper Service implementation to handle stateful H2 connections. /// /// We use this kind of service to handle backup protocol