introduce proxmox-daemon crate

split from rest-server:
- "state" module (shutdown/reload state)
- shutdown futures
- "daemon" module (named 'server' module in proxmox-daemon)
- command socket

Signed-off-by: Wolfgang Bumiller <w.bumiller@proxmox.com>
Wolfgang Bumiller 2024-07-24 09:11:48 +02:00
parent fb1a75d48f
commit d3abd366c4
15 changed files with 401 additions and 319 deletions
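For consumers, the split mostly amounts to renamed paths. A rough mapping of the moved APIs, as introduced by this commit (a sketch, not exhaustive):

```rust
// Old (proxmox-rest-server)          -> New (proxmox-daemon)
//
// ctrl_sock_from_pid(pid)            -> command_socket::path_from_pid(pid)
// our_ctrl_sock()                    -> command_socket::this_path()
// send_command(sock, &cmd)           -> command_socket::send(sock, &cmd)
// send_raw_command(sock, &cmd)       -> command_socket::send_raw(sock, &cmd)
// CommandSocket                      -> command_socket::CommandSocket
// shutdown_future()                  -> shutdown_future()
// shutdown_requested()               -> is_shutdown_requested()
// daemon (module)                    -> server (module)
//
// catch_shutdown_signal() and catch_reload_signal() now take an explicit
// abort future (typically proxmox_rest_server::last_worker_future()),
// because worker tracking stayed behind in the rest-server crate.
```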

Cargo.toml

@@ -12,6 +12,7 @@ members = [
"proxmox-client",
"proxmox-compression",
"proxmox-config-digest",
"proxmox-daemon",
"proxmox-dns-api",
"proxmox-http",
"proxmox-http-error",
@@ -123,6 +124,7 @@ proxmox-apt-api-types = { version = "1.0.1", path = "proxmox-apt-api-types" }
proxmox-auth-api = { version = "0.4.0", path = "proxmox-auth-api" }
proxmox-async = { version = "0.4.1", path = "proxmox-async" }
proxmox-compression = { version = "0.2.3", path = "proxmox-compression" }
proxmox-daemon = { version = "0.1.0", path = "proxmox-daemon" }
proxmox-http = { version = "0.9.2", path = "proxmox-http" }
proxmox-http-error = { version = "0.1.0", path = "proxmox-http-error" }
proxmox-human-byte = { version = "0.1.0", path = "proxmox-human-byte" }

proxmox-daemon/Cargo.toml Normal file

@@ -0,0 +1,26 @@
[package]
name = "proxmox-daemon"
version = "0.1.0"
authors.workspace = true
edition.workspace = true
license.workspace = true
repository.workspace = true
description = """
Daemon state handling (catching reload signals, registering commands with the command socket, ...)
"""
exclude.workspace = true

[dependencies]
anyhow.workspace = true
futures.workspace = true
libc.workspace = true
log.workspace = true
nix.workspace = true
once_cell.workspace = true
serde.workspace = true
serde_json.workspace = true
tokio = { workspace = true, features = ["io-util", "net", "rt", "signal", "sync"] }
proxmox-sys.workspace = true
proxmox-systemd.workspace = true

proxmox-daemon/debian/changelog Normal file

@@ -0,0 +1,5 @@
rust-proxmox-daemon (0.1.0-1) bookworm; urgency=medium

  * initial split out of proxmox-rest-server

 -- Proxmox Support Team <support@proxmox.com>  Tue, 23 Jul 2024 13:15:03 +0200

proxmox-daemon/debian/copyright Normal file

@@ -0,0 +1,18 @@
Format: https://www.debian.org/doc/packaging-manuals/copyright-format/1.0/

Files:
 *
Copyright: 2024 Proxmox Server Solutions GmbH <support@proxmox.com>
License: AGPL-3.0-or-later
License: AGPL-3.0-or-later
This program is free software: you can redistribute it and/or modify it under
the terms of the GNU Affero General Public License as published by the Free
Software Foundation, either version 3 of the License, or (at your option) any
later version.
.
This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
details.
.
You should have received a copy of the GNU Affero General Public License along
with this program. If not, see <https://www.gnu.org/licenses/>.

proxmox-daemon/debian/debcargo.toml Normal file

@@ -0,0 +1,7 @@
overlay = "."
crate_src_path = ".."
maintainer = "Proxmox Support Team <support@proxmox.com>"
[source]
vcs_git = "git://git.proxmox.com/git/proxmox.git"
vcs_browser = "https://git.proxmox.com/?p=proxmox.git"

proxmox-rest-server/src/command_socket.rs → proxmox-daemon/src/command_socket.rs

@@ -1,26 +1,46 @@
use anyhow::{bail, format_err, Error};
use std::collections::HashMap;
use std::future::Future;
use std::os::unix::io::AsRawFd;
use std::path::{Path, PathBuf};
use std::pin::pin;
use std::sync::Arc;
use futures::*;
use nix::sys::socket;
use nix::unistd::Gid;
use serde::Serialize;
use serde_json::Value;
use tokio::net::UnixListener;
use tokio::sync::watch;
/// Returns the control socket path for a specific process ID.
///
/// Note: The control socket always uses @/run/proxmox-backup/ as the
/// prefix for historic reasons. This does not matter because the
/// generated path is unique for each ``pid`` anyway.
pub fn path_from_pid(pid: i32) -> String {
// Note: The control socket always uses @/run/proxmox-backup/ as the prefix
// for historic reasons.
format!("\0{}/control-{}.sock", "/run/proxmox-backup", pid)
}
/// Returns the control socket path for this server.
pub fn this_path() -> String {
path_from_pid(unsafe { libc::getpid() })
}
// Listens on a Unix socket to handle simple commands asynchronously
fn create_control_socket<P, F>(
fn create_control_socket<P, F, W>(
path: P,
gid: Gid,
abort_future: W,
func: F,
) -> Result<impl Future<Output = ()>, Error>
where
P: Into<PathBuf>,
F: Fn(Value) -> Result<Value, Error> + Send + Sync + 'static,
W: Future<Output = ()> + Send + 'static,
{
let path: PathBuf = path.into();
@@ -30,8 +50,24 @@ where
let func = Arc::new(func);
let control_future = async move {
let (abort_sender, abort_receiver) = watch::channel(false);
tokio::spawn(async move {
abort_future.await;
let _ = abort_sender.send(true);
});
let abort_future = {
let abort_receiver = abort_receiver.clone();
async move {
let _ = { abort_receiver }.wait_for(|&v| v).await;
}
};
let control_future = Box::pin(async move {
loop {
use tokio::io::{AsyncBufReadExt, AsyncWriteExt};
let (conn, _addr) = match socket.accept().await {
Ok(data) => data,
Err(err) => {
@@ -40,7 +76,7 @@ where
}
};
let opt = socket::sockopt::PeerCredentials {};
let opt = socket::sockopt::PeerCredentials;
let cred = match socket::getsockopt(conn.as_raw_fd(), opt) {
Ok(cred) => cred,
Err(err) => {
@@ -50,96 +86,90 @@ where
};
// check permissions (same gid, root user, or backup group)
let mygid = unsafe { libc::getgid() };
if !(cred.uid() == 0 || cred.gid() == mygid || cred.gid() == gid) {
let mygid = Gid::current();
if !(cred.uid() == 0 || cred.gid() == mygid.as_raw() || cred.gid() == gid.as_raw()) {
log::error!("no permissions for {:?}", cred);
continue;
}
let (rx, mut tx) = tokio::io::split(conn);
let abort_future = super::last_worker_future().map(|_| ());
let abort_future = {
let abort_receiver = abort_receiver.clone();
Box::pin(async move {
let _ = { abort_receiver }.wait_for(|&v| v).await;
})
};
use tokio::io::{AsyncBufReadExt, AsyncWriteExt};
let func = Arc::clone(&func);
let path = path.clone();
tokio::spawn(
futures::future::select(
async move {
let mut rx = tokio::io::BufReader::new(rx);
let mut line = String::new();
loop {
line.clear();
match rx
.read_line({
line.clear();
&mut line
})
.await
{
Ok(0) => break,
Ok(_) => (),
Err(err) => {
log::error!("control socket {:?} read error: {}", path, err);
return;
}
}
let response = match line.parse::<Value>() {
Ok(param) => match func(param) {
Ok(res) => format!("OK: {}\n", res),
Err(err) => format!("ERROR: {}\n", err),
},
Err(err) => format!("ERROR: {}\n", err),
};
if let Err(err) = tx.write_all(response.as_bytes()).await {
log::error!(
"control socket {:?} write response error: {}",
path,
err
);
tokio::spawn(futures::future::select(
Box::pin(async move {
let mut rx = tokio::io::BufReader::new(rx);
let mut line = String::new();
loop {
line.clear();
match rx
.read_line({
line.clear();
&mut line
})
.await
{
Ok(0) => break,
Ok(_) => (),
Err(err) => {
log::error!("control socket {:?} read error: {}", path, err);
return;
}
}
let response = match line.parse::<Value>() {
Ok(param) => match func(param) {
Ok(res) => format!("OK: {}\n", res),
Err(err) => format!("ERROR: {}\n", err),
},
Err(err) => format!("ERROR: {}\n", err),
};
if let Err(err) = tx.write_all(response.as_bytes()).await {
log::error!("control socket {:?} write response error: {}", path, err);
return;
}
}
.boxed(),
abort_future,
)
.map(|_| ()),
);
}),
abort_future,
));
}
}
.boxed();
});
let abort_future = crate::last_worker_future().map_err(|_| {});
let task = futures::future::select(control_future, abort_future)
.map(|_: futures::future::Either<(Result<(), Error>, _), _>| ());
Ok(task)
Ok(async move {
let abort_future = pin!(abort_future);
futures::future::select(control_future, abort_future).await;
})
}
/// Send a command to the specified socket
pub async fn send_command<P, T>(path: P, params: &T) -> Result<Value, Error>
pub async fn send<P, T>(path: P, params: &T) -> Result<Value, Error>
where
P: AsRef<Path>,
T: ?Sized + Serialize,
{
let mut command_string = serde_json::to_string(params)?;
command_string.push('\n');
send_raw_command(path.as_ref(), &command_string).await
send_raw(path.as_ref(), &command_string).await
}
/// Send a raw command (string) to the specified socket
pub async fn send_raw_command<P>(path: P, command_string: &str) -> Result<Value, Error>
pub async fn send_raw<P>(path: P, command_string: &str) -> Result<Value, Error>
where
P: AsRef<Path>,
{
use tokio::io::{AsyncBufReadExt, AsyncWriteExt};
let mut conn = tokio::net::UnixStream::connect(path)
.map_err(move |err| format_err!("control socket connect failed - {}", err))
.await?;
.await
.map_err(move |err| format_err!("control socket connect failed - {}", err))?;
conn.write_all(command_string.as_bytes()).await?;
if !command_string.as_bytes().ends_with(b"\n") {
@@ -164,7 +194,7 @@ where
}
}
// A callback for a specific commando socket.
// A callback for a specific command socket.
type CommandSocketFn =
Box<(dyn Fn(Option<&Value>) -> Result<Value, Error> + Send + Sync + 'static)>;
@@ -181,12 +211,9 @@ pub struct CommandSocket {
impl CommandSocket {
/// Creates a new instance.
pub fn new<P>(path: P, gid: Gid) -> Self
where
P: Into<PathBuf>,
{
pub fn new(gid: Gid) -> Self {
CommandSocket {
socket: path.into(),
socket: this_path().into(),
gid,
commands: HashMap::new(),
}
@@ -194,9 +221,18 @@ impl CommandSocket {
/// Spawn the socket and consume self, meaning you cannot register commands anymore after
/// calling this.
pub fn spawn(self) -> Result<(), Error> {
let control_future =
create_control_socket(self.socket.to_owned(), self.gid, move |param| {
///
/// The `abort_future` is typically a `last_worker_future()` and is there because this
/// `spawn()`s a task which would otherwise never finish.
pub fn spawn<F>(self, abort_future: F) -> Result<(), Error>
where
F: Future<Output = ()> + Send + 'static,
{
let control_future = create_control_socket(
self.socket.to_owned(),
self.gid,
abort_future,
move |param| {
let param = param.as_object().ok_or_else(|| {
format_err!("unable to parse parameters (expected json object)")
})?;
@@ -218,7 +254,8 @@ impl CommandSocket {
(handler)(args)
}
}
})?;
},
)?;
tokio::spawn(control_future);
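
Taken together, the moved module now works roughly like this; a sketch using only names visible in this commit (`register_command` is the pre-existing `CommandSocket` API and is assumed unchanged):

```rust
use anyhow::Error;
use nix::unistd::Gid;
use serde_json::{json, Value};

fn serve(abort: impl std::future::Future<Output = ()> + Send + 'static) -> Result<(), Error> {
    // The socket path is now always derived from our own pid (this_path()).
    let mut sock = proxmox_daemon::command_socket::CommandSocket::new(Gid::current());
    sock.register_command("ping".to_string(), |_args: Option<&Value>| Ok(json!("pong")))?;
    // spawn() now requires an abort future, e.g. last_worker_future():
    sock.spawn(abort)
}

async fn client(pid: i32) -> Result<Value, Error> {
    // path_from_pid() yields an abstract-namespace address (leading NUL),
    // so nothing is created on the filesystem.
    let path = proxmox_daemon::command_socket::path_from_pid(pid);
    // One JSON object per line; the server replies "OK: ..." or "ERROR: ...".
    proxmox_daemon::command_socket::send(&path, &json!({ "command": "ping" })).await
}
```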

proxmox-daemon/src/lib.rs Normal file

@@ -0,0 +1,12 @@
//! Daemon and related state handling.
pub mod command_socket;
mod state;
pub use state::fail_on_shutdown;
pub use state::shutdown_future;
pub use state::{catch_reload_signal, reload_signal_task};
pub use state::{catch_shutdown_signal, shutdown_signal_task};
pub use state::{is_reload_requested, is_shutdown_requested, request_reload, request_shutdown};
pub mod server;

proxmox-rest-server/src/daemon.rs → proxmox-daemon/src/server.rs

@@ -1,4 +1,4 @@
//! Helpers to implement restartable daemons/services.
//! Helpers to implement restartable servers listening for incoming connections.
use std::ffi::CString;
use std::future::Future;
@@ -7,7 +7,7 @@ use std::os::unix::ffi::OsStrExt;
use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd, RawFd};
use std::panic::UnwindSafe;
use std::path::PathBuf;
use std::pin::Pin;
use std::pin::{pin, Pin};
use anyhow::{bail, format_err, Error};
use futures::future::{self, Either};
@@ -112,7 +112,7 @@ impl Reloader {
}
// Synchronisation pipe:
let (pold, pnew) = super::socketpair()?;
let (pold, pnew) = socketpair()?;
// Start ourselves in the background:
match unsafe { fork() } {
@@ -342,13 +342,11 @@
};
let server_future = Box::pin(service);
let shutdown_future = crate::shutdown_future();
let shutdown_future = pin!(crate::shutdown_future());
let finish_future = match future::select(server_future, shutdown_future).await {
Either::Left((_, _)) => {
if !crate::shutdown_requested() {
crate::request_shutdown(); // make sure we are in shutdown mode
}
crate::request_shutdown(); // make sure we are in shutdown mode
None
}
Either::Right((_, server_future)) => Some(server_future),
@@ -356,7 +354,7 @@ where
let mut reloader = Some(reloader);
if crate::is_reload_request() {
if crate::is_reload_requested() {
log::info!("daemon reload...");
if let Err(e) = proxmox_systemd::notify::SystemdNotify::Reloading.notify() {
log::error!("failed to notify systemd about the state change: {}", e);
@@ -382,3 +380,16 @@ where
log::info!("daemon shut down.");
Ok(())
}
/// safe wrapper for `nix::sys::socket::socketpair` defaulting to `O_CLOEXEC` and guarding the file
/// descriptors.
fn socketpair() -> Result<(OwnedFd, OwnedFd), Error> {
use nix::sys::socket;
let (pa, pb) = socket::socketpair(
socket::AddressFamily::Unix,
socket::SockType::Stream,
None,
socket::SockFlag::SOCK_CLOEXEC,
)?;
Ok(unsafe { (OwnedFd::from_raw_fd(pa), OwnedFd::from_raw_fd(pb)) })
}

proxmox-daemon/src/state.rs Normal file

@@ -0,0 +1,121 @@
use std::future::Future;
use std::pin::pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::OnceLock;
use anyhow::{bail, Error};
use tokio::signal::unix::{signal, SignalKind};
use tokio::sync::watch;
static SHUTDOWN_LISTENERS: OnceLock<watch::Sender<bool>> = OnceLock::new();
static RELOAD_REQUESTED: AtomicBool = AtomicBool::new(false);
static SHUTDOWN_REQUESTED: AtomicBool = AtomicBool::new(false);
/// Request a reload.
///
/// This sets the reload flag and subsequently calls [`request_shutdown()`].
pub fn request_reload() {
if !RELOAD_REQUESTED.swap(true, Ordering::Release) {
request_shutdown();
}
}
/// Returns true if a reload has been requested either via a signal or a call to
/// [`request_reload()`].
pub fn is_reload_requested() -> bool {
RELOAD_REQUESTED.load(Ordering::Acquire)
}
/// Request a shutdown.
///
/// This sets both the shutdown flag and triggers [`shutdown_future()`] to finish.
pub fn request_shutdown() {
log::info!("request_shutdown");
if !SHUTDOWN_REQUESTED.swap(true, Ordering::Release) {
let _ = shutdown_listeners().send(true);
}
}
/// Returns true if a shutdown has been requested either via a signal or a call to
/// [`request_shutdown()`].
pub fn is_shutdown_requested() -> bool {
SHUTDOWN_REQUESTED.load(Ordering::Acquire)
}
fn shutdown_listeners() -> &'static watch::Sender<bool> {
SHUTDOWN_LISTENERS.get_or_init(|| watch::channel(false).0)
}
/// This future finishes once a shutdown has been requested either via a signal or a call to
/// [`request_shutdown()`].
pub async fn shutdown_future() {
let _ = shutdown_listeners().subscribe().wait_for(|&v| v).await;
}
/// Pin and select().
async fn pin_select<A, B>(a: A, b: B)
where
A: Future<Output = ()> + Send + 'static,
B: Future<Output = ()> + Send + 'static,
{
let a = pin!(a);
let b = pin!(b);
futures::future::select(a, b).await;
}
/// Creates a task which listens for a `SIGINT` and then calls [`request_shutdown()`] while also
/// *undoing* a previous *reload* request.
pub fn shutdown_signal_task() -> Result<impl Future<Output = ()> + Send + 'static, Error> {
let mut stream = signal(SignalKind::interrupt())?;
Ok(async move {
while stream.recv().await.is_some() {
log::info!("got shutdown request (SIGINT)");
RELOAD_REQUESTED.store(false, Ordering::Release);
request_shutdown();
}
})
}
/// Spawn a [`shutdown_signal_task()`] which is automatically aborted with the provided
/// `abort_future`.
pub fn catch_shutdown_signal<F>(abort_future: F) -> Result<(), Error>
where
F: Future<Output = ()> + Send + 'static,
{
log::info!("catching shutdown signal");
tokio::spawn(pin_select(shutdown_signal_task()?, abort_future));
Ok(())
}
/// Creates a task which listens for a `SIGHUP` and then calls [`request_reload()`].
pub fn reload_signal_task() -> Result<impl Future<Output = ()> + Send + 'static, Error> {
let mut stream = signal(SignalKind::hangup())?;
Ok(async move {
while stream.recv().await.is_some() {
log::info!("got reload request (SIGHUP)");
request_reload();
}
})
}
/// Spawn a [`reload_signal_task()`] which is automatically aborted with the provided
/// `abort_future`.
pub fn catch_reload_signal<F>(abort_future: F) -> Result<(), Error>
where
F: Future<Output = ()> + Send + 'static,
{
log::info!("catching reload signal");
tokio::spawn(pin_select(reload_signal_task()?, abort_future));
Ok(())
}
/// Raise an error if there was a shutdown request.
pub fn fail_on_shutdown() -> Result<(), Error> {
if is_shutdown_requested() {
bail!("Server shutdown requested - aborting task");
}
Ok(())
}
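
Given these flag semantics (SIGHUP sets the reload flag before shutting down, SIGINT explicitly clears it), callers can tell the two cases apart once the shutdown future resolves. A minimal sketch:

```rust
// Sketch: distinguish a reload-triggered shutdown from a final one.
async fn wait_and_classify() {
    proxmox_daemon::shutdown_future().await;
    if proxmox_daemon::is_reload_requested() {
        log::info!("shutting down for a reload (SIGHUP)");
    } else {
        log::info!("shutting down for good (SIGINT or request_shutdown())");
    }
}
```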

proxmox-rest-server/Cargo.toml

@@ -38,6 +38,7 @@ url.workspace = true
proxmox-async.workspace = true
proxmox-compression.workspace = true
proxmox-daemon.workspace = true
proxmox-http = { workspace = true, optional = true }
proxmox-lang.workspace = true
proxmox-log.workspace = true

proxmox-rest-server/src/api_config.rs

@@ -12,12 +12,13 @@ use hyper::http::request::Parts;
use hyper::{Body, Response};
use tower_service::Service;
use proxmox_daemon::command_socket::CommandSocket;
use proxmox_log::{FileLogOptions, FileLogger};
use proxmox_router::{Router, RpcEnvironmentType, UserInformation};
use proxmox_sys::fs::{create_path, CreateOptions};
use crate::rest::Handler;
use crate::{CommandSocket, RestEnvironment};
use crate::RestEnvironment;
/// REST server configuration
pub struct ApiConfig {

proxmox-rest-server/src/connection.rs

@@ -8,7 +8,7 @@ use std::net::SocketAddr;
use std::os::fd::FromRawFd;
use std::os::unix::io::AsRawFd;
use std::path::PathBuf;
use std::pin::Pin;
use std::pin::{pin, Pin};
use std::sync::{Arc, Mutex};
use std::time::Duration;
@@ -278,7 +278,7 @@ impl AcceptBuilder {
sender: Sender,
) {
let accept_counter = Arc::new(());
let mut shutdown_future = crate::shutdown_future().fuse();
let mut shutdown_future = pin!(proxmox_daemon::shutdown_future().fuse());
loop {
let (socket, peer) = futures::select! {

proxmox-rest-server/src/lib.rs

@@ -7,7 +7,6 @@
//!
//! * highly threaded code, uses Rust async
//! * static API definitions using schemas
//! * restartable systemd daemons using `systemd_notify`
//! * support for long running worker tasks (threads or async tokio tasks)
//! * supports separate access and authentication log files
//! * extra control socket to trigger management operations
@@ -18,10 +17,8 @@
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
use std::fmt;
use std::os::unix::io::{FromRawFd, OwnedFd};
use std::sync::atomic::{AtomicBool, Ordering};
use anyhow::{bail, format_err, Error};
use anyhow::{format_err, Error};
use nix::unistd::Pid;
use proxmox_sys::fs::CreateOptions;
@@ -30,19 +27,11 @@ use proxmox_sys::linux::procfs::PidStat;
mod compression;
pub use compression::*;
pub mod daemon;
pub mod formatter;
mod environment;
pub use environment::*;
mod state;
pub use state::*;
mod command_socket;
pub use command_socket::*;
mod api_config;
pub use api_config::{ApiConfig, AuthError, AuthHandler, IndexHandler, UnixAcceptor};
@@ -90,57 +79,6 @@ pub fn read_pid(pid_fn: &str) -> Result<i32, Error> {
.map_err(|err| format_err!("could not parse pid - {}", err))
}
/// Returns the control socket path for a specific process ID.
///
/// Note: The control socket always uses @/run/proxmox-backup/ as
/// prefix for historic reason. This does not matter because the
/// generated path is unique for each ``pid`` anyways.
pub fn ctrl_sock_from_pid(pid: i32) -> String {
// Note: The control socket always uses @/run/proxmox-backup/ as prefix
// for historc reason.
format!("\0{}/control-{}.sock", "/run/proxmox-backup", pid)
}
/// Returns the control socket path for this server.
pub fn our_ctrl_sock() -> String {
ctrl_sock_from_pid(*PID)
}
static SHUTDOWN_REQUESTED: AtomicBool = AtomicBool::new(false);
/// Request a server shutdown (usually called from [catch_shutdown_signal])
pub fn request_shutdown() {
SHUTDOWN_REQUESTED.store(true, Ordering::SeqCst);
crate::server_shutdown();
}
/// Returns true if there was a shutdown request.
#[inline(always)]
pub fn shutdown_requested() -> bool {
SHUTDOWN_REQUESTED.load(Ordering::SeqCst)
}
/// Raise an error if there was a shutdown request.
pub fn fail_on_shutdown() -> Result<(), Error> {
if shutdown_requested() {
bail!("Server shutdown requested - aborting task");
}
Ok(())
}
/// safe wrapper for `nix::sys::socket::socketpair` defaulting to `O_CLOEXEC` and guarding the file
/// descriptors.
fn socketpair() -> Result<(OwnedFd, OwnedFd), Error> {
use nix::sys::socket;
let (pa, pb) = socket::socketpair(
socket::AddressFamily::Unix,
socket::SockType::Stream,
None,
socket::SockFlag::SOCK_CLOEXEC,
)?;
Ok(unsafe { (OwnedFd::from_raw_fd(pa), OwnedFd::from_raw_fd(pb)) })
}
/// Extract a specific cookie from cookie header.
/// We assume cookie_name is already url encoded.
pub fn extract_cookie(cookie: &str, cookie_name: &str) -> Option<String> {

proxmox-rest-server/src/state.rs (deleted)

@@ -1,161 +0,0 @@
use anyhow::Error;
use lazy_static::lazy_static;
use std::sync::Mutex;
use futures::*;
use tokio::signal::unix::{signal, SignalKind};
use proxmox_async::broadcast_future::BroadcastData;
use crate::request_shutdown;
#[derive(PartialEq, Copy, Clone, Debug)]
enum ServerMode {
Normal,
Shutdown,
}
struct ServerState {
mode: ServerMode,
shutdown_listeners: BroadcastData<()>,
last_worker_listeners: BroadcastData<()>,
worker_count: usize,
internal_task_count: usize,
reload_request: bool,
}
lazy_static! {
static ref SERVER_STATE: Mutex<ServerState> = Mutex::new(ServerState {
mode: ServerMode::Normal,
shutdown_listeners: BroadcastData::new(),
last_worker_listeners: BroadcastData::new(),
worker_count: 0,
internal_task_count: 0,
reload_request: false,
});
}
/// Listen to ``SIGINT`` for server shutdown
///
/// This calls [request_shutdown] when receiving the signal.
pub fn catch_shutdown_signal() -> Result<(), Error> {
let mut stream = signal(SignalKind::interrupt())?;
let future = async move {
while stream.recv().await.is_some() {
log::info!("got shutdown request (SIGINT)");
SERVER_STATE.lock().unwrap().reload_request = false;
request_shutdown();
}
}
.boxed();
let abort_future = last_worker_future().map_err(|_| {});
let task = futures::future::select(future, abort_future);
tokio::spawn(task.map(|_| ()));
Ok(())
}
/// Listen to ``SIGHUP`` for server reload
///
/// This calls [request_shutdown] when receiving the signal, and tries
/// to restart the server.
pub fn catch_reload_signal() -> Result<(), Error> {
let mut stream = signal(SignalKind::hangup())?;
let future = async move {
while stream.recv().await.is_some() {
log::info!("got reload request (SIGHUP)");
SERVER_STATE.lock().unwrap().reload_request = true;
crate::request_shutdown();
}
}
.boxed();
let abort_future = last_worker_future().map_err(|_| {});
let task = futures::future::select(future, abort_future);
tokio::spawn(task.map(|_| ()));
Ok(())
}
pub(crate) fn is_reload_request() -> bool {
let data = SERVER_STATE.lock().unwrap();
data.mode == ServerMode::Shutdown && data.reload_request
}
pub(crate) fn server_shutdown() {
let mut data = SERVER_STATE.lock().unwrap();
log::info!("request_shutdown");
data.mode = ServerMode::Shutdown;
data.shutdown_listeners.notify_listeners(Ok(()));
drop(data); // unlock
check_last_worker();
}
/// Future to signal server shutdown
pub fn shutdown_future() -> impl Future<Output = ()> {
let mut data = SERVER_STATE.lock().unwrap();
data.shutdown_listeners.listen().map(|_| ())
}
/// Future to signal when last worker task finished
pub fn last_worker_future() -> impl Future<Output = Result<(), Error>> {
let mut data = SERVER_STATE.lock().unwrap();
data.last_worker_listeners.listen()
}
pub(crate) fn set_worker_count(count: usize) {
SERVER_STATE.lock().unwrap().worker_count = count;
check_last_worker();
}
pub(crate) fn check_last_worker() {
let mut data = SERVER_STATE.lock().unwrap();
if !(data.mode == ServerMode::Shutdown
&& data.worker_count == 0
&& data.internal_task_count == 0)
{
return;
}
data.last_worker_listeners.notify_listeners(Ok(()));
}
/// Spawns a tokio task that will be tracked for reload
/// and if it is finished, notify the [last_worker_future] if we
/// are in shutdown mode.
pub fn spawn_internal_task<T>(task: T)
where
T: Future + Send + 'static,
T::Output: Send + 'static,
{
let mut data = SERVER_STATE.lock().unwrap();
data.internal_task_count += 1;
tokio::spawn(async move {
let _ = tokio::spawn(task).await; // ignore errors
{
// drop mutex
let mut data = SERVER_STATE.lock().unwrap();
if data.internal_task_count > 0 {
data.internal_task_count -= 1;
}
}
check_last_worker();
});
}

proxmox-rest-server/src/worker_task.rs

@@ -3,8 +3,8 @@ use std::fs::File;
use std::io::{BufRead, BufReader, Read, Write};
use std::panic::UnwindSafe;
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, OnceLock};
use std::time::{Duration, SystemTime};
use anyhow::{bail, format_err, Error};
@@ -15,9 +15,10 @@ use once_cell::sync::OnceCell;
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use tokio::signal::unix::SignalKind;
use tokio::sync::oneshot;
use tokio::sync::{oneshot, watch};
use tracing::{info, warn};
use proxmox_daemon::command_socket::CommandSocket;
use proxmox_lang::try_block;
use proxmox_log::{FileLogOptions, FileLogger, LogContext};
use proxmox_schema::upid::UPID;
@@ -26,7 +27,66 @@ use proxmox_sys::linux::procfs;
use proxmox_sys::logrotate::{LogRotate, LogRotateFiles};
use proxmox_worker_task::WorkerTaskContext;
use crate::CommandSocket;
static LAST_WORKER_LISTENERS: OnceLock<watch::Sender<bool>> = OnceLock::new();
static WORKER_COUNT: AtomicUsize = AtomicUsize::new(0);
static INTERNAL_TASK_COUNT: AtomicUsize = AtomicUsize::new(0);
fn last_worker_listeners() -> &'static watch::Sender<bool> {
LAST_WORKER_LISTENERS.get_or_init(|| watch::channel(false).0)
}
/// This future finishes once there are no more running workers (including internal tasks).
pub async fn last_worker_future() {
let _ = last_worker_listeners().subscribe().wait_for(|&v| v).await;
}
/// This drives the [`last_worker_future()`] futures: if a shutdown is requested and no workers
/// and no internal tasks are running, the [`last_worker_future()`] futures are triggered to
/// finish.
pub fn check_last_worker() {
if proxmox_daemon::is_shutdown_requested()
&& WORKER_COUNT.load(Ordering::Acquire) == 0
&& INTERNAL_TASK_COUNT.load(Ordering::Acquire) == 0
{
let _ = last_worker_listeners().send(true);
}
}
/// Spawn a task which calls [`check_last_worker()`] in the case of a requested shutdown. This used
/// to be implied by the [`request_shutdown()`] call when it was part of the `proxmox-rest-server`
/// crate, which is no longer the case.
fn check_workers_on_shutdown() {
tokio::spawn(async {
let _ = proxmox_daemon::shutdown_future().await;
check_last_worker();
});
}
/// Spawns a tokio task that will be tracked for reload
/// and, once it finishes, notifies the [`last_worker_future()`] listeners if we
/// are in shutdown mode.
pub fn spawn_internal_task<T>(task: T)
where
T: Future + Send + 'static,
T::Output: Send + 'static,
{
INTERNAL_TASK_COUNT.fetch_add(1, Ordering::Release);
tokio::spawn(async move {
let _ = task.await;
INTERNAL_TASK_COUNT.fetch_sub(1, Ordering::Release);
check_last_worker();
});
}
/// Update the worker count.
/// If the count is set to 0 and no internal tasks are running, all [`last_worker_future()`]
/// futures will finish.
pub fn set_worker_count(count: usize) {
WORKER_COUNT.store(count, Ordering::Release);
check_last_worker();
}
#[allow(dead_code)]
struct TaskListLockGuard(File);
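
In practice, these counters delay daemon teardown until all tracked work is gone. A sketch of typical usage, assuming these functions remain re-exported from the rest-server crate root as before:

```rust
// Sketch: a background task that participates in graceful shutdown.
async fn teardown_example() {
    // Counted in INTERNAL_TASK_COUNT until it finishes:
    proxmox_rest_server::spawn_internal_task(async {
        // ... long-running internal work ...
    });

    // Resolves only once a shutdown was requested and the last worker
    // and internal task are gone (see check_last_worker()).
    proxmox_rest_server::last_worker_future().await;
    log::info!("all workers finished");
}
```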
@@ -227,7 +287,9 @@ pub fn init_worker_tasks(basedir: PathBuf, file_opts: CreateOptions) -> Result<(
setup.create_task_log_dirs()?;
WORKER_TASK_SETUP
.set(setup)
.map_err(|_| format_err!("init_worker_tasks failed - already initialized"))
.map_err(|_| format_err!("init_worker_tasks failed - already initialized"))?;
check_workers_on_shutdown();
Ok(())
}
/// Optionally rotates and/or cleans up the task archive depending on its size and age.
@ -455,14 +517,14 @@ pub async fn worker_is_active(upid: &UPID) -> Result<bool, Error> {
return Ok(false);
}
let sock = crate::ctrl_sock_from_pid(upid.pid);
let sock = proxmox_daemon::command_socket::path_from_pid(upid.pid);
let cmd = json!({
"command": "worker-task-status",
"args": {
"upid": upid.to_string(),
},
});
let status = crate::send_command(sock, &cmd).await?;
let status = proxmox_daemon::command_socket::send(sock, &cmd).await?;
if let Some(active) = status.as_bool() {
Ok(active)
@@ -543,14 +605,16 @@ pub fn abort_worker_nowait(upid: UPID) {
///
/// By sending ``worker-task-abort`` to the control socket.
pub async fn abort_worker(upid: UPID) -> Result<(), Error> {
let sock = crate::ctrl_sock_from_pid(upid.pid);
let sock = proxmox_daemon::command_socket::path_from_pid(upid.pid);
let cmd = json!({
"command": "worker-task-abort",
"args": {
"upid": upid.to_string(),
},
});
crate::send_command(sock, &cmd).map_ok(|_| ()).await
proxmox_daemon::command_socket::send(sock, &cmd)
.map_ok(|_| ())
.await
}
fn parse_worker_status_line(line: &str) -> Result<(String, UPID, Option<TaskState>), Error> {
@ -860,7 +924,7 @@ impl WorkerTask {
{
let mut hash = WORKER_TASK_LIST.lock().unwrap();
hash.insert(task_id, worker.clone());
crate::set_worker_count(hash.len());
set_worker_count(hash.len());
}
setup.update_active_workers(Some(&upid))?;
@@ -958,7 +1022,7 @@ impl WorkerTask {
WORKER_TASK_LIST.lock().unwrap().remove(&self.upid.task_id);
let _ = self.setup.update_active_workers(None);
crate::set_worker_count(WORKER_TASK_LIST.lock().unwrap().len());
set_worker_count(WORKER_TASK_LIST.lock().unwrap().len());
}
/// Log a message.
@@ -1020,11 +1084,11 @@ impl WorkerTaskContext for WorkerTask {
}
fn shutdown_requested(&self) -> bool {
crate::shutdown_requested()
proxmox_daemon::is_shutdown_requested()
}
fn fail_on_shutdown(&self) -> Result<(), Error> {
crate::fail_on_shutdown()
proxmox_daemon::fail_on_shutdown()
}
}