From 2bda552b5523ee21b778cf8f2e86d095b64c9272 Mon Sep 17 00:00:00 2001 From: Thomas Lamprecht Date: Wed, 6 Apr 2022 16:55:39 +0200 Subject: [PATCH] rest server: rust fmt Signed-off-by: Thomas Lamprecht --- .../examples/minimal-rest-server.rs | 57 ++--- proxmox-rest-server/src/api_config.rs | 58 +++-- proxmox-rest-server/src/command_socket.rs | 144 ++++++----- proxmox-rest-server/src/compression.rs | 12 +- proxmox-rest-server/src/daemon.rs | 87 ++++--- proxmox-rest-server/src/environment.rs | 16 +- proxmox-rest-server/src/file_logger.rs | 23 +- proxmox-rest-server/src/formatter.rs | 43 ++-- proxmox-rest-server/src/h2service.rs | 54 ++-- proxmox-rest-server/src/lib.rs | 30 ++- proxmox-rest-server/src/rest.rs | 36 +-- proxmox-rest-server/src/state.rs | 28 ++- proxmox-rest-server/src/worker_task.rs | 233 +++++++++--------- 13 files changed, 453 insertions(+), 368 deletions(-) diff --git a/proxmox-rest-server/examples/minimal-rest-server.rs b/proxmox-rest-server/examples/minimal-rest-server.rs index 7dd6fbcf..0b1bfd53 100644 --- a/proxmox-rest-server/examples/minimal-rest-server.rs +++ b/proxmox-rest-server/examples/minimal-rest-server.rs @@ -1,18 +1,20 @@ -use std::sync::Mutex; use std::collections::HashMap; use std::future::Future; use std::pin::Pin; +use std::sync::Mutex; use anyhow::{bail, format_err, Error}; -use lazy_static::lazy_static; -use hyper::{Body, Response, Method}; use http::request::Parts; use http::HeaderMap; +use hyper::{Body, Method, Response}; +use lazy_static::lazy_static; +use proxmox_router::{ + list_subdirs_api_method, Router, RpcEnvironmentType, SubdirMap, UserInformation, +}; use proxmox_schema::api; -use proxmox_router::{list_subdirs_api_method, SubdirMap, Router, RpcEnvironmentType, UserInformation}; -use proxmox_rest_server::{ServerAdapter, ApiConfig, AuthError, RestServer, RestEnvironment}; +use proxmox_rest_server::{ApiConfig, AuthError, RestEnvironment, RestServer, ServerAdapter}; // Create a Dummy User information system struct DummyUserInfo; @@ -34,13 +36,17 @@ struct MinimalServer; // implement the server adapter impl ServerAdapter for MinimalServer { - // normally this would check and authenticate the user fn check_auth( &self, _headers: &HeaderMap, _method: &Method, - ) -> Pin), AuthError>> + Send>> { + ) -> Pin< + Box< + dyn Future), AuthError>> + + Send, + >, + > { Box::pin(async move { // get some global/cached userinfo let userinfo: Box = Box::new(DummyUserInfo); @@ -121,7 +127,12 @@ fn create_item(name: String, value: String) -> Result<(), Error> { )] /// returns the value of an item fn get_item(name: String) -> Result { - ITEM_MAP.lock().unwrap().get(&name).map(|s| s.to_string()).ok_or_else(|| format_err!("no such item '{}'", name)) + ITEM_MAP + .lock() + .unwrap() + .get(&name) + .map(|s| s.to_string()) + .ok_or_else(|| format_err!("no such item '{}'", name)) } #[api( @@ -177,13 +188,9 @@ const SUBDIRS: SubdirMap = &[ &Router::new() .get(&API_METHOD_LIST_ITEMS) .post(&API_METHOD_CREATE_ITEM) - .match_all("name", &ITEM_ROUTER) - ), - ( - "ping", - &Router::new() - .get(&API_METHOD_PING) + .match_all("name", &ITEM_ROUTER), ), + ("ping", &Router::new().get(&API_METHOD_PING)), ]; const ROUTER: Router = Router::new() @@ -191,7 +198,6 @@ const ROUTER: Router = Router::new() .subdirs(SUBDIRS); async fn run() -> Result<(), Error> { - // we first have to configure the api environment (basedir etc.) let config = ApiConfig::new( @@ -204,21 +210,16 @@ async fn run() -> Result<(), Error> { // then we have to create a daemon that listens, accepts and serves // the api to clients - proxmox_rest_server::daemon::create_daemon( - ([127, 0, 0, 1], 65000).into(), - move |listener| { - let incoming = hyper::server::conn::AddrIncoming::from_listener(listener)?; + proxmox_rest_server::daemon::create_daemon(([127, 0, 0, 1], 65000).into(), move |listener| { + let incoming = hyper::server::conn::AddrIncoming::from_listener(listener)?; - Ok(async move { + Ok(async move { + hyper::Server::builder(incoming).serve(rest_server).await?; - hyper::Server::builder(incoming) - .serve(rest_server) - .await?; - - Ok(()) - }) - }, - ).await?; + Ok(()) + }) + }) + .await?; Ok(()) } diff --git a/proxmox-rest-server/src/api_config.rs b/proxmox-rest-server/src/api_config.rs index ad76a15f..9d257fd1 100644 --- a/proxmox-rest-server/src/api_config.rs +++ b/proxmox-rest-server/src/api_config.rs @@ -1,22 +1,21 @@ use std::collections::HashMap; -use std::path::PathBuf; -use std::time::SystemTime; use std::fs::metadata; -use std::sync::{Arc, Mutex, RwLock}; +use std::path::PathBuf; use std::pin::Pin; +use std::sync::{Arc, Mutex, RwLock}; +use std::time::SystemTime; -use anyhow::{bail, Error, format_err}; -use hyper::{Method, Body, Response}; +use anyhow::{bail, format_err, Error}; use hyper::http::request::Parts; +use hyper::{Body, Method, Response}; use handlebars::Handlebars; use serde::Serialize; -use proxmox_sys::fs::{create_path, CreateOptions}; use proxmox_router::{ApiMethod, Router, RpcEnvironmentType, UserInformation}; +use proxmox_sys::fs::{create_path, CreateOptions}; -use crate::{ServerAdapter, AuthError, FileLogger, FileLogOptions, CommandSocket, RestEnvironment}; - +use crate::{AuthError, CommandSocket, FileLogOptions, FileLogger, RestEnvironment, ServerAdapter}; /// REST server configuration pub struct ApiConfig { @@ -87,12 +86,10 @@ impl ApiConfig { method: Method, uri_param: &mut HashMap, ) -> Option<&'static ApiMethod> { - self.router.find_method(components, method, uri_param) } pub(crate) fn find_alias(&self, components: &[&str]) -> PathBuf { - let mut prefix = String::new(); let mut filename = self.basedir.clone(); let comp_len = components.len(); @@ -100,7 +97,10 @@ impl ApiConfig { prefix.push_str(components[0]); if let Some(subdir) = self.aliases.get(&prefix) { filename.push(subdir); - components.iter().skip(1).for_each(|comp| filename.push(comp)); + components + .iter() + .skip(1) + .for_each(|comp| filename.push(comp)); } else { components.iter().for_each(|comp| filename.push(comp)); } @@ -121,8 +121,9 @@ impl ApiConfig { /// # } /// ``` pub fn add_alias(&mut self, alias: S, path: P) - where S: Into, - P: Into, + where + S: Into, + P: Into, { self.aliases.insert(alias.into(), path.into()); } @@ -136,7 +137,7 @@ impl ApiConfig { /// Those templates cane be use with [render_template](Self::render_template) to generate pages. pub fn register_template

