diff --git a/proxmox-rest-server/src/command_socket.rs b/proxmox-rest-server/src/command_socket.rs index 1d62d21d..511ad561 100644 --- a/proxmox-rest-server/src/command_socket.rs +++ b/proxmox-rest-server/src/command_socket.rs @@ -102,7 +102,7 @@ where Ok(task) } - +/// Send a command to the specified socket pub async fn send_command(path: P, params: &T) -> Result where P: AsRef, @@ -113,6 +113,7 @@ where send_raw_command(path.as_ref(), &command_string).await } +/// Send a raw command (string) to the specified socket pub async fn send_raw_command

(path: P, command_string: &str) -> Result where P: AsRef, @@ -146,11 +147,12 @@ where } } -/// A callback for a specific commando socket. -pub type CommandoSocketFn = Box<(dyn Fn(Option<&Value>) -> Result + Send + Sync + 'static)>; +// A callback for a specific commando socket. +type CommandoSocketFn = Box<(dyn Fn(Option<&Value>) -> Result + Send + Sync + 'static)>; -/// Tooling to get a single control command socket where one can register multiple commands -/// dynamically. +/// Tooling to get a single control command socket where one can +/// register multiple commands dynamically. +/// /// You need to call `spawn()` to make the socket active. pub struct CommandoSocket { socket: PathBuf, diff --git a/proxmox-rest-server/src/lib.rs b/proxmox-rest-server/src/lib.rs index 5f1cefbb..4b8cce96 100644 --- a/proxmox-rest-server/src/lib.rs +++ b/proxmox-rest-server/src/lib.rs @@ -40,6 +40,7 @@ pub use worker_task::*; mod h2service; pub use h2service::*; +/// Authentification Error pub enum AuthError { Generic(Error), NoData, @@ -51,7 +52,12 @@ impl From for AuthError { } } +/// User Authentification trait pub trait ApiAuth { + /// Extract user credentials from headers and check them. + /// + /// If credenthials are valid, returns the username and a + /// [UserInformation] object to query additional user data. fn check_auth( &self, headers: &http::HeaderMap, @@ -64,47 +70,64 @@ lazy_static::lazy_static!{ static ref PSTART: u64 = PidStat::read_from_pid(Pid::from_raw(*PID)).unwrap().starttime; } +/// Retruns the current process ID (see [libc::getpid]) +/// +/// The value is cached at startup (so it is invalid after a fork) pub fn pid() -> i32 { *PID } +/// Returns the starttime of the process (see [PidStat]) +/// +/// The value is cached at startup (so it is invalid after a fork) pub fn pstart() -> u64 { *PSTART } +/// Helper to write the PID into a file pub fn write_pid(pid_fn: &str) -> Result<(), Error> { let pid_str = format!("{}\n", *PID); proxmox::tools::fs::replace_file(pid_fn, pid_str.as_bytes(), CreateOptions::new()) } +/// Helper to read the PID from a file pub fn read_pid(pid_fn: &str) -> Result { let pid = proxmox::tools::fs::file_get_contents(pid_fn)?; let pid = std::str::from_utf8(&pid)?.trim(); pid.parse().map_err(|err| format_err!("could not parse pid - {}", err)) } +/// Returns the control socket path for a specific process ID. +/// +/// Note: The control socket always uses @/run/proxmox-backup/ as +/// prefix for historic reason. This does not matter because the +/// generated path is unique for each ``pid`` anyways. pub fn ctrl_sock_from_pid(pid: i32) -> String { // Note: The control socket always uses @/run/proxmox-backup/ as prefix // for historc reason. format!("\0{}/control-{}.sock", "/run/proxmox-backup", pid) } +/// Returns the control socket path for this server. pub fn our_ctrl_sock() -> String { ctrl_sock_from_pid(*PID) } static SHUTDOWN_REQUESTED: AtomicBool = AtomicBool::new(false); +/// Request a server shutdown (usually called from [catch_shutdown_signal]) pub fn request_shutdown() { SHUTDOWN_REQUESTED.store(true, Ordering::SeqCst); crate::server_shutdown(); } +/// Returns true if there was a shutdown request. #[inline(always)] pub fn shutdown_requested() -> bool { SHUTDOWN_REQUESTED.load(Ordering::SeqCst) } +/// Raise an error if there was a shutdown request. pub fn fail_on_shutdown() -> Result<(), Error> { if shutdown_requested() { bail!("Server shutdown requested - aborting task"); diff --git a/proxmox-rest-server/src/state.rs b/proxmox-rest-server/src/state.rs index e3a41e13..4585b266 100644 --- a/proxmox-rest-server/src/state.rs +++ b/proxmox-rest-server/src/state.rs @@ -8,6 +8,8 @@ use tokio::signal::unix::{signal, SignalKind}; use pbs_tools::broadcast_future::BroadcastData; +use crate::request_shutdown; + #[derive(PartialEq, Copy, Clone, Debug)] enum ServerMode { Normal, @@ -35,6 +37,8 @@ lazy_static! { } /// Listen to ``SIGINT`` for server shutdown +/// +/// This calls [request_shutdown] when receiving the signal. pub fn catch_shutdown_signal() -> Result<(), Error> { let mut stream = signal(SignalKind::interrupt())?; @@ -43,7 +47,7 @@ pub fn catch_shutdown_signal() -> Result<(), Error> { while stream.recv().await.is_some() { log::info!("got shutdown request (SIGINT)"); SERVER_STATE.lock().unwrap().reload_request = false; - crate::request_shutdown(); + request_shutdown(); } }.boxed(); @@ -56,6 +60,9 @@ pub fn catch_shutdown_signal() -> Result<(), Error> { } /// Listen to ``SIGHUP`` for server reload +/// +/// This calls [request_shutdown] when receiving the signal, and tries +/// to restart the server. pub fn catch_reload_signal() -> Result<(), Error> { let mut stream = signal(SignalKind::hangup())?; @@ -76,13 +83,14 @@ pub fn catch_reload_signal() -> Result<(), Error> { Ok(()) } -pub fn is_reload_request() -> bool { +pub(crate) fn is_reload_request() -> bool { let data = SERVER_STATE.lock().unwrap(); data.mode == ServerMode::Shutdown && data.reload_request } -pub fn server_shutdown() { + +pub(crate) fn server_shutdown() { let mut data = SERVER_STATE.lock().unwrap(); log::info!("request_shutdown"); @@ -96,6 +104,7 @@ pub fn server_shutdown() { check_last_worker(); } +/// Future to signal server shutdown pub fn shutdown_future() -> impl Future { let mut data = SERVER_STATE.lock().unwrap(); data @@ -104,18 +113,19 @@ pub fn shutdown_future() -> impl Future { .map(|_| ()) } +/// Future to signal when last worker task finished pub fn last_worker_future() -> impl Future> { let mut data = SERVER_STATE.lock().unwrap(); data.last_worker_listeners.listen() } -pub fn set_worker_count(count: usize) { +pub(crate) fn set_worker_count(count: usize) { SERVER_STATE.lock().unwrap().worker_count = count; check_last_worker(); } -pub fn check_last_worker() { +pub(crate) fn check_last_worker() { let mut data = SERVER_STATE.lock().unwrap(); if !(data.mode == ServerMode::Shutdown && data.worker_count == 0 && data.internal_task_count == 0) { return; } diff --git a/proxmox-rest-server/src/worker_task.rs b/proxmox-rest-server/src/worker_task.rs index b3b5d9e2..dea78b5a 100644 --- a/proxmox-rest-server/src/worker_task.rs +++ b/proxmox-rest-server/src/worker_task.rs @@ -324,6 +324,14 @@ pub fn worker_is_active_local(upid: &UPID) -> bool { } } +/// Register task control command on a [CommandoSocket]. +/// +/// This create two commands: +/// +/// * ``worker-task-abort ``: calls [abort_local_worker] +/// +/// * ``worker-task-status ``: return true of false, depending on +/// whether the worker is running or stopped. pub fn register_task_control_commands( commando_sock: &mut CommandoSocket, ) -> Result<(), Error> { @@ -358,14 +366,20 @@ pub fn register_task_control_commands( Ok(()) } -pub fn abort_worker_async(upid: UPID) { +/// Try to abort a worker task, but do no wait +/// +/// Errors (if any) are simply logged. +pub fn abort_worker_nowait(upid: UPID) { tokio::spawn(async move { if let Err(err) = abort_worker(upid).await { - eprintln!("abort worker failed - {}", err); + log::error!("abort worker task failed - {}", err); } }); } +/// Abort a worker task +/// +/// By sending ``worker-task-abort`` to the control socket. pub async fn abort_worker(upid: UPID) -> Result<(), Error> { let sock = crate::ctrl_sock_from_pid(upid.pid); @@ -513,7 +527,7 @@ fn read_task_file(reader: R) -> Result, Error> state }), Err(err) => { - eprintln!("unable to parse worker status '{}' - {}", line, err); + log::warn!("unable to parse worker status '{}' - {}", line, err); continue; } }; @@ -536,6 +550,7 @@ where read_task_file(file) } +/// Iterate over existing/active worker tasks pub struct TaskListInfoIterator { list: VecDeque, end: bool, @@ -544,6 +559,7 @@ pub struct TaskListInfoIterator { } impl TaskListInfoIterator { + /// Creates a new iterator instance. pub fn new(active_only: bool) -> Result { let setup = worker_task_setup()?; @@ -811,8 +827,6 @@ impl WorkerTask { /// Request abort pub fn request_abort(&self) { - eprintln!("set abort flag for worker {}", self.upid); - let prev_abort = self.abort_requested.swap(true, Ordering::SeqCst); if !prev_abort { // log abort one time self.log_message(format!("received abort request ..."));