diff --git a/Cargo.toml b/Cargo.toml
index d396d46b..71c5b824 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -12,6 +12,7 @@ members = [
     "proxmox-client",
     "proxmox-compression",
     "proxmox-config-digest",
+    "proxmox-daemon",
     "proxmox-dns-api",
     "proxmox-http",
     "proxmox-http-error",
@@ -123,6 +124,7 @@ proxmox-apt-api-types = { version = "1.0.1", path = "proxmox-apt-api-types" }
 proxmox-auth-api = { version = "0.4.0", path = "proxmox-auth-api" }
 proxmox-async = { version = "0.4.1", path = "proxmox-async" }
 proxmox-compression = { version = "0.2.3", path = "proxmox-compression" }
+proxmox-daemon = { version = "0.1.0", path = "proxmox-daemon" }
 proxmox-http = { version = "0.9.2", path = "proxmox-http" }
 proxmox-http-error = { version = "0.1.0", path = "proxmox-http-error" }
 proxmox-human-byte = { version = "0.1.0", path = "proxmox-human-byte" }
diff --git a/proxmox-daemon/Cargo.toml b/proxmox-daemon/Cargo.toml
new file mode 100644
index 00000000..13bb3bc8
--- /dev/null
+++ b/proxmox-daemon/Cargo.toml
@@ -0,0 +1,26 @@
+[package]
+name = "proxmox-daemon"
+version = "0.1.0"
+authors.workspace = true
+edition.workspace = true
+license.workspace = true
+repository.workspace = true
+description = """
+Daemon state handling (catching reload signals, registering commands with the command socket, ...)
+"""
+
+exclude.workspace = true
+
+[dependencies]
+anyhow.workspace = true
+futures.workspace = true
+libc.workspace = true
+log.workspace = true
+nix.workspace = true
+once_cell.workspace = true
+serde.workspace = true
+serde_json.workspace = true
+tokio = { workspace = true, features = ["io-util", "net", "rt", "signal", "sync"] }
+
+proxmox-sys.workspace = true
+proxmox-systemd.workspace = true
diff --git a/proxmox-daemon/debian/changelog b/proxmox-daemon/debian/changelog
new file mode 100644
index 00000000..84236a11
--- /dev/null
+++ b/proxmox-daemon/debian/changelog
@@ -0,0 +1,5 @@
+rust-proxmox-daemon (0.1.0-1) bookworm; urgency=medium
+
+  * initial split out of proxmox-rest-server
+
+ -- Proxmox Support Team <support@proxmox.com>  Tue, 23 Jul 2024 13:15:03 +0200
diff --git a/proxmox-daemon/debian/copyright b/proxmox-daemon/debian/copyright
new file mode 100644
index 00000000..869939c3
--- /dev/null
+++ b/proxmox-daemon/debian/copyright
@@ -0,0 +1,18 @@
+Format: https://www.debian.org/doc/packaging-manuals/copyright-format/1.0/
+
+Files:
+ *
+Copyright: 2024 Proxmox Server Solutions GmbH
+License: AGPL-3.0-or-later
+ This program is free software: you can redistribute it and/or modify it under
+ the terms of the GNU Affero General Public License as published by the Free
+ Software Foundation, either version 3 of the License, or (at your option) any
+ later version.
+ .
+ This program is distributed in the hope that it will be useful, but WITHOUT
+ ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
+ details.
+ .
+ You should have received a copy of the GNU Affero General Public License along
+ with this program. If not, see <https://www.gnu.org/licenses/>.
diff --git a/proxmox-daemon/debian/debcargo.toml b/proxmox-daemon/debian/debcargo.toml
new file mode 100644
index 00000000..b7864cdb
--- /dev/null
+++ b/proxmox-daemon/debian/debcargo.toml
@@ -0,0 +1,7 @@
+overlay = "."
+crate_src_path = ".."
+maintainer = "Proxmox Support Team " + +[source] +vcs_git = "git://git.proxmox.com/git/proxmox.git" +vcs_browser = "https://git.proxmox.com/?p=proxmox.git" diff --git a/proxmox-rest-server/src/command_socket.rs b/proxmox-daemon/src/command_socket.rs similarity index 56% rename from proxmox-rest-server/src/command_socket.rs rename to proxmox-daemon/src/command_socket.rs index bfa42b01..b268bded 100644 --- a/proxmox-rest-server/src/command_socket.rs +++ b/proxmox-daemon/src/command_socket.rs @@ -1,26 +1,46 @@ use anyhow::{bail, format_err, Error}; use std::collections::HashMap; +use std::future::Future; use std::os::unix::io::AsRawFd; use std::path::{Path, PathBuf}; +use std::pin::pin; use std::sync::Arc; -use futures::*; use nix::sys::socket; use nix::unistd::Gid; use serde::Serialize; use serde_json::Value; use tokio::net::UnixListener; +use tokio::sync::watch; + +/// Returns the control socket path for a specific process ID. +/// +/// Note: The control socket always uses @/run/proxmox-backup/ as +/// prefix for historic reason. This does not matter because the +/// generated path is unique for each ``pid`` anyways. +pub fn path_from_pid(pid: i32) -> String { + // Note: The control socket always uses @/run/proxmox-backup/ as prefix + // for historc reason. + format!("\0{}/control-{}.sock", "/run/proxmox-backup", pid) +} + +/// Returns the control socket path for this server. +pub fn this_path() -> String { + path_from_pid(unsafe { libc::getpid() }) +} // Listens on a Unix Socket to handle simple command asynchronously -fn create_control_socket( +fn create_control_socket( path: P, gid: Gid, + abort_future: W, func: F, ) -> Result, Error> where P: Into, F: Fn(Value) -> Result + Send + Sync + 'static, + W: Future + Send + 'static, { let path: PathBuf = path.into(); @@ -30,8 +50,24 @@ where let func = Arc::new(func); - let control_future = async move { + let (abort_sender, abort_receiver) = watch::channel(false); + + tokio::spawn(async move { + abort_future.await; + let _ = abort_sender.send(true); + }); + + let abort_future = { + let abort_receiver = abort_receiver.clone(); + async move { + let _ = { abort_receiver }.wait_for(|&v| v).await; + } + }; + + let control_future = Box::pin(async move { loop { + use tokio::io::{AsyncBufReadExt, AsyncWriteExt}; + let (conn, _addr) = match socket.accept().await { Ok(data) => data, Err(err) => { @@ -40,7 +76,7 @@ where } }; - let opt = socket::sockopt::PeerCredentials {}; + let opt = socket::sockopt::PeerCredentials; let cred = match socket::getsockopt(conn.as_raw_fd(), opt) { Ok(cred) => cred, Err(err) => { @@ -50,96 +86,90 @@ where }; // check permissions (same gid, root user, or backup group) - let mygid = unsafe { libc::getgid() }; - if !(cred.uid() == 0 || cred.gid() == mygid || cred.gid() == gid) { + let mygid = Gid::current(); + if !(cred.uid() == 0 || cred.gid() == mygid.as_raw() || cred.gid() == gid) { log::error!("no permissions for {:?}", cred); continue; } let (rx, mut tx) = tokio::io::split(conn); - let abort_future = super::last_worker_future().map(|_| ()); + let abort_future = { + let abort_receiver = abort_receiver.clone(); + Box::pin(async move { + let _ = { abort_receiver }.wait_for(|&v| v).await; + }) + }; - use tokio::io::{AsyncBufReadExt, AsyncWriteExt}; let func = Arc::clone(&func); let path = path.clone(); - tokio::spawn( - futures::future::select( - async move { - let mut rx = tokio::io::BufReader::new(rx); - let mut line = String::new(); - loop { - line.clear(); - match rx - .read_line({ - line.clear(); - &mut line - }) - 
-                                .await
-                            {
-                                Ok(0) => break,
-                                Ok(_) => (),
-                                Err(err) => {
-                                    log::error!("control socket {:?} read error: {}", path, err);
-                                    return;
-                                }
-                            }
-
-                            let response = match line.parse::<Value>() {
-                                Ok(param) => match func(param) {
-                                    Ok(res) => format!("OK: {}\n", res),
-                                    Err(err) => format!("ERROR: {}\n", err),
-                                },
-                                Err(err) => format!("ERROR: {}\n", err),
-                            };
-
-                            if let Err(err) = tx.write_all(response.as_bytes()).await {
-                                log::error!(
-                                    "control socket {:?} write response error: {}",
-                                    path,
-                                    err
-                                );
+            tokio::spawn(futures::future::select(
+                Box::pin(async move {
+                    let mut rx = tokio::io::BufReader::new(rx);
+                    let mut line = String::new();
+                    loop {
+                        line.clear();
+                        match rx
+                            .read_line({
+                                line.clear();
+                                &mut line
+                            })
+                            .await
+                        {
+                            Ok(0) => break,
+                            Ok(_) => (),
+                            Err(err) => {
+                                log::error!("control socket {:?} read error: {}", path, err);
                                 return;
                             }
                         }
+
+                        let response = match line.parse::<Value>() {
+                            Ok(param) => match func(param) {
+                                Ok(res) => format!("OK: {}\n", res),
+                                Err(err) => format!("ERROR: {}\n", err),
+                            },
+                            Err(err) => format!("ERROR: {}\n", err),
+                        };
+
+                        if let Err(err) = tx.write_all(response.as_bytes()).await {
+                            log::error!("control socket {:?} write response error: {}", path, err);
+                            return;
+                        }
                     }
-                    .boxed(),
-                    abort_future,
-                )
-                .map(|_| ()),
-            );
+                }),
+                abort_future,
+            ));
         }
-    }
-    .boxed();
+    });
 
-    let abort_future = crate::last_worker_future().map_err(|_| {});
-    let task = futures::future::select(control_future, abort_future)
-        .map(|_: futures::future::Either<(Result<(), Error>, _), _>| ());
-
-    Ok(task)
+    Ok(async move {
+        let abort_future = pin!(abort_future);
+        futures::future::select(control_future, abort_future).await;
+    })
 }
 
 /// Send a command to the specified socket
-pub async fn send_command<P, T>(path: P, params: &T) -> Result<Value, Error>
+pub async fn send<P, T>(path: P, params: &T) -> Result<Value, Error>
 where
     P: AsRef<Path>,
     T: ?Sized + Serialize,
 {
     let mut command_string = serde_json::to_string(params)?;
     command_string.push('\n');
 
-    send_raw_command(path.as_ref(), &command_string).await
+    send_raw(path.as_ref(), &command_string).await
 }
 
 /// Send a raw command (string) to the specified socket
-pub async fn send_raw_command<P>(path: P, command_string: &str) -> Result<Value, Error>
+pub async fn send_raw<P>(path: P, command_string: &str) -> Result<Value, Error>
 where
     P: AsRef<Path>,
 {
     use tokio::io::{AsyncBufReadExt, AsyncWriteExt};
 
     let mut conn = tokio::net::UnixStream::connect(path)
-        .map_err(move |err| format_err!("control socket connect failed - {}", err))
-        .await?;
+        .await
+        .map_err(move |err| format_err!("control socket connect failed - {}", err))?;
 
     conn.write_all(command_string.as_bytes()).await?;
     if !command_string.as_bytes().ends_with(b"\n") {
@@ -164,7 +194,7 @@ where
     }
 }
 
-// A callback for a specific commando socket.
+// A callback for a specific command socket.
 type CommandSocketFn =
     Box<(dyn Fn(Option<&Value>) -> Result<Value, Error> + Send + Sync + 'static)>;
 
@@ -181,12 +211,9 @@ pub struct CommandSocket {
 
 impl CommandSocket {
     /// Creates a new instance.
-    pub fn new<P>(path: P, gid: Gid) -> Self
-    where
-        P: Into<PathBuf>,
-    {
+    pub fn new(gid: Gid) -> Self {
         CommandSocket {
-            socket: path.into(),
+            socket: this_path().into(),
             gid,
             commands: HashMap::new(),
         }
     }
@@ -194,9 +221,18 @@ impl CommandSocket {
 
     /// Spawn the socket and consume self, meaning you cannot register commands anymore after
     /// calling this.
-    pub fn spawn(self) -> Result<(), Error> {
-        let control_future =
-            create_control_socket(self.socket.to_owned(), self.gid, move |param| {
+    ///
+    /// The `abort_future` is typically a `last_worker_future()` and is there because this
+    /// `spawn()`s a task which would otherwise never finish.
+    pub fn spawn<F>(self, abort_future: F) -> Result<(), Error>
+    where
+        F: Future<Output = ()> + Send + 'static,
+    {
+        let control_future = create_control_socket(
+            self.socket.to_owned(),
+            self.gid,
+            abort_future,
+            move |param| {
                 let param = param.as_object().ok_or_else(|| {
                     format_err!("unable to parse parameters (expected json object)")
                 })?;
@@ -218,7 +254,8 @@ impl CommandSocket {
                     (handler)(args)
                 }
             }
-        })?;
+            },
+        )?;
 
         tokio::spawn(control_future);
diff --git a/proxmox-daemon/src/lib.rs b/proxmox-daemon/src/lib.rs
new file mode 100644
index 00000000..36e65eeb
--- /dev/null
+++ b/proxmox-daemon/src/lib.rs
@@ -0,0 +1,12 @@
+//! Daemon and related state handling.
+
+pub mod command_socket;
+
+mod state;
+pub use state::fail_on_shutdown;
+pub use state::shutdown_future;
+pub use state::{catch_reload_signal, reload_signal_task};
+pub use state::{catch_shutdown_signal, shutdown_signal_task};
+pub use state::{is_reload_requested, is_shutdown_requested, request_reload, request_shutdown};
+
+pub mod server;
diff --git a/proxmox-rest-server/src/daemon.rs b/proxmox-daemon/src/server.rs
similarity index 94%
rename from proxmox-rest-server/src/daemon.rs
rename to proxmox-daemon/src/server.rs
index e914dc8e..19d81905 100644
--- a/proxmox-rest-server/src/daemon.rs
+++ b/proxmox-daemon/src/server.rs
@@ -1,4 +1,4 @@
-//! Helpers to implement restartable daemons/services.
+//! Helpers to implement restartable servers listening for incoming connections.
 
 use std::ffi::CString;
 use std::future::Future;
@@ -7,7 +7,7 @@ use std::os::unix::ffi::OsStrExt;
 use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd, RawFd};
 use std::panic::UnwindSafe;
 use std::path::PathBuf;
-use std::pin::Pin;
+use std::pin::{pin, Pin};
 
 use anyhow::{bail, format_err, Error};
 use futures::future::{self, Either};
@@ -112,7 +112,7 @@ impl Reloader {
         }
 
         // Synchronisation pipe:
-        let (pold, pnew) = super::socketpair()?;
+        let (pold, pnew) = socketpair()?;
 
         // Start ourselves in the background:
         match unsafe { fork() } {
@@ -342,13 +342,11 @@ where
     };
 
     let server_future = Box::pin(service);
-    let shutdown_future = crate::shutdown_future();
+    let shutdown_future = pin!(crate::shutdown_future());
 
     let finish_future = match future::select(server_future, shutdown_future).await {
         Either::Left((_, _)) => {
-            if !crate::shutdown_requested() {
-                crate::request_shutdown(); // make sure we are in shutdown mode
-            }
+            crate::request_shutdown(); // make sure we are in shutdown mode
             None
         }
         Either::Right((_, server_future)) => Some(server_future),
@@ -356,7 +354,7 @@ where
 
     let mut reloader = Some(reloader);
 
-    if crate::is_reload_request() {
+    if crate::is_reload_requested() {
         log::info!("daemon reload...");
         if let Err(e) = proxmox_systemd::notify::SystemdNotify::Reloading.notify() {
             log::error!("failed to notify systemd about the state change: {}", e);
@@ -382,3 +380,16 @@ where
     log::info!("daemon shut down.");
     Ok(())
 }
+
+/// safe wrapper for `nix::sys::socket::socketpair` defaulting to `O_CLOEXEC` and guarding the file
+/// descriptors.
+fn socketpair() -> Result<(OwnedFd, OwnedFd), Error> {
+    use nix::sys::socket;
+    let (pa, pb) = socket::socketpair(
+        socket::AddressFamily::Unix,
+        socket::SockType::Stream,
+        None,
+        socket::SockFlag::SOCK_CLOEXEC,
+    )?;
+    Ok(unsafe { (OwnedFd::from_raw_fd(pa), OwnedFd::from_raw_fd(pb)) })
+}
diff --git a/proxmox-daemon/src/state.rs b/proxmox-daemon/src/state.rs
new file mode 100644
index 00000000..f5d23bcf
--- /dev/null
+++ b/proxmox-daemon/src/state.rs
@@ -0,0 +1,121 @@
+use std::future::Future;
+use std::pin::pin;
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::OnceLock;
+
+use anyhow::{bail, Error};
+use tokio::signal::unix::{signal, SignalKind};
+use tokio::sync::watch;
+
+static SHUTDOWN_LISTENERS: OnceLock<watch::Sender<bool>> = OnceLock::new();
+static RELOAD_REQUESTED: AtomicBool = AtomicBool::new(false);
+static SHUTDOWN_REQUESTED: AtomicBool = AtomicBool::new(false);
+
+/// Request a reload.
+///
+/// This sets the reload flag and subsequently calls [`request_shutdown()`].
+pub fn request_reload() {
+    if !RELOAD_REQUESTED.swap(true, Ordering::Release) {
+        request_shutdown();
+    }
+}
+
+/// Returns true if a reload has been requested either via a signal or a call to
+/// [`request_reload()`].
+pub fn is_reload_requested() -> bool {
+    RELOAD_REQUESTED.load(Ordering::Acquire)
+}
+
+/// Request a shutdown.
+///
+/// This sets both the shutdown flag and triggers [`shutdown_future()`] to finish.
+pub fn request_shutdown() {
+    log::info!("request_shutdown");
+
+    if !SHUTDOWN_REQUESTED.swap(true, Ordering::Release) {
+        let _ = shutdown_listeners().send(true);
+    }
+}
+
+/// Returns true if a shutdown has been requested either via a signal or a call to
+/// [`request_shutdown()`].
+pub fn is_shutdown_requested() -> bool {
+    SHUTDOWN_REQUESTED.load(Ordering::Acquire)
+}
+
+fn shutdown_listeners() -> &'static watch::Sender<bool> {
+    SHUTDOWN_LISTENERS.get_or_init(|| watch::channel(false).0)
+}
+
+/// This future finishes once a shutdown has been requested either via a signal or a call to
+/// [`request_shutdown()`].
+pub async fn shutdown_future() {
+    let _ = shutdown_listeners().subscribe().wait_for(|&v| v).await;
+}
+
+/// Pin and select().
+async fn pin_select<A, B>(a: A, b: B)
+where
+    A: Future<Output = ()> + Send + 'static,
+    B: Future<Output = ()> + Send + 'static,
+{
+    let a = pin!(a);
+    let b = pin!(b);
+    futures::future::select(a, b).await;
+}
+
+/// Creates a task which listens for a `SIGINT` and then calls [`request_shutdown()`] while also
+/// *undoing* a previous *reload* request.
+pub fn shutdown_signal_task() -> Result<impl Future<Output = ()> + Send + 'static, Error> {
+    let mut stream = signal(SignalKind::interrupt())?;
+
+    Ok(async move {
+        while stream.recv().await.is_some() {
+            log::info!("got shutdown request (SIGINT)");
+            RELOAD_REQUESTED.store(false, Ordering::Release);
+            request_shutdown();
+        }
+    })
+}
+
+/// Spawn a [`shutdown_signal_task()`] which is automatically aborted with the provided
+/// `abort_future`.
+pub fn catch_shutdown_signal<F>(abort_future: F) -> Result<(), Error>
+where
+    F: Future<Output = ()> + Send + 'static,
+{
+    log::info!("catching shutdown signal");
+    tokio::spawn(pin_select(shutdown_signal_task()?, abort_future));
+    Ok(())
+}
+
+/// Creates a task which listens for a `SIGHUP` and then calls [`request_reload()`].
+pub fn reload_signal_task() -> Result<impl Future<Output = ()> + Send + 'static, Error> {
+    let mut stream = signal(SignalKind::hangup())?;
+
+    Ok(async move {
+        while stream.recv().await.is_some() {
+            log::info!("got reload request (SIGHUP)");
+            request_reload();
+        }
+    })
+}
+
+/// Spawn a [`reload_signal_task()`] which is automatically aborted with the provided
+/// `abort_future`.
+pub fn catch_reload_signal<F>(abort_future: F) -> Result<(), Error>
+where
+    F: Future<Output = ()> + Send + 'static,
+{
+    log::info!("catching reload signal");
+    tokio::spawn(pin_select(reload_signal_task()?, abort_future));
+    Ok(())
+}
+
+/// Raise an error if there was a shutdown request.
+pub fn fail_on_shutdown() -> Result<(), Error> {
+    if is_shutdown_requested() {
+        bail!("Server shutdown requested - aborting task");
+    }
+    Ok(())
+}
diff --git a/proxmox-rest-server/Cargo.toml b/proxmox-rest-server/Cargo.toml
index 0ce52808..1fc292d3 100644
--- a/proxmox-rest-server/Cargo.toml
+++ b/proxmox-rest-server/Cargo.toml
@@ -38,6 +38,7 @@ url.workspace = true
 
 proxmox-async.workspace = true
 proxmox-compression.workspace = true
+proxmox-daemon.workspace = true
 proxmox-http = { workspace = true, optional = true }
 proxmox-lang.workspace = true
 proxmox-log.workspace = true
diff --git a/proxmox-rest-server/src/api_config.rs b/proxmox-rest-server/src/api_config.rs
index 5b7a2a4f..ddc37f22 100644
--- a/proxmox-rest-server/src/api_config.rs
+++ b/proxmox-rest-server/src/api_config.rs
@@ -12,12 +12,13 @@ use hyper::http::request::Parts;
 use hyper::{Body, Response};
 use tower_service::Service;
 
+use proxmox_daemon::command_socket::CommandSocket;
 use proxmox_log::{FileLogOptions, FileLogger};
 use proxmox_router::{Router, RpcEnvironmentType, UserInformation};
 use proxmox_sys::fs::{create_path, CreateOptions};
 
 use crate::rest::Handler;
-use crate::{CommandSocket, RestEnvironment};
+use crate::RestEnvironment;
 
 /// REST server configuration
 pub struct ApiConfig {
diff --git a/proxmox-rest-server/src/connection.rs b/proxmox-rest-server/src/connection.rs
index 217af884..fbdfe96c 100644
--- a/proxmox-rest-server/src/connection.rs
+++ b/proxmox-rest-server/src/connection.rs
@@ -8,7 +8,7 @@ use std::net::SocketAddr;
 use std::os::fd::FromRawFd;
 use std::os::unix::io::AsRawFd;
 use std::path::PathBuf;
-use std::pin::Pin;
+use std::pin::{pin, Pin};
 use std::sync::{Arc, Mutex};
 use std::time::Duration;
 
@@ -278,7 +278,7 @@ impl AcceptBuilder {
         sender: Sender,
     ) {
         let accept_counter = Arc::new(());
-        let mut shutdown_future = crate::shutdown_future().fuse();
+        let mut shutdown_future = pin!(proxmox_daemon::shutdown_future().fuse());
 
         loop {
             let (socket, peer) = futures::select! {
diff --git a/proxmox-rest-server/src/lib.rs b/proxmox-rest-server/src/lib.rs
index 0c8f45e0..3c9e887b 100644
--- a/proxmox-rest-server/src/lib.rs
+++ b/proxmox-rest-server/src/lib.rs
@@ -7,7 +7,6 @@
 //!
 //! * highly threaded code, uses Rust async
 //! * static API definitions using schemas
-//! * restartable systemd daemons using `systemd_notify`
 //! * support for long running worker tasks (threads or async tokio tasks)
 //! * supports separate access and authentication log files
 //! * extra control socket to trigger management operations
@@ -18,10 +17,8 @@
 #![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
 
 use std::fmt;
-use std::os::unix::io::{FromRawFd, OwnedFd};
-use std::sync::atomic::{AtomicBool, Ordering};
 
-use anyhow::{bail, format_err, Error};
+use anyhow::{format_err, Error};
 use nix::unistd::Pid;
 
 use proxmox_sys::fs::CreateOptions;
@@ -30,19 +27,11 @@ use proxmox_sys::linux::procfs::PidStat;
 mod compression;
 pub use compression::*;
 
-pub mod daemon;
-
 pub mod formatter;
 
 mod environment;
 pub use environment::*;
 
-mod state;
-pub use state::*;
-
-mod command_socket;
-pub use command_socket::*;
-
 mod api_config;
 pub use api_config::{ApiConfig, AuthError, AuthHandler, IndexHandler, UnixAcceptor};
 
@@ -90,57 +79,6 @@ pub fn read_pid(pid_fn: &str) -> Result<i32, Error> {
         .map_err(|err| format_err!("could not parse pid - {}", err))
 }
 
-/// Returns the control socket path for a specific process ID.
-///
-/// Note: The control socket always uses @/run/proxmox-backup/ as
-/// prefix for historic reason. This does not matter because the
-/// generated path is unique for each ``pid`` anyways.
-pub fn ctrl_sock_from_pid(pid: i32) -> String {
-    // Note: The control socket always uses @/run/proxmox-backup/ as prefix
-    // for historc reason.
-    format!("\0{}/control-{}.sock", "/run/proxmox-backup", pid)
-}
-
-/// Returns the control socket path for this server.
-pub fn our_ctrl_sock() -> String {
-    ctrl_sock_from_pid(*PID)
-}
-
-static SHUTDOWN_REQUESTED: AtomicBool = AtomicBool::new(false);
-
-/// Request a server shutdown (usually called from [catch_shutdown_signal])
-pub fn request_shutdown() {
-    SHUTDOWN_REQUESTED.store(true, Ordering::SeqCst);
-    crate::server_shutdown();
-}
-
-/// Returns true if there was a shutdown request.
-#[inline(always)]
-pub fn shutdown_requested() -> bool {
-    SHUTDOWN_REQUESTED.load(Ordering::SeqCst)
-}
-
-/// Raise an error if there was a shutdown request.
-pub fn fail_on_shutdown() -> Result<(), Error> {
-    if shutdown_requested() {
-        bail!("Server shutdown requested - aborting task");
-    }
-    Ok(())
-}
-
-/// safe wrapper for `nix::sys::socket::socketpair` defaulting to `O_CLOEXEC` and guarding the file
-/// descriptors.
-fn socketpair() -> Result<(OwnedFd, OwnedFd), Error> {
-    use nix::sys::socket;
-    let (pa, pb) = socket::socketpair(
-        socket::AddressFamily::Unix,
-        socket::SockType::Stream,
-        None,
-        socket::SockFlag::SOCK_CLOEXEC,
-    )?;
-    Ok(unsafe { (OwnedFd::from_raw_fd(pa), OwnedFd::from_raw_fd(pb)) })
-}
-
 /// Extract a specific cookie from cookie header.
 /// We assume cookie_name is already url encoded.
 pub fn extract_cookie(cookie: &str, cookie_name: &str) -> Option<String> {
diff --git a/proxmox-rest-server/src/state.rs b/proxmox-rest-server/src/state.rs
deleted file mode 100644
index c3b81627..00000000
--- a/proxmox-rest-server/src/state.rs
+++ /dev/null
@@ -1,161 +0,0 @@
-use anyhow::Error;
-use lazy_static::lazy_static;
-use std::sync::Mutex;
-
-use futures::*;
-
-use tokio::signal::unix::{signal, SignalKind};
-
-use proxmox_async::broadcast_future::BroadcastData;
-
-use crate::request_shutdown;
-
-#[derive(PartialEq, Copy, Clone, Debug)]
-enum ServerMode {
-    Normal,
-    Shutdown,
-}
-
-struct ServerState {
-    mode: ServerMode,
-    shutdown_listeners: BroadcastData<()>,
-    last_worker_listeners: BroadcastData<()>,
-    worker_count: usize,
-    internal_task_count: usize,
-    reload_request: bool,
-}
-
-lazy_static! {
-    static ref SERVER_STATE: Mutex<ServerState> = Mutex::new(ServerState {
-        mode: ServerMode::Normal,
-        shutdown_listeners: BroadcastData::new(),
-        last_worker_listeners: BroadcastData::new(),
-        worker_count: 0,
-        internal_task_count: 0,
-        reload_request: false,
-    });
-}
-
-/// Listen to ``SIGINT`` for server shutdown
-///
-/// This calls [request_shutdown] when receiving the signal.
-pub fn catch_shutdown_signal() -> Result<(), Error> {
-    let mut stream = signal(SignalKind::interrupt())?;
-
-    let future = async move {
-        while stream.recv().await.is_some() {
-            log::info!("got shutdown request (SIGINT)");
-            SERVER_STATE.lock().unwrap().reload_request = false;
-            request_shutdown();
-        }
-    }
-    .boxed();
-
-    let abort_future = last_worker_future().map_err(|_| {});
-    let task = futures::future::select(future, abort_future);
-
-    tokio::spawn(task.map(|_| ()));
-
-    Ok(())
-}
-
-/// Listen to ``SIGHUP`` for server reload
-///
-/// This calls [request_shutdown] when receiving the signal, and tries
-/// to restart the server.
-pub fn catch_reload_signal() -> Result<(), Error> {
-    let mut stream = signal(SignalKind::hangup())?;
-
-    let future = async move {
-        while stream.recv().await.is_some() {
-            log::info!("got reload request (SIGHUP)");
-            SERVER_STATE.lock().unwrap().reload_request = true;
-            crate::request_shutdown();
-        }
-    }
-    .boxed();
-
-    let abort_future = last_worker_future().map_err(|_| {});
-    let task = futures::future::select(future, abort_future);
-
-    tokio::spawn(task.map(|_| ()));
-
-    Ok(())
-}
-
-pub(crate) fn is_reload_request() -> bool {
-    let data = SERVER_STATE.lock().unwrap();
-
-    data.mode == ServerMode::Shutdown && data.reload_request
-}
-
-pub(crate) fn server_shutdown() {
-    let mut data = SERVER_STATE.lock().unwrap();
-
-    log::info!("request_shutdown");
-
-    data.mode = ServerMode::Shutdown;
-
-    data.shutdown_listeners.notify_listeners(Ok(()));
-
-    drop(data); // unlock
-
-    check_last_worker();
-}
-
-/// Future to signal server shutdown
-pub fn shutdown_future() -> impl Future<Output = ()> {
-    let mut data = SERVER_STATE.lock().unwrap();
-    data.shutdown_listeners.listen().map(|_| ())
-}
-
-/// Future to signal when last worker task finished
-pub fn last_worker_future() -> impl Future<Output = Result<(), Error>> {
-    let mut data = SERVER_STATE.lock().unwrap();
-    data.last_worker_listeners.listen()
-}
-
-pub(crate) fn set_worker_count(count: usize) {
-    SERVER_STATE.lock().unwrap().worker_count = count;
-
-    check_last_worker();
-}
-
-pub(crate) fn check_last_worker() {
-    let mut data = SERVER_STATE.lock().unwrap();
-
-    if !(data.mode == ServerMode::Shutdown
-        && data.worker_count == 0
-        && data.internal_task_count == 0)
-    {
-        return;
-    }
-
-    data.last_worker_listeners.notify_listeners(Ok(()));
-}
-
-/// Spawns a tokio task that will be tracked for reload
-/// and if it is finished, notify the [last_worker_future] if we
-/// are in shutdown mode.
-pub fn spawn_internal_task<T>(task: T)
-where
-    T: Future + Send + 'static,
-    T::Output: Send + 'static,
-{
-    let mut data = SERVER_STATE.lock().unwrap();
-    data.internal_task_count += 1;
-
-    tokio::spawn(async move {
-        let _ = tokio::spawn(task).await; // ignore errors
-
-        {
-            // drop mutex
-            let mut data = SERVER_STATE.lock().unwrap();
-            if data.internal_task_count > 0 {
-                data.internal_task_count -= 1;
-            }
-        }
-
-        check_last_worker();
-    });
-}
diff --git a/proxmox-rest-server/src/worker_task.rs b/proxmox-rest-server/src/worker_task.rs
index a4515985..8691748e 100644
--- a/proxmox-rest-server/src/worker_task.rs
+++ b/proxmox-rest-server/src/worker_task.rs
@@ -3,8 +3,8 @@ use std::fs::File;
 use std::io::{BufRead, BufReader, Read, Write};
 use std::panic::UnwindSafe;
 use std::path::PathBuf;
-use std::sync::atomic::{AtomicBool, Ordering};
-use std::sync::{Arc, Mutex};
+use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
+use std::sync::{Arc, Mutex, OnceLock};
 use std::time::{Duration, SystemTime};
 
 use anyhow::{bail, format_err, Error};
@@ -15,9 +15,10 @@ use once_cell::sync::OnceCell;
 use serde::{Deserialize, Serialize};
 use serde_json::{json, Value};
 use tokio::signal::unix::SignalKind;
-use tokio::sync::oneshot;
+use tokio::sync::{oneshot, watch};
 use tracing::{info, warn};
 
+use proxmox_daemon::command_socket::CommandSocket;
 use proxmox_lang::try_block;
 use proxmox_log::{FileLogOptions, FileLogger, LogContext};
 use proxmox_schema::upid::UPID;
@@ -26,7 +27,66 @@ use proxmox_sys::linux::procfs;
 use proxmox_sys::logrotate::{LogRotate, LogRotateFiles};
 use proxmox_worker_task::WorkerTaskContext;
 
-use crate::CommandSocket;
+static LAST_WORKER_LISTENERS: OnceLock<watch::Sender<bool>> = OnceLock::new();
+static WORKER_COUNT: AtomicUsize = AtomicUsize::new(0);
+static INTERNAL_TASK_COUNT: AtomicUsize = AtomicUsize::new(0);
+
+fn last_worker_listeners() -> &'static watch::Sender<bool> {
+    LAST_WORKER_LISTENERS.get_or_init(|| watch::channel(false).0)
+}
+
+/// This future finishes once there are no more running workers (including internal tasks).
+pub async fn last_worker_future() {
+    let _ = last_worker_listeners().subscribe().wait_for(|&v| v).await;
+}
+
+/// This drives the [`last_worker_future()`] futures: if a shutdown is requested and no workers
+/// and no internal tasks are running, the [`last_worker_future()`] futures are triggered to
+/// finish.
+pub fn check_last_worker() {
+    if proxmox_daemon::is_shutdown_requested()
+        && WORKER_COUNT.load(Ordering::Acquire) == 0
+        && INTERNAL_TASK_COUNT.load(Ordering::Acquire) == 0
+    {
+        let _ = last_worker_listeners().send(true);
+    }
+}
+
+/// Spawn a task which calls [`check_last_worker()`] in the case of a requested shutdown. This used
+/// to be implied by the [`request_shutdown()`] call when it was part of the `proxmox-rest-server`
+/// crate, which is no longer the case.
+fn check_workers_on_shutdown() {
+    tokio::spawn(async {
+        let _ = proxmox_daemon::shutdown_future().await;
+        check_last_worker();
+    });
+}
+
+/// Spawns a tokio task that will be tracked for reload
+/// and if it is finished, notify the [last_worker_future] if we
+/// are in shutdown mode.
+pub fn spawn_internal_task<T>(task: T)
+where
+    T: Future + Send + 'static,
+    T::Output: Send + 'static,
+{
+    INTERNAL_TASK_COUNT.fetch_add(1, Ordering::Release);
+
+    tokio::spawn(async move {
+        let _ = task.await;
+        INTERNAL_TASK_COUNT.fetch_sub(1, Ordering::Release);
+
+        check_last_worker();
+    });
+}
+
+/// Update the worker count.
+///
+/// If the count is set to 0 and no internal tasks are running, all [`last_worker_future()`] will
+/// finish.
+pub fn set_worker_count(count: usize) {
+    WORKER_COUNT.store(count, Ordering::Release);
+    check_last_worker();
+}
 
 #[allow(dead_code)]
 struct TaskListLockGuard(File);
@@ -227,7 +287,9 @@ pub fn init_worker_tasks(basedir: PathBuf, file_opts: CreateOptions) -> Result<(), Error> {
     setup.create_task_log_dirs()?;
     WORKER_TASK_SETUP
         .set(setup)
-        .map_err(|_| format_err!("init_worker_tasks failed - already initialized"))
+        .map_err(|_| format_err!("init_worker_tasks failed - already initialized"))?;
+    check_workers_on_shutdown();
+    Ok(())
 }
 
 /// Optionally rotates and/or cleans up the task archive depending on its size and age.
@@ -455,14 +517,14 @@ pub async fn worker_is_active(upid: &UPID) -> Result<bool, Error> {
         return Ok(false);
     }
 
-    let sock = crate::ctrl_sock_from_pid(upid.pid);
+    let sock = proxmox_daemon::command_socket::path_from_pid(upid.pid);
     let cmd = json!({
         "command": "worker-task-status",
         "args": {
            "upid": upid.to_string(),
        },
    });
-    let status = crate::send_command(sock, &cmd).await?;
+    let status = proxmox_daemon::command_socket::send(sock, &cmd).await?;
 
     if let Some(active) = status.as_bool() {
         Ok(active)
@@ -543,14 +605,16 @@ pub fn abort_worker_nowait(upid: UPID) {
 ///
 /// By sending ``worker-task-abort`` to the control socket.
 pub async fn abort_worker(upid: UPID) -> Result<(), Error> {
-    let sock = crate::ctrl_sock_from_pid(upid.pid);
+    let sock = proxmox_daemon::command_socket::path_from_pid(upid.pid);
     let cmd = json!({
         "command": "worker-task-abort",
         "args": {
             "upid": upid.to_string(),
         },
     });
-    crate::send_command(sock, &cmd).map_ok(|_| ()).await
+    proxmox_daemon::command_socket::send(sock, &cmd)
+        .map_ok(|_| ())
+        .await
 }
 
 fn parse_worker_status_line(line: &str) -> Result<(String, UPID, Option<String>), Error> {
@@ -860,7 +924,7 @@ impl WorkerTask {
         {
             let mut hash = WORKER_TASK_LIST.lock().unwrap();
             hash.insert(task_id, worker.clone());
-            crate::set_worker_count(hash.len());
+            set_worker_count(hash.len());
         }
 
         setup.update_active_workers(Some(&upid))?;
@@ -958,7 +1022,7 @@ impl WorkerTask {
         WORKER_TASK_LIST.lock().unwrap().remove(&self.upid.task_id);
         let _ = self.setup.update_active_workers(None);
-        crate::set_worker_count(WORKER_TASK_LIST.lock().unwrap().len());
+        set_worker_count(WORKER_TASK_LIST.lock().unwrap().len());
     }
 
     /// Log a message.
@@ -1020,11 +1084,11 @@ impl WorkerTaskContext for WorkerTask {
     }
 
     fn shutdown_requested(&self) -> bool {
-        crate::shutdown_requested()
+        proxmox_daemon::is_shutdown_requested()
    }

    fn fail_on_shutdown(&self) -> Result<(), Error> {
-        crate::fail_on_shutdown()
+        proxmox_daemon::fail_on_shutdown()
    }
}
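
Usage sketch (server side): after this split, a daemon wires up signal handling
and its control socket roughly as below. This is illustrative only, not part of
the patch: `register_command()` is not shown in this diff (it is the existing
CommandSocket registration method), and reaching `last_worker_future()` via the
proxmox-rest-server crate root assumes its worker_task re-exports.

    use anyhow::Error;
    use nix::unistd::Gid;
    use serde_json::json;

    async fn setup_daemon_state() -> Result<(), Error> {
        // SIGINT now requests a shutdown, SIGHUP a reload; both signal tasks
        // stop once the given abort future finishes.
        proxmox_daemon::catch_shutdown_signal(proxmox_rest_server::last_worker_future())?;
        proxmox_daemon::catch_reload_signal(proxmox_rest_server::last_worker_future())?;

        // The command socket path is now derived from the current PID via
        // this_path() instead of being passed to new().
        let mut command_sock =
            proxmox_daemon::command_socket::CommandSocket::new(Gid::current());
        command_sock.register_command("status".to_string(), |_args| Ok(json!("ok")))?;
        // spawn() now takes an abort future, since the task it spawns would
        // otherwise never finish.
        command_sock.spawn(proxmox_rest_server::last_worker_future())?;

        Ok(())
    }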
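
On the client side, the renamed helpers live in proxmox_daemon::command_socket
as well. A minimal sketch, assuming a daemon that registered a "status" command
(the command name is a placeholder):

    use anyhow::Error;
    use serde_json::{json, Value};

    async fn query_daemon(pid: i32) -> Result<Value, Error> {
        // Resolve the abstract socket path for the target process ...
        let sock = proxmox_daemon::command_socket::path_from_pid(pid);
        // ... and send a JSON command; the reply is parsed from the
        // "OK: ..." / "ERROR: ..." response line written by the server.
        proxmox_daemon::command_socket::send(sock, &json!({ "command": "status" })).await
    }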