(&self, name: &str, path: P) -> Result<(), Error> where - P: Into + P: Into, { if self.template_files.read().unwrap().contains_key(name) { bail!("template already registered"); @@ -146,8 +147,14 @@ impl ApiConfig { let metadata = metadata(&path)?; let mtime = metadata.modified()?; - self.templates.write().unwrap().register_template_file(name, &path)?; - self.template_files.write().unwrap().insert(name.to_string(), (mtime, path)); + self.templates + .write() + .unwrap() + .register_template_file(name, &path)?; + self.template_files + .write() + .unwrap() + .insert(name.to_string(), (mtime, path)); Ok(()) } @@ -162,11 +169,18 @@ impl ApiConfig { let mtime; { let template_files = self.template_files.read().unwrap(); - let (old_mtime, old_path) = template_files.get(name).ok_or_else(|| format_err!("template not found"))?; + let (old_mtime, old_path) = template_files + .get(name) + .ok_or_else(|| format_err!("template not found"))?; mtime = metadata(old_path)?.modified()?; if mtime <= *old_mtime { - return self.templates.read().unwrap().render(name, data).map_err(|err| format_err!("{}", err)); + return self + .templates + .read() + .unwrap() + .render(name, data) + .map_err(|err| format_err!("{}", err)); } path = old_path.to_path_buf(); } @@ -178,7 +192,9 @@ impl ApiConfig { templates.register_template_file(name, &path)?; template_files.insert(name.to_string(), (mtime, path)); - templates.render(name, data).map_err(|err| format_err!("{}", err)) + templates + .render(name, data) + .map_err(|err| format_err!("{}", err)) } } @@ -195,7 +211,7 @@ impl ApiConfig { commando_sock: &mut CommandSocket, ) -> Result<(), Error> where - P: Into + P: Into, { let path: PathBuf = path.into(); if let Some(base) = path.parent() { @@ -234,7 +250,7 @@ impl ApiConfig { commando_sock: &mut CommandSocket, ) -> Result<(), Error> where - P: Into + P: Into, { let path: PathBuf = path.into(); if let Some(base) = path.parent() { diff --git a/proxmox-rest-server/src/command_socket.rs b/proxmox-rest-server/src/command_socket.rs index 46814c4f..a7fc7576 100644 --- a/proxmox-rest-server/src/command_socket.rs +++ b/proxmox-rest-server/src/command_socket.rs @@ -2,18 +2,22 @@ use anyhow::{bail, format_err, Error}; use std::collections::HashMap; use std::os::unix::io::AsRawFd; -use std::path::{PathBuf, Path}; +use std::path::{Path, PathBuf}; use std::sync::Arc; use futures::*; -use tokio::net::UnixListener; -use serde::Serialize; -use serde_json::Value; use nix::sys::socket; use nix::unistd::Gid; +use serde::Serialize; +use serde_json::Value; +use tokio::net::UnixListener; // Listens on a Unix Socket to handle simple command asynchronously -fn create_control_socket(path: P, gid: Gid, func: F) -> Result, Error> +fn create_control_socket( + path: P, + gid: Gid, + func: F, +) -> Result, Error> where P: Into, F: Fn(Value) -> Result + Send + Sync + 'static, @@ -59,45 +63,57 @@ where use tokio::io::{AsyncBufReadExt, AsyncWriteExt}; let func = Arc::clone(&func); let path = path.clone(); - tokio::spawn(futures::future::select( - async move { - let mut rx = tokio::io::BufReader::new(rx); - let mut line = String::new(); - loop { - line.clear(); - match rx.read_line({ line.clear(); &mut line }).await { - Ok(0) => break, - Ok(_) => (), - Err(err) => { - eprintln!("control socket {:?} read error: {}", path, err); + tokio::spawn( + futures::future::select( + async move { + let mut rx = tokio::io::BufReader::new(rx); + let mut line = String::new(); + loop { + line.clear(); + match rx + .read_line({ + line.clear(); + &mut line + }) + .await + { + Ok(0) => break, + Ok(_) => (), + Err(err) => { + eprintln!("control socket {:?} read error: {}", path, err); + return; + } + } + + let response = match line.parse::() { + Ok(param) => match func(param) { + Ok(res) => format!("OK: {}\n", res), + Err(err) => format!("ERROR: {}\n", err), + }, + Err(err) => format!("ERROR: {}\n", err), + }; + + if let Err(err) = tx.write_all(response.as_bytes()).await { + eprintln!( + "control socket {:?} write response error: {}", + path, err + ); return; } } - - let response = match line.parse::() { - Ok(param) => match func(param) { - Ok(res) => format!("OK: {}\n", res), - Err(err) => format!("ERROR: {}\n", err), - } - Err(err) => format!("ERROR: {}\n", err), - }; - - if let Err(err) = tx.write_all(response.as_bytes()).await { - eprintln!("control socket {:?} write response error: {}", path, err); - return; - } } - }.boxed(), - abort_future, - ).map(|_| ())); + .boxed(), + abort_future, + ) + .map(|_| ()), + ); } - }.boxed(); + } + .boxed(); let abort_future = crate::last_worker_future().map_err(|_| {}); - let task = futures::future::select( - control_future, - abort_future, - ).map(|_: futures::future::Either<(Result<(), Error>, _), _>| ()); + let task = futures::future::select(control_future, abort_future) + .map(|_: futures::future::Either<(Result<(), Error>, _), _>| ()); Ok(task) } @@ -148,7 +164,8 @@ where } // A callback for a specific commando socket. -type CommandSocketFn = Box<(dyn Fn(Option<&Value>) -> Result + Send + Sync + 'static)>; +type CommandSocketFn = + Box<(dyn Fn(Option<&Value>) -> Result + Send + Sync + 'static)>; /// Tooling to get a single control command socket where one can /// register multiple commands dynamically. @@ -164,7 +181,8 @@ pub struct CommandSocket { impl CommandSocket { /// Creates a new instance. pub fn new

(path: P, gid: Gid) -> Self - where P: Into, + where + P: Into, { CommandSocket { socket: path.into(), @@ -176,29 +194,30 @@ impl CommandSocket { /// Spawn the socket and consume self, meaning you cannot register commands anymore after /// calling this. pub fn spawn(self) -> Result<(), Error> { - let control_future = create_control_socket(self.socket.to_owned(), self.gid, move |param| { - let param = param - .as_object() - .ok_or_else(|| format_err!("unable to parse parameters (expected json object)"))?; + let control_future = + create_control_socket(self.socket.to_owned(), self.gid, move |param| { + let param = param.as_object().ok_or_else(|| { + format_err!("unable to parse parameters (expected json object)") + })?; - let command = match param.get("command") { - Some(Value::String(command)) => command.as_str(), - None => bail!("no command"), - _ => bail!("unable to parse command"), - }; + let command = match param.get("command") { + Some(Value::String(command)) => command.as_str(), + None => bail!("no command"), + _ => bail!("unable to parse command"), + }; - if !self.commands.contains_key(command) { - bail!("got unknown command '{}'", command); - } + if !self.commands.contains_key(command) { + bail!("got unknown command '{}'", command); + } - match self.commands.get(command) { - None => bail!("got unknown command '{}'", command), - Some(handler) => { - let args = param.get("args"); //.unwrap_or(&Value::Null); - (handler)(args) - }, - } - })?; + match self.commands.get(command) { + None => bail!("got unknown command '{}'", command), + Some(handler) => { + let args = param.get("args"); //.unwrap_or(&Value::Null); + (handler)(args) + } + } + })?; tokio::spawn(control_future); @@ -206,15 +225,10 @@ impl CommandSocket { } /// Register a new command with a callback. - pub fn register_command( - &mut self, - command: String, - handler: F, - ) -> Result<(), Error> + pub fn register_command(&mut self, command: String, handler: F) -> Result<(), Error> where F: Fn(Option<&Value>) -> Result + Send + Sync + 'static, { - if self.commands.contains_key(&command) { bail!("command '{}' already exists!", command); } diff --git a/proxmox-rest-server/src/compression.rs b/proxmox-rest-server/src/compression.rs index 19626efc..189d7041 100644 --- a/proxmox-rest-server/src/compression.rs +++ b/proxmox-rest-server/src/compression.rs @@ -5,8 +5,8 @@ use hyper::header; #[derive(Eq, Ord, PartialEq, PartialOrd, Debug)] pub enum CompressionMethod { Deflate, -// Gzip, -// Brotli, + // Gzip, + // Brotli, } impl CompressionMethod { @@ -16,8 +16,8 @@ impl CompressionMethod { pub fn extension(&self) -> &'static str { match *self { -// CompressionMethod::Brotli => "br", -// CompressionMethod::Gzip => "gzip", + // CompressionMethod::Brotli => "br", + // CompressionMethod::Gzip => "gzip", CompressionMethod::Deflate => "deflate", } } @@ -28,8 +28,8 @@ impl std::str::FromStr for CompressionMethod { fn from_str(s: &str) -> Result { match s { -// "br" => Ok(CompressionMethod::Brotli), -// "gzip" => Ok(CompressionMethod::Gzip), + // "br" => Ok(CompressionMethod::Brotli), + // "gzip" => Ok(CompressionMethod::Gzip), "deflate" => Ok(CompressionMethod::Deflate), // http accept-encoding allows to give weights with ';q=' other if other.starts_with("deflate;q=") => Ok(CompressionMethod::Deflate), diff --git a/proxmox-rest-server/src/daemon.rs b/proxmox-rest-server/src/daemon.rs index 973cd306..4e09118d 100644 --- a/proxmox-rest-server/src/daemon.rs +++ b/proxmox-rest-server/src/daemon.rs @@ -3,9 +3,9 @@ use std::ffi::CString; use std::future::Future; use std::io::{Read, Write}; -use std::os::raw::{c_char, c_uchar, c_int}; -use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; +use std::os::raw::{c_char, c_int, c_uchar}; use std::os::unix::ffi::OsStrExt; +use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; use std::panic::UnwindSafe; use std::path::PathBuf; @@ -13,8 +13,8 @@ use anyhow::{bail, format_err, Error}; use futures::future::{self, Either}; use nix::unistd::{fork, ForkResult}; -use proxmox_sys::fd::{fd_change_cloexec, Fd}; use proxmox_io::{ReadExt, WriteExt}; +use proxmox_sys::fd::{fd_change_cloexec, Fd}; // Unfortunately FnBox is nightly-only and Box is unusable, so just use Box... type BoxedStoreFunc = Box Result + UnwindSafe + Send>; @@ -102,9 +102,8 @@ impl Reloader { // At this point we call pre-exec helpers. We must be certain that if they fail for // whatever reason we can still call `_exit()`, so use catch_unwind. match std::panic::catch_unwind(move || { - let mut pnew = unsafe { - std::fs::File::from_raw_fd(pnew.into_raw_fd()) - }; + let mut pnew = + unsafe { std::fs::File::from_raw_fd(pnew.into_raw_fd()) }; let pid = nix::unistd::Pid::this(); if let Err(e) = unsafe { pnew.write_host_value(pid.as_raw()) } { log::error!("failed to send new server PID to parent: {}", e); @@ -125,16 +124,19 @@ impl Reloader { std::mem::drop(pnew); // Try to reopen STDOUT/STDERR journald streams to get correct PID in logs - let ident = CString::new(self.self_exe.file_name().unwrap().as_bytes()).unwrap(); + let ident = CString::new(self.self_exe.file_name().unwrap().as_bytes()) + .unwrap(); let ident = ident.as_bytes(); - let fd = unsafe { sd_journal_stream_fd(ident.as_ptr(), libc::LOG_INFO, 1) }; + let fd = + unsafe { sd_journal_stream_fd(ident.as_ptr(), libc::LOG_INFO, 1) }; if fd >= 0 && fd != 1 { let fd = proxmox_sys::fd::Fd(fd); // add drop handler nix::unistd::dup2(fd.as_raw_fd(), 1)?; } else { log::error!("failed to update STDOUT journal redirection ({})", fd); } - let fd = unsafe { sd_journal_stream_fd(ident.as_ptr(), libc::LOG_ERR, 1) }; + let fd = + unsafe { sd_journal_stream_fd(ident.as_ptr(), libc::LOG_ERR, 1) }; if fd >= 0 && fd != 2 { let fd = proxmox_sys::fd::Fd(fd); // add drop handler nix::unistd::dup2(fd.as_raw_fd(), 2)?; @@ -143,8 +145,7 @@ impl Reloader { } self.do_reexec(new_args) - }) - { + }) { Ok(Ok(())) => log::error!("do_reexec returned!"), Ok(Err(err)) => log::error!("do_reexec failed: {}", err), Err(_) => log::error!("panic in re-exec"), @@ -157,20 +158,22 @@ impl Reloader { Err(e) => log::error!("fork() failed, restart delayed: {}", e), } // No matter how we managed to get here, this is the time where we bail out quickly: - unsafe { - libc::_exit(-1) - } + unsafe { libc::_exit(-1) } } Ok(ForkResult::Parent { child }) => { - log::debug!("forked off a new server (first pid: {}), waiting for 2nd pid", child); + log::debug!( + "forked off a new server (first pid: {}), waiting for 2nd pid", + child + ); std::mem::drop(pnew); - let mut pold = unsafe { - std::fs::File::from_raw_fd(pold.into_raw_fd()) - }; + let mut pold = unsafe { std::fs::File::from_raw_fd(pold.into_raw_fd()) }; let child = nix::unistd::Pid::from_raw(match unsafe { pold.read_le_value() } { Ok(v) => v, Err(e) => { - log::error!("failed to receive pid of double-forked child process: {}", e); + log::error!( + "failed to receive pid of double-forked child process: {}", + e + ); // systemd will complain but won't kill the service... return Ok(()); } @@ -215,9 +218,10 @@ impl Reloadable for tokio::net::TcpListener { // FIXME: We could become "independent" of the TcpListener and its reference to the file // descriptor by `dup()`ing it (and check if the listener still exists via kcmp()?) fn get_store_func(&self) -> Result { - let mut fd_opt = Some(Fd( - nix::fcntl::fcntl(self.as_raw_fd(), nix::fcntl::FcntlArg::F_DUPFD_CLOEXEC(0))? - )); + let mut fd_opt = Some(Fd(nix::fcntl::fcntl( + self.as_raw_fd(), + nix::fcntl::FcntlArg::F_DUPFD_CLOEXEC(0), + )?)); Ok(Box::new(move || { let fd = fd_opt.take().unwrap(); fd_change_cloexec(fd.as_raw_fd(), false)?; @@ -226,13 +230,13 @@ impl Reloadable for tokio::net::TcpListener { } fn restore(var: &str) -> Result { - let fd = var.parse::() - .map_err(|e| format_err!("invalid file descriptor: {}", e))? - as RawFd; + let fd = var + .parse::() + .map_err(|e| format_err!("invalid file descriptor: {}", e))? as RawFd; fd_change_cloexec(fd, true)?; - Ok(Self::from_std( - unsafe { std::net::TcpListener::from_raw_fd(fd) }, - )?) + Ok(Self::from_std(unsafe { + std::net::TcpListener::from_raw_fd(fd) + })?) } } @@ -253,10 +257,11 @@ where { let mut reloader = Reloader::new()?; - let listener: tokio::net::TcpListener = reloader.restore( - "PROXMOX_BACKUP_LISTEN_FD", - move || async move { Ok(tokio::net::TcpListener::bind(&address).await?) }, - ).await?; + let listener: tokio::net::TcpListener = reloader + .restore("PROXMOX_BACKUP_LISTEN_FD", move || async move { + Ok(tokio::net::TcpListener::bind(&address).await?) + }) + .await?; let service = create_service(listener)?; @@ -308,7 +313,11 @@ where #[link(name = "systemd")] extern "C" { - fn sd_journal_stream_fd(identifier: *const c_uchar, priority: c_int, level_prefix: c_int) -> c_int; + fn sd_journal_stream_fd( + identifier: *const c_uchar, + priority: c_int, + level_prefix: c_int, + ) -> c_int; fn sd_notify(unset_environment: c_int, state: *const c_char) -> c_int; fn sd_notify_barrier(unset_environment: c_int, timeout: u64) -> c_int; } @@ -328,7 +337,7 @@ pub fn systemd_notify(state: SystemdNotify) -> Result<(), Error> { SystemdNotify::Ready => { log::info!("service is ready"); CString::new("READY=1") - }, + } SystemdNotify::Reloading => CString::new("RELOADING=1"), SystemdNotify::Stopping => CString::new("STOPPING=1"), SystemdNotify::Status(msg) => CString::new(format!("STATUS={}", msg)), @@ -336,7 +345,10 @@ pub fn systemd_notify(state: SystemdNotify) -> Result<(), Error> { }?; let rc = unsafe { sd_notify(0, message.as_ptr()) }; if rc < 0 { - bail!("systemd_notify failed: {}", std::io::Error::from_raw_os_error(-rc)); + bail!( + "systemd_notify failed: {}", + std::io::Error::from_raw_os_error(-rc) + ); } Ok(()) @@ -346,7 +358,10 @@ pub fn systemd_notify(state: SystemdNotify) -> Result<(), Error> { pub fn systemd_notify_barrier(timeout: u64) -> Result<(), Error> { let rc = unsafe { sd_notify_barrier(0, timeout) }; if rc < 0 { - bail!("systemd_notify_barrier failed: {}", std::io::Error::from_raw_os_error(-rc)); + bail!( + "systemd_notify_barrier failed: {}", + std::io::Error::from_raw_os_error(-rc) + ); } Ok(()) } diff --git a/proxmox-rest-server/src/environment.rs b/proxmox-rest-server/src/environment.rs index b1fa3a33..b4dff76b 100644 --- a/proxmox-rest-server/src/environment.rs +++ b/proxmox-rest-server/src/environment.rs @@ -1,5 +1,5 @@ -use std::sync::Arc; use std::net::SocketAddr; +use std::sync::Arc; use serde_json::{json, Value}; @@ -42,13 +42,19 @@ impl RestEnvironment { pub fn log_failed_auth(&self, failed_auth_id: Option, msg: &str) { let msg = match (self.client_ip, failed_auth_id) { (Some(peer), Some(user)) => { - format!("authentication failure; rhost={} user={} msg={}", peer, user, msg) + format!( + "authentication failure; rhost={} user={} msg={}", + peer, user, msg + ) } (Some(peer), None) => { format!("authentication failure; rhost={} msg={}", peer, msg) } (None, Some(user)) => { - format!("authentication failure; rhost=unknown user={} msg={}", user, msg) + format!( + "authentication failure; rhost=unknown user={} msg={}", + user, msg + ) } (None, None) => { format!("authentication failure; rhost=unknown msg={}", msg) @@ -59,12 +65,10 @@ impl RestEnvironment { auth_logger.lock().unwrap().log(&msg); } } - } impl RpcEnvironment for RestEnvironment { - - fn result_attrib_mut (&mut self) -> &mut Value { + fn result_attrib_mut(&mut self) -> &mut Value { &mut self.result_attributes } diff --git a/proxmox-rest-server/src/file_logger.rs b/proxmox-rest-server/src/file_logger.rs index 7d4d3f86..eeabadab 100644 --- a/proxmox-rest-server/src/file_logger.rs +++ b/proxmox-rest-server/src/file_logger.rs @@ -3,7 +3,7 @@ use std::io::Write; use anyhow::Error; use nix::fcntl::OFlag; -use proxmox_sys::fs::{CreateOptions, atomic_open_or_create_file}; +use proxmox_sys::fs::{atomic_open_or_create_file, CreateOptions}; /// Options to control the behavior of a [FileLogger] instance #[derive(Default)] @@ -23,7 +23,6 @@ pub struct FileLogOptions { pub prefix_time: bool, /// File owner/group and mode pub file_opts: CreateOptions, - } /// Log messages with optional automatically added timestamps into files @@ -66,7 +65,11 @@ impl FileLogger { let file_name: std::path::PathBuf = file_name.as_ref().to_path_buf(); - Ok(Self { file, file_name, options }) + Ok(Self { + file, + file_name, + options, + }) } pub fn reopen(&mut self) -> Result<&Self, Error> { @@ -79,23 +82,23 @@ impl FileLogger { file_name: P, options: &FileLogOptions, ) -> Result { - let mut flags = OFlag::O_CLOEXEC; - if options.read { - flags |= OFlag::O_RDWR; + if options.read { + flags |= OFlag::O_RDWR; } else { - flags |= OFlag::O_WRONLY; + flags |= OFlag::O_WRONLY; } if options.append { - flags |= OFlag::O_APPEND; + flags |= OFlag::O_APPEND; } if options.exclusive { - flags |= OFlag::O_EXCL; + flags |= OFlag::O_EXCL; } - let file = atomic_open_or_create_file(&file_name, flags, &[], options.file_opts.clone(), false)?; + let file = + atomic_open_or_create_file(&file_name, flags, &[], options.file_opts.clone(), false)?; Ok(file) } diff --git a/proxmox-rest-server/src/formatter.rs b/proxmox-rest-server/src/formatter.rs index e3958826..e31df5b8 100644 --- a/proxmox-rest-server/src/formatter.rs +++ b/proxmox-rest-server/src/formatter.rs @@ -1,11 +1,11 @@ //! Helpers to format response data use std::collections::HashMap; -use anyhow::{Error}; +use anyhow::Error; use serde_json::{json, Value}; -use hyper::{Body, Response, StatusCode}; use hyper::header; +use hyper::{Body, Response, StatusCode}; use proxmox_router::{HttpError, RpcEnvironment}; use proxmox_schema::ParameterError; @@ -22,7 +22,11 @@ pub trait OutputFormatter: Send + Sync { fn format_error(&self, err: Error) -> Response; /// Transform a [Result] into a http response - fn format_result(&self, result: Result, rpcenv: &dyn RpcEnvironment) -> Response { + fn format_result( + &self, + result: Result, + rpcenv: &dyn RpcEnvironment, + ) -> Response { match result { Ok(data) => self.format_data(data, rpcenv), Err(err) => self.format_error(err), @@ -33,7 +37,6 @@ pub trait OutputFormatter: Send + Sync { static JSON_CONTENT_TYPE: &str = "application/json;charset=UTF-8"; fn json_data_response(data: Value) -> Response { - let json_str = data.to_string(); let raw = json_str.into_bytes(); @@ -41,13 +44,13 @@ fn json_data_response(data: Value) -> Response { let mut response = Response::new(raw.into()); response.headers_mut().insert( header::CONTENT_TYPE, - header::HeaderValue::from_static(JSON_CONTENT_TYPE)); + header::HeaderValue::from_static(JSON_CONTENT_TYPE), + ); response } -fn add_result_attributes(result: &mut Value, rpcenv: &dyn RpcEnvironment) -{ +fn add_result_attributes(result: &mut Value, rpcenv: &dyn RpcEnvironment) { let attributes = match rpcenv.result_attrib().as_object() { Some(attr) => attr, None => return, @@ -58,7 +61,6 @@ fn add_result_attributes(result: &mut Value, rpcenv: &dyn RpcEnvironment) } } - struct JsonFormatter(); /// Format data as ``application/json`` @@ -73,13 +75,9 @@ struct JsonFormatter(); /// message as string. pub static JSON_FORMATTER: &'static dyn OutputFormatter = &JsonFormatter(); -impl OutputFormatter for JsonFormatter { - +impl OutputFormatter for JsonFormatter { fn format_data(&self, data: Value, rpcenv: &dyn RpcEnvironment) -> Response { - - let mut result = json!({ - "data": data - }); + let mut result = json!({ "data": data }); add_result_attributes(&mut result, rpcenv); @@ -87,7 +85,6 @@ impl OutputFormatter for JsonFormatter { } fn format_error(&self, err: Error) -> Response { - let mut response = if let Some(apierr) = err.downcast_ref::() { let mut resp = Response::new(Body::from(apierr.message.clone())); *resp.status_mut() = apierr.code; @@ -100,9 +97,12 @@ impl OutputFormatter for JsonFormatter { response.headers_mut().insert( header::CONTENT_TYPE, - header::HeaderValue::from_static(JSON_CONTENT_TYPE)); + header::HeaderValue::from_static(JSON_CONTENT_TYPE), + ); - response.extensions_mut().insert(ErrorMessageExtension(err.to_string())); + response + .extensions_mut() + .insert(ErrorMessageExtension(err.to_string())); response } @@ -128,10 +128,8 @@ pub static EXTJS_FORMATTER: &'static dyn OutputFormatter = &ExtJsFormatter(); struct ExtJsFormatter(); -impl OutputFormatter for ExtJsFormatter { - +impl OutputFormatter for ExtJsFormatter { fn format_data(&self, data: Value, rpcenv: &dyn RpcEnvironment) -> Response { - let mut result = json!({ "data": data, "success": true @@ -143,7 +141,6 @@ impl OutputFormatter for ExtJsFormatter { } fn format_error(&self, err: Error) -> Response { - let message: String; let mut errors = HashMap::new(); @@ -165,7 +162,9 @@ impl OutputFormatter for ExtJsFormatter { let mut response = json_data_response(result); - response.extensions_mut().insert(ErrorMessageExtension(message)); + response + .extensions_mut() + .insert(ErrorMessageExtension(message)); response } diff --git a/proxmox-rest-server/src/h2service.rs b/proxmox-rest-server/src/h2service.rs index f5fcdeea..3f90c178 100644 --- a/proxmox-rest-server/src/h2service.rs +++ b/proxmox-rest-server/src/h2service.rs @@ -1,4 +1,4 @@ -use anyhow::{Error}; +use anyhow::Error; use std::collections::HashMap; use std::pin::Pin; @@ -8,11 +8,11 @@ use std::task::{Context, Poll}; use futures::*; use hyper::{Body, Request, Response, StatusCode}; -use proxmox_router::{ApiResponseFuture, HttpError, Router, RpcEnvironment}; use proxmox_router::http_err; +use proxmox_router::{ApiResponseFuture, HttpError, Router, RpcEnvironment}; -use crate::{normalize_uri_path, WorkerTask}; use crate::formatter::*; +use crate::{normalize_uri_path, WorkerTask}; /// Hyper Service implementation to handle stateful H2 connections. /// @@ -26,24 +26,29 @@ pub struct H2Service { debug: bool, } -impl H2Service { - +impl H2Service { pub fn new(rpcenv: E, worker: Arc, router: &'static Router, debug: bool) -> Self { - Self { rpcenv, worker, router, debug } + Self { + rpcenv, + worker, + router, + debug, + } } pub fn debug>(&self, msg: S) { - if self.debug { self.worker.log_message(msg); } + if self.debug { + self.worker.log_message(msg); + } } fn handle_request(&self, req: Request) -> ApiResponseFuture { - let (parts, body) = req.into_parts(); let method = parts.method.clone(); let (path, components) = match normalize_uri_path(parts.uri.path()) { - Ok((p,c)) => (p, c), + Ok((p, c)) => (p, c), Err(err) => return future::err(http_err!(BAD_REQUEST, "{}", err)).boxed(), }; @@ -58,15 +63,24 @@ impl H2Service { let err = http_err!(NOT_FOUND, "Path '{}' not found.", path); future::ok(formatter.format_error(err)).boxed() } - Some(api_method) => { - crate::rest::handle_api_request( - self.rpcenv.clone(), api_method, formatter, parts, body, uri_param).boxed() - } + Some(api_method) => crate::rest::handle_api_request( + self.rpcenv.clone(), + api_method, + formatter, + parts, + body, + uri_param, + ) + .boxed(), } } - fn log_response(worker: Arc, method: hyper::Method, path: &str, resp: &Response) { - + fn log_response( + worker: Arc, + method: hyper::Method, + path: &str, + resp: &Response, + ) { let status = resp.status(); if !status.is_success() { @@ -89,7 +103,7 @@ impl H2Service { } } -impl tower_service::Service> for H2Service { +impl tower_service::Service> for H2Service { type Response = Response; type Error = Error; #[allow(clippy::type_complexity)] @@ -111,15 +125,17 @@ impl tower_service::Service> for H2Ser Ok::<_, Error>(res) } Err(err) => { - if let Some(apierr) = err.downcast_ref::() { + if let Some(apierr) = err.downcast_ref::() { let mut resp = Response::new(Body::from(apierr.message.clone())); - resp.extensions_mut().insert(ErrorMessageExtension(apierr.message.clone())); + resp.extensions_mut() + .insert(ErrorMessageExtension(apierr.message.clone())); *resp.status_mut() = apierr.code; Self::log_response(worker, method, &path, &resp); Ok(resp) } else { let mut resp = Response::new(Body::from(err.to_string())); - resp.extensions_mut().insert(ErrorMessageExtension(err.to_string())); + resp.extensions_mut() + .insert(ErrorMessageExtension(err.to_string())); *resp.status_mut() = StatusCode::BAD_REQUEST; Self::log_response(worker, method, &path, &resp); Ok(resp) diff --git a/proxmox-rest-server/src/lib.rs b/proxmox-rest-server/src/lib.rs index a20d9070..dc538a80 100644 --- a/proxmox-rest-server/src/lib.rs +++ b/proxmox-rest-server/src/lib.rs @@ -15,20 +15,20 @@ //! - worker task management //! * generic interface to authenticate user -use std::sync::atomic::{Ordering, AtomicBool}; use std::future::Future; use std::pin::Pin; +use std::sync::atomic::{AtomicBool, Ordering}; use anyhow::{bail, format_err, Error}; -use nix::unistd::Pid; -use hyper::{Body, Response, Method}; use http::request::Parts; use http::HeaderMap; +use hyper::{Body, Method, Response}; +use nix::unistd::Pid; -use proxmox_sys::fd::Fd; -use proxmox_sys::linux::procfs::PidStat; -use proxmox_sys::fs::CreateOptions; use proxmox_router::UserInformation; +use proxmox_sys::fd::Fd; +use proxmox_sys::fs::CreateOptions; +use proxmox_sys::linux::procfs::PidStat; mod compression; pub use compression::*; @@ -47,7 +47,7 @@ mod command_socket; pub use command_socket::*; mod file_logger; -pub use file_logger::{FileLogger, FileLogOptions}; +pub use file_logger::{FileLogOptions, FileLogger}; mod api_config; pub use api_config::ApiConfig; @@ -75,7 +75,6 @@ impl From for AuthError { /// User Authentication and index/root page generation methods pub trait ServerAdapter: Send + Sync { - /// Returns the index/root page fn get_index( &self, @@ -91,11 +90,16 @@ pub trait ServerAdapter: Send + Sync { &'a self, headers: &'a HeaderMap, method: &'a Method, - ) -> Pin), AuthError>> + Send + 'a>>; - + ) -> Pin< + Box< + dyn Future), AuthError>> + + Send + + 'a, + >, + >; } -lazy_static::lazy_static!{ +lazy_static::lazy_static! { static ref PID: i32 = unsafe { libc::getpid() }; static ref PSTART: u64 = PidStat::read_from_pid(Pid::from_raw(*PID)).unwrap().starttime; } @@ -124,7 +128,8 @@ pub fn write_pid(pid_fn: &str) -> Result<(), Error> { pub fn read_pid(pid_fn: &str) -> Result { let pid = proxmox_sys::fs::file_get_contents(pid_fn)?; let pid = std::str::from_utf8(&pid)?.trim(); - pid.parse().map_err(|err| format_err!("could not parse pid - {}", err)) + pid.parse() + .map_err(|err| format_err!("could not parse pid - {}", err)) } /// Returns the control socket path for a specific process ID. @@ -178,7 +183,6 @@ pub fn socketpair() -> Result<(Fd, Fd), Error> { Ok((Fd(pa), Fd(pb))) } - /// Extract a specific cookie from cookie header. /// We assume cookie_name is already url encoded. pub fn extract_cookie(cookie: &str, cookie_name: &str) -> Option { diff --git a/proxmox-rest-server/src/rest.rs b/proxmox-rest-server/src/rest.rs index 3343d5d6..2aadf1ed 100644 --- a/proxmox-rest-server/src/rest.rs +++ b/proxmox-rest-server/src/rest.rs @@ -18,24 +18,24 @@ use regex::Regex; use serde_json::Value; use tokio::fs::File; use tokio::time::Instant; -use url::form_urlencoded; use tower_service::Service; +use url::form_urlencoded; +use proxmox_router::http_err; use proxmox_router::{ check_api_permission, ApiHandler, ApiMethod, HttpError, Permission, RpcEnvironment, RpcEnvironmentType, UserInformation, }; -use proxmox_router::http_err; use proxmox_schema::{ObjectSchemaType, ParameterSchema}; use proxmox_http::client::RateLimitedStream; -use proxmox_compression::{DeflateEncoder, Level}; use proxmox_async::stream::AsyncReaderStream; +use proxmox_compression::{DeflateEncoder, Level}; use crate::{ - ApiConfig, FileLogger, AuthError, RestEnvironment, CompressionMethod, - normalize_uri_path, formatter::*, + formatter::*, normalize_uri_path, ApiConfig, AuthError, CompressionMethod, FileLogger, + RestEnvironment, }; extern "C" { @@ -47,9 +47,15 @@ struct AuthStringExtension(String); struct EmptyUserInformation {} impl UserInformation for EmptyUserInformation { - fn is_superuser(&self, _userid: &str) -> bool { false } - fn is_group_member(&self, _userid: &str, _group: &str) -> bool { false } - fn lookup_privs(&self, _userid: &str, _path: &[&str]) -> u64 { 0 } + fn is_superuser(&self, _userid: &str) -> bool { + false + } + fn is_group_member(&self, _userid: &str, _group: &str) -> bool { + false + } + fn lookup_privs(&self, _userid: &str, _path: &[&str]) -> u64 { + 0 + } } /// REST server implementation (configured with [ApiConfig]) @@ -98,9 +104,7 @@ impl Service<&Pin>>> - for RestServer -{ +impl Service<&Pin>>> for RestServer { type Response = ApiService; type Error = Error; type Future = Pin> + Send>>; @@ -134,7 +138,7 @@ impl Service<&hyper::server::conn::AddrStream> for RestServer { } fn call(&mut self, ctx: &hyper::server::conn::AddrStream) -> Self::Future { - let peer = ctx.remote_addr(); + let peer = ctx.remote_addr(); future::ok(ApiService { peer, api_config: self.api_config.clone(), @@ -494,7 +498,6 @@ pub(crate) async fn handle_api_request (&'static str, bool) { if let Some(ext) = filename.extension().and_then(|osstr| osstr.to_str()) { return match ext { @@ -671,7 +674,8 @@ async fn handle_request( } } - let mut user_info: Box = Box::new(EmptyUserInformation {}); + let mut user_info: Box = + Box::new(EmptyUserInformation {}); if auth_required { match api.check_auth(&parts.headers, &method).await { @@ -730,7 +734,9 @@ async fn handle_request( }; if let Some(auth_id) = auth_id { - response.extensions_mut().insert(AuthStringExtension(auth_id)); + response + .extensions_mut() + .insert(AuthStringExtension(auth_id)); } return Ok(response); diff --git a/proxmox-rest-server/src/state.rs b/proxmox-rest-server/src/state.rs index e4234c76..c3b81627 100644 --- a/proxmox-rest-server/src/state.rs +++ b/proxmox-rest-server/src/state.rs @@ -1,4 +1,4 @@ -use anyhow::{Error}; +use anyhow::Error; use lazy_static::lazy_static; use std::sync::Mutex; @@ -40,7 +40,6 @@ lazy_static! { /// /// This calls [request_shutdown] when receiving the signal. pub fn catch_shutdown_signal() -> Result<(), Error> { - let mut stream = signal(SignalKind::interrupt())?; let future = async move { @@ -49,7 +48,8 @@ pub fn catch_shutdown_signal() -> Result<(), Error> { SERVER_STATE.lock().unwrap().reload_request = false; request_shutdown(); } - }.boxed(); + } + .boxed(); let abort_future = last_worker_future().map_err(|_| {}); let task = futures::future::select(future, abort_future); @@ -64,7 +64,6 @@ pub fn catch_shutdown_signal() -> Result<(), Error> { /// This calls [request_shutdown] when receiving the signal, and tries /// to restart the server. pub fn catch_reload_signal() -> Result<(), Error> { - let mut stream = signal(SignalKind::hangup())?; let future = async move { @@ -73,7 +72,8 @@ pub fn catch_reload_signal() -> Result<(), Error> { SERVER_STATE.lock().unwrap().reload_request = true; crate::request_shutdown(); } - }.boxed(); + } + .boxed(); let abort_future = last_worker_future().map_err(|_| {}); let task = futures::future::select(future, abort_future); @@ -89,7 +89,6 @@ pub(crate) fn is_reload_request() -> bool { data.mode == ServerMode::Shutdown && data.reload_request } - pub(crate) fn server_shutdown() { let mut data = SERVER_STATE.lock().unwrap(); @@ -107,14 +106,11 @@ pub(crate) fn server_shutdown() { /// Future to signal server shutdown pub fn shutdown_future() -> impl Future { let mut data = SERVER_STATE.lock().unwrap(); - data - .shutdown_listeners - .listen() - .map(|_| ()) + data.shutdown_listeners.listen().map(|_| ()) } /// Future to signal when last worker task finished -pub fn last_worker_future() -> impl Future> { +pub fn last_worker_future() -> impl Future> { let mut data = SERVER_STATE.lock().unwrap(); data.last_worker_listeners.listen() } @@ -128,7 +124,12 @@ pub(crate) fn set_worker_count(count: usize) { pub(crate) fn check_last_worker() { let mut data = SERVER_STATE.lock().unwrap(); - if !(data.mode == ServerMode::Shutdown && data.worker_count == 0 && data.internal_task_count == 0) { return; } + if !(data.mode == ServerMode::Shutdown + && data.worker_count == 0 + && data.internal_task_count == 0) + { + return; + } data.last_worker_listeners.notify_listeners(Ok(())); } @@ -147,7 +148,8 @@ where tokio::spawn(async move { let _ = tokio::spawn(task).await; // ignore errors - { // drop mutex + { + // drop mutex let mut data = SERVER_STATE.lock().unwrap(); if data.internal_task_count > 0 { data.internal_task_count -= 1; diff --git a/proxmox-rest-server/src/worker_task.rs b/proxmox-rest-server/src/worker_task.rs index 643e1872..f34cd1fc 100644 --- a/proxmox-rest-server/src/worker_task.rs +++ b/proxmox-rest-server/src/worker_task.rs @@ -1,30 +1,30 @@ use std::collections::{HashMap, VecDeque}; use std::fs::File; -use std::path::PathBuf; -use std::io::{Read, Write, BufRead, BufReader}; +use std::io::{BufRead, BufReader, Read, Write}; use std::panic::UnwindSafe; +use std::path::PathBuf; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Mutex}; -use std::time::{SystemTime, Duration}; +use std::time::{Duration, SystemTime}; use anyhow::{bail, format_err, Error}; use futures::*; use lazy_static::lazy_static; -use serde_json::{json, Value}; -use serde::{Serialize, Deserialize}; -use tokio::sync::oneshot; use nix::fcntl::OFlag; use once_cell::sync::OnceCell; +use serde::{Deserialize, Serialize}; +use serde_json::{json, Value}; +use tokio::sync::oneshot; -use proxmox_sys::linux::procfs; -use proxmox_sys::fs::{create_path, replace_file, atomic_open_or_create_file, CreateOptions}; use proxmox_lang::try_block; use proxmox_schema::upid::UPID; +use proxmox_sys::fs::{atomic_open_or_create_file, create_path, replace_file, CreateOptions}; +use proxmox_sys::linux::procfs; -use proxmox_sys::WorkerTaskContext; use proxmox_sys::logrotate::{LogRotate, LogRotateFiles}; +use proxmox_sys::WorkerTaskContext; -use crate::{CommandSocket, FileLogger, FileLogOptions}; +use crate::{CommandSocket, FileLogOptions, FileLogger}; struct TaskListLockGuard(File); @@ -40,14 +40,13 @@ struct WorkerTaskSetup { static WORKER_TASK_SETUP: OnceCell = OnceCell::new(); fn worker_task_setup() -> Result<&'static WorkerTaskSetup, Error> { - WORKER_TASK_SETUP.get() + WORKER_TASK_SETUP + .get() .ok_or_else(|| format_err!("WorkerTask library is not initialized")) } impl WorkerTaskSetup { - fn new(basedir: PathBuf, file_opts: CreateOptions) -> Self { - let mut taskdir = basedir; taskdir.push("tasks"); @@ -74,17 +73,15 @@ impl WorkerTaskSetup { } fn lock_task_list_files(&self, exclusive: bool) -> Result { - let options = self.file_opts.clone() + let options = self + .file_opts + .clone() .perm(nix::sys::stat::Mode::from_bits_truncate(0o660)); let timeout = std::time::Duration::new(10, 0); - let file = proxmox_sys::fs::open_file_locked( - &self.task_lock_fn, - timeout, - exclusive, - options, - )?; + let file = + proxmox_sys::fs::open_file_locked(&self.task_lock_fn, timeout, exclusive, options)?; Ok(TaskListLockGuard(file)) } @@ -99,7 +96,6 @@ impl WorkerTaskSetup { // atomically read/update the task list, update status of finished tasks // new_upid is added to the list when specified. fn update_active_workers(&self, new_upid: Option<&UPID>) -> Result<(), Error> { - let lock = self.lock_task_list_files(true)?; // TODO remove with 1.x @@ -121,45 +117,48 @@ impl WorkerTaskSetup { if !worker_is_active_local(&info.upid) { // println!("Detected stopped task '{}'", &info.upid_str); let now = proxmox_time::epoch_i64(); - let status = upid_read_status(&info.upid).unwrap_or(TaskState::Unknown { endtime: now }); + let status = + upid_read_status(&info.upid).unwrap_or(TaskState::Unknown { endtime: now }); finish_list.push(TaskListInfo { upid: info.upid, upid_str: info.upid_str, - state: Some(status) + state: Some(status), }); return None; } Some(info) - }).collect(); + }) + .collect(); if let Some(upid) = new_upid { - active_list.push(TaskListInfo { upid: upid.clone(), upid_str: upid.to_string(), state: None }); + active_list.push(TaskListInfo { + upid: upid.clone(), + upid_str: upid.to_string(), + state: None, + }); } let active_raw = render_task_list(&active_list); - let options = self.file_opts.clone() + let options = self + .file_opts + .clone() .perm(nix::sys::stat::Mode::from_bits_truncate(0o660)); - replace_file( - &self.active_tasks_fn, - active_raw.as_bytes(), - options, - false, - )?; + replace_file(&self.active_tasks_fn, active_raw.as_bytes(), options, false)?; - finish_list.sort_unstable_by(|a, b| { - match (&a.state, &b.state) { - (Some(s1), Some(s2)) => s1.cmp(s2), - (Some(_), None) => std::cmp::Ordering::Less, - (None, Some(_)) => std::cmp::Ordering::Greater, - _ => a.upid.starttime.cmp(&b.upid.starttime), - } + finish_list.sort_unstable_by(|a, b| match (&a.state, &b.state) { + (Some(s1), Some(s2)) => s1.cmp(s2), + (Some(_), None) => std::cmp::Ordering::Less, + (None, Some(_)) => std::cmp::Ordering::Greater, + _ => a.upid.starttime.cmp(&b.upid.starttime), }); if !finish_list.is_empty() { - let options = self.file_opts.clone() + let options = self + .file_opts + .clone() .perm(nix::sys::stat::Mode::from_bits_truncate(0o660)); let mut writer = atomic_open_or_create_file( @@ -187,15 +186,17 @@ impl WorkerTaskSetup { // Create task log directory with correct permissions fn create_task_log_dirs(&self) -> Result<(), Error> { - try_block!({ - let dir_opts = self.file_opts.clone() + let dir_opts = self + .file_opts + .clone() .perm(nix::sys::stat::Mode::from_bits_truncate(0o755)); create_path(&self.taskdir, Some(dir_opts.clone()), Some(dir_opts))?; // fixme:??? create_path(pbs_buildcfg::PROXMOX_BACKUP_RUN_DIR, None, Some(opts))?; Ok(()) - }).map_err(|err: Error| format_err!("unable to create task log dir - {}", err)) + }) + .map_err(|err: Error| format_err!("unable to create task log dir - {}", err)) } } @@ -203,7 +204,8 @@ impl WorkerTaskSetup { pub fn init_worker_tasks(basedir: PathBuf, file_opts: CreateOptions) -> Result<(), Error> { let setup = WorkerTaskSetup::new(basedir, file_opts); setup.create_task_log_dirs()?; - WORKER_TASK_SETUP.set(setup) + WORKER_TASK_SETUP + .set(setup) .map_err(|_| format_err!("init_worker_tasks failed - already initialized")) } @@ -215,17 +217,11 @@ pub fn rotate_task_log_archive( max_files: Option, options: Option, ) -> Result { - let setup = worker_task_setup()?; let _lock = setup.lock_task_list_files(true)?; - let mut logrotate = LogRotate::new( - &setup.task_archive_fn, - compress, - max_files, - options, - )?; + let mut logrotate = LogRotate::new(&setup.task_archive_fn, compress, max_files, options)?; logrotate.rotate(size_threshold) } @@ -237,12 +233,7 @@ pub fn cleanup_old_tasks(compressed: bool) -> Result<(), Error> { let _lock = setup.lock_task_list_files(true)?; - let logrotate = LogRotate::new( - &setup.task_archive_fn, - compressed, - None, - None, - )?; + let logrotate = LogRotate::new(&setup.task_archive_fn, compressed, None, None)?; let mut timestamp = None; if let Some(last_file) = logrotate.files().last() { @@ -265,7 +256,8 @@ pub fn cleanup_old_tasks(compressed: bool) -> Result<(), Error> { SystemTime::UNIX_EPOCH.checked_add(Duration::from_secs(timestamp as u64)) } else { SystemTime::UNIX_EPOCH.checked_sub(Duration::from_secs(-timestamp as u64)) - }.ok_or_else(|| format_err!("could not calculate cutoff time"))?; + } + .ok_or_else(|| format_err!("could not calculate cutoff time"))?; for i in 0..256 { let mut path = setup.taskdir.clone(); @@ -279,8 +271,8 @@ pub fn cleanup_old_tasks(compressed: bool) -> Result<(), Error> { if modified < cutoff_time { match std::fs::remove_file(path) { - Ok(()) => {}, - Err(err) if err.kind() == std::io::ErrorKind::NotFound => {}, + Ok(()) => {} + Err(err) if err.kind() == std::io::ErrorKind::NotFound => {} Err(err) => bail!("could not remove file: {}", err), } } @@ -291,7 +283,6 @@ pub fn cleanup_old_tasks(compressed: bool) -> Result<(), Error> { Ok(()) } - /// Path to the worker log file pub fn upid_log_path(upid: &UPID) -> Result { let setup = worker_task_setup()?; @@ -302,10 +293,11 @@ pub fn upid_log_path(upid: &UPID) -> Result { /// If there is not a single line with at valid datetime, we assume the /// starttime to be the endtime pub fn upid_read_status(upid: &UPID) -> Result { - let setup = worker_task_setup()?; - let mut status = TaskState::Unknown { endtime: upid.starttime }; + let mut status = TaskState::Unknown { + endtime: upid.starttime, + }; let path = setup.log_path(upid); @@ -325,7 +317,7 @@ pub fn upid_read_status(upid: &UPID) -> Result { } let last_line = match data.iter().rposition(|c| *c == b'\n') { - Some(start) if data.len() > (start+1) => &data[start+1..], + Some(start) if data.len() > (start + 1) => &data[start + 1..], Some(_) => &data, // should not happen, since we removed all trailing newlines None => &data, }; @@ -350,7 +342,8 @@ pub fn upid_read_status(upid: &UPID) -> Result { } lazy_static! { - static ref WORKER_TASK_LIST: Mutex>> = Mutex::new(HashMap::new()); + static ref WORKER_TASK_LIST: Mutex>> = + Mutex::new(HashMap::new()); } /// checks if the task UPID refers to a worker from this process @@ -405,11 +398,13 @@ pub fn worker_is_active_local(upid: &UPID) -> bool { /// /// * ``worker-task-status ``: return true of false, depending on /// whether the worker is running or stopped. -pub fn register_task_control_commands( - commando_sock: &mut CommandSocket, -) -> Result<(), Error> { +pub fn register_task_control_commands(commando_sock: &mut CommandSocket) -> Result<(), Error> { fn get_upid(args: Option<&Value>) -> Result { - let args = if let Some(args) = args { args } else { bail!("missing args") }; + let args = if let Some(args) = args { + args + } else { + bail!("missing args") + }; let upid = match args.get("upid") { Some(Value::String(upid)) => upid.parse::()?, None => bail!("no upid in args"), @@ -454,7 +449,6 @@ pub fn abort_worker_nowait(upid: UPID) { /// /// By sending ``worker-task-abort`` to the control socket. pub async fn abort_worker(upid: UPID) -> Result<(), Error> { - let sock = crate::ctrl_sock_from_pid(upid.pid); let cmd = json!({ "command": "worker-task-abort", @@ -466,7 +460,6 @@ pub async fn abort_worker(upid: UPID) -> Result<(), Error> { } fn parse_worker_status_line(line: &str) -> Result<(String, UPID, Option), Error> { - let data = line.splitn(3, ' ').collect::>(); let len = data.len(); @@ -519,10 +512,15 @@ impl TaskState { Ok(TaskState::OK { endtime }) } else if let Some(warnings) = s.strip_prefix("WARNINGS: ") { let count: u64 = warnings.parse()?; - Ok(TaskState::Warning{ count, endtime }) + Ok(TaskState::Warning { count, endtime }) } else if !s.is_empty() { - let message = if let Some(err) = s.strip_prefix("ERROR: ") { err } else { s }.to_string(); - Ok(TaskState::Error{ message, endtime }) + let message = if let Some(err) = s.strip_prefix("ERROR: ") { + err + } else { + s + } + .to_string(); + Ok(TaskState::Error { message, endtime }) } else { bail!("unable to parse Task Status '{}'", s); } @@ -545,7 +543,7 @@ impl std::fmt::Display for TaskState { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { TaskState::Unknown { .. } => write!(f, "unknown"), - TaskState::OK { .. }=> write!(f, "OK"), + TaskState::OK { .. } => write!(f, "OK"), TaskState::Warning { count, .. } => write!(f, "WARNINGS: {}", count), TaskState::Error { message, .. } => write!(f, "{}", message), } @@ -568,7 +566,12 @@ pub struct TaskListInfo { fn render_task_line(info: &TaskListInfo) -> String { let mut raw = String::new(); if let Some(status) = &info.state { - raw.push_str(&format!("{} {:08X} {}\n", info.upid_str, status.endtime(), status)); + raw.push_str(&format!( + "{} {:08X} {}\n", + info.upid_str, + status.endtime(), + status + )); } else { raw.push_str(&info.upid_str); raw.push('\n'); @@ -587,8 +590,7 @@ fn render_task_list(list: &[TaskListInfo]) -> String { // note this is not locked, caller has to make sure it is // this will skip (and log) lines that are not valid status lines -fn read_task_file(reader: R) -> Result, Error> -{ +fn read_task_file(reader: R) -> Result, Error> { let reader = BufReader::new(reader); let mut list = Vec::new(); for line in reader.lines() { @@ -597,7 +599,7 @@ fn read_task_file(reader: R) -> Result, Error> Ok((upid_str, upid, state)) => list.push(TaskListInfo { upid_str, upid, - state + state, }), Err(err) => { log::warn!("unable to parse worker status '{}' - {}", line, err); @@ -634,7 +636,6 @@ pub struct TaskListInfoIterator { impl TaskListInfoIterator { /// Creates a new iterator instance. pub fn new(active_only: bool) -> Result { - let setup = worker_task_setup()?; let (read_lock, active_list) = { @@ -685,7 +686,7 @@ impl Iterator for TaskListInfoIterator { if let Some(element) = self.list.pop_back() { return Some(Ok(element)); } else if self.end { - return None; + return None; } else { if let Some(mut archive) = self.archive.take() { if let Some(file) = archive.next() { @@ -720,7 +721,6 @@ pub struct WorkerTask { } impl std::fmt::Display for WorkerTask { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { self.upid.fmt(f) } @@ -734,14 +734,12 @@ struct WorkerTaskData { } impl WorkerTask { - pub fn new( worker_type: &str, worker_id: Option, auth_id: String, to_stdout: bool, ) -> Result, Error> { - let setup = worker_task_setup()?; let upid = UPID::new(worker_type, worker_id, auth_id)?; @@ -751,7 +749,9 @@ impl WorkerTask { path.push(format!("{:02X}", upid.pstart & 255)); - let dir_opts = setup.file_opts.clone() + let dir_opts = setup + .file_opts + .clone() .perm(nix::sys::stat::Mode::from_bits_truncate(0o755)); create_path(&path, None, Some(dir_opts))?; @@ -800,8 +800,9 @@ impl WorkerTask { to_stdout: bool, f: F, ) -> Result - where F: Send + 'static + FnOnce(Arc) -> T, - T: Send + 'static + Future>, + where + F: Send + 'static + FnOnce(Arc) -> T, + T: Send + 'static + Future>, { let worker = WorkerTask::new(worker_type, worker_id, auth_id, to_stdout)?; let upid_str = worker.upid.to_string(); @@ -822,29 +823,26 @@ impl WorkerTask { to_stdout: bool, f: F, ) -> Result - where F: Send + UnwindSafe + 'static + FnOnce(Arc) -> Result<(), Error> + where + F: Send + UnwindSafe + 'static + FnOnce(Arc) -> Result<(), Error>, { let worker = WorkerTask::new(worker_type, worker_id, auth_id, to_stdout)?; let upid_str = worker.upid.to_string(); - let _child = std::thread::Builder::new().name(upid_str.clone()).spawn(move || { - let worker1 = worker.clone(); - let result = match std::panic::catch_unwind(move || f(worker1)) { - Ok(r) => r, - Err(panic) => { - match panic.downcast::<&str>() { - Ok(panic_msg) => { - Err(format_err!("worker panicked: {}", panic_msg)) - } - Err(_) => { - Err(format_err!("worker panicked: unknown type.")) - } - } - } - }; + let _child = std::thread::Builder::new() + .name(upid_str.clone()) + .spawn(move || { + let worker1 = worker.clone(); + let result = match std::panic::catch_unwind(move || f(worker1)) { + Ok(r) => r, + Err(panic) => match panic.downcast::<&str>() { + Ok(panic_msg) => Err(format_err!("worker panicked: {}", panic_msg)), + Err(_) => Err(format_err!("worker panicked: unknown type.")), + }, + }; - worker.log_result(&result); - }); + worker.log_result(&result); + }); Ok(upid_str) } @@ -856,9 +854,15 @@ impl WorkerTask { let endtime = proxmox_time::epoch_i64(); if let Err(err) = result { - TaskState::Error { message: err.to_string(), endtime } + TaskState::Error { + message: err.to_string(), + endtime, + } } else if warn_count > 0 { - TaskState::Warning { count: warn_count, endtime } + TaskState::Warning { + count: warn_count, + endtime, + } } else { TaskState::OK { endtime } } @@ -893,30 +897,33 @@ impl WorkerTask { let mut data = self.data.lock().unwrap(); data.progress = progress; } else { - // fixme: log!("task '{}': ignoring strange value for progress '{}'", self.upid, progress); + // fixme: log!("task '{}': ignoring strange value for progress '{}'", self.upid, progress); } } /// Request abort pub fn request_abort(&self) { let prev_abort = self.abort_requested.swap(true, Ordering::SeqCst); - if !prev_abort { // log abort one time + if !prev_abort { + // log abort one time self.log_message("received abort request ...".to_string()); } // noitify listeners let mut data = self.data.lock().unwrap(); loop { match data.abort_listeners.pop() { - None => { break; }, + None => { + break; + } Some(ch) => { let _ = ch.send(()); // ignore errors here - }, + } } } } /// Get a future which resolves on task abort - pub fn abort_future(&self) -> oneshot::Receiver<()> { + pub fn abort_future(&self) -> oneshot::Receiver<()> { let (tx, rx) = oneshot::channel::<()>(); let mut data = self.data.lock().unwrap(); @@ -934,7 +941,6 @@ impl WorkerTask { } impl WorkerTaskContext for WorkerTask { - fn abort_requested(&self) -> bool { self.abort_requested.load(Ordering::SeqCst) } @@ -963,7 +969,6 @@ impl WorkerTaskContext for WorkerTask { /// Note: local workers should print logs to stdout, so there is no /// need to fetch/display logs. We just wait for the worker to finish. pub async fn wait_for_local_worker(upid_str: &str) -> Result<(), Error> { - let upid: UPID = upid_str.parse()?; let sleep_duration = core::time::Duration::new(0, 100_000_000);