diff --git a/proxmox-rest-server/Cargo.toml b/proxmox-rest-server/Cargo.toml
new file mode 100644
index 00000000..cfd1af9b
--- /dev/null
+++ b/proxmox-rest-server/Cargo.toml
@@ -0,0 +1,43 @@
+[package]
+name = "proxmox-rest-server"
+version = "0.1.0"
+authors = ["Proxmox Support Team <support@proxmox.com>"]
+edition = "2018"
+description = "REST server implementation"
+
+# for example
+[dev-dependencies]
+proxmox-schema = { version = "1.3.1", features = [ "api-macro" ] }
+tokio = { version = "1.6", features = [ "rt-multi-thread", "signal", "process" ] }
+
+[dependencies]
+anyhow = "1.0"
+futures = "0.3"
+handlebars = "3.0"
+http = "0.2"
+hyper = { version = "0.14.5", features = [ "full" ] }
+lazy_static = "1.4"
+libc = "0.2"
+log = "0.4.17"
+nix = "0.24"
+once_cell = "1.3.1"
+percent-encoding = "2.1"
+regex = "1.5"
+serde = { version = "1.0", features = [ "derive" ] }
+serde_json = "1.0"
+tokio = { version = "1.6", features = ["signal", "process"] }
+tokio-openssl = "0.6.1"
+tokio-stream = "0.1.0"
+tower-service = "0.3.0"
+url = "2.1"
+
+#proxmox = "0.15.3"
+proxmox-async = "0.4"
+proxmox-compression = "0.1.1"
+proxmox-io = "1"
+proxmox-lang = "1.1"
+proxmox-http = { version = "0.7", features = [ "client" ] }
+proxmox-router = "1.3.0"
+proxmox-schema = { version = "1.3.1", features = [ "api-macro", "upid-api-impl" ] }
+proxmox-time = "1"
+proxmox-sys = { version = "0.4", features = [ "logrotate", "timer" ] }
diff --git a/proxmox-rest-server/examples/minimal-rest-server.rs b/proxmox-rest-server/examples/minimal-rest-server.rs
new file mode 100644
index 00000000..91cb9738
--- /dev/null
+++ b/proxmox-rest-server/examples/minimal-rest-server.rs
@@ -0,0 +1,232 @@
+use std::collections::HashMap;
+use std::future::Future;
+use std::pin::Pin;
+use std::sync::Mutex;
+
+use anyhow::{bail, format_err, Error};
+use http::request::Parts;
+use http::HeaderMap;
+use hyper::{Body, Method, Response};
+use lazy_static::lazy_static;
+
+use proxmox_router::{
+    list_subdirs_api_method, Router, RpcEnvironmentType, SubdirMap, UserInformation,
+};
+use proxmox_schema::api;
+
+use proxmox_rest_server::{ApiConfig, AuthError, RestEnvironment, RestServer, ServerAdapter};
+
+// Create a Dummy User information system
+struct DummyUserInfo;
+
+impl UserInformation for DummyUserInfo {
+    fn is_superuser(&self, _userid: &str) -> bool {
+        // Always return true here, so we have access to everything
+        true
+    }
+    fn is_group_member(&self, _userid: &str, group: &str) -> bool {
+        group == "Group"
+    }
+    fn lookup_privs(&self, _userid: &str, _path: &[&str]) -> u64 {
+        u64::MAX
+    }
+}
+
+struct MinimalServer;
+
+// implement the server adapter
+impl ServerAdapter for MinimalServer {
+    // normally this would check and authenticate the user
+    fn check_auth(
+        &self,
+        _headers: &HeaderMap,
+        _method: &Method,
+    ) -> Pin<
+        Box<
+            dyn Future<Output = Result<(String, Box<dyn UserInformation + Sync + Send>), AuthError>>
+                + Send,
+        >,
+    > {
+        Box::pin(async move {
+            // get some global/cached userinfo
+            let userinfo: Box<dyn UserInformation + Sync + Send> = Box::new(DummyUserInfo);
+            // Do some user checks, e.g. cookie/csrf
+            Ok(("User".to_string(), userinfo))
+        })
+    }
+
+    // this should return the index page of the webserver, i.e., what the user browses to
+    fn get_index(
+        &self,
+        _env: RestEnvironment,
+        _parts: Parts,
+    ) -> Pin<Box<dyn Future<Output = Response<Body>> + Send>> {
+        Box::pin(async move {
+            // build an index page
+            http::Response::builder()
+                .body("hello world".into())
+                .unwrap()
+        })
+    }
+}
+
+// a few examples on how to do api calls with the Router
+
+#[api]
+/// A simple ping method. Returns "pong".
+fn ping() -> Result<String, Error> {
+    Ok("pong".to_string())
+}
+
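+// Note (illustration, not part of the original patch): the #[api] macro also
+// generates an ApiMethod constant for each function (API_METHOD_PING,
+// API_METHOD_LIST_ITEMS, ...), which is what the Router setup below references.
+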
+lazy_static! {
+    static ref ITEM_MAP: Mutex<HashMap<String, String>> = Mutex::new(HashMap::new());
+}
+
+#[api]
+/// Lists all current items
+fn list_items() -> Result<Vec<String>, Error> {
+    Ok(ITEM_MAP.lock().unwrap().keys().cloned().collect())
+}
+
+#[api(
+    input: {
+        properties: {
+            name: {
+                type: String,
+                description: "The name",
+            },
+            value: {
+                type: String,
+                description: "The value",
+            },
+        },
+    },
+)]
+/// creates a new item
+fn create_item(name: String, value: String) -> Result<(), Error> {
+    let mut map = ITEM_MAP.lock().unwrap();
+    if map.contains_key(&name) {
+        bail!("{} already exists", name);
+    }
+
+    map.insert(name, value);
+
+    Ok(())
+}
+
+#[api(
+    input: {
+        properties: {
+            name: {
+                type: String,
+                description: "The name",
+            },
+        },
+    },
+)]
+/// returns the value of an item
+fn get_item(name: String) -> Result<String, Error> {
+    ITEM_MAP
+        .lock()
+        .unwrap()
+        .get(&name)
+        .map(|s| s.to_string())
+        .ok_or_else(|| format_err!("no such item '{}'", name))
+}
+
+#[api(
+    input: {
+        properties: {
+            name: {
+                type: String,
+                description: "The name",
+            },
+            value: {
+                type: String,
+                description: "The value",
+            },
+        },
+    },
+)]
+/// updates an item
+fn update_item(name: String, value: String) -> Result<(), Error> {
+    if let Some(val) = ITEM_MAP.lock().unwrap().get_mut(&name) {
+        *val = value;
+    } else {
+        bail!("no such item '{}'", name);
+    }
+    Ok(())
+}
+
+#[api(
+    input: {
+        properties: {
+            name: {
+                type: String,
+                description: "The name",
+            },
+        },
+    },
+)]
+/// deletes an item
+fn delete_item(name: String) -> Result<(), Error> {
+    if ITEM_MAP.lock().unwrap().remove(&name).is_none() {
+        bail!("no such item '{}'", name);
+    }
+    Ok(())
+}
+
+const ITEM_ROUTER: Router = Router::new()
+    .get(&API_METHOD_GET_ITEM)
+    .put(&API_METHOD_UPDATE_ITEM)
+    .delete(&API_METHOD_DELETE_ITEM);
+
+const SUBDIRS: SubdirMap = &[
+    (
+        "items",
+        &Router::new()
+            .get(&API_METHOD_LIST_ITEMS)
+            .post(&API_METHOD_CREATE_ITEM)
+            .match_all("name", &ITEM_ROUTER),
+    ),
+    ("ping", &Router::new().get(&API_METHOD_PING)),
+];
+
+const ROUTER: Router = Router::new()
+    .get(&list_subdirs_api_method!(SUBDIRS))
+    .subdirs(SUBDIRS);
+
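+// The router tree above maps URLs to methods as follows (illustration, not
+// part of the original patch):
+//
+//   GET    /             -> list of subdirectories ("items", "ping")
+//   GET    /ping         -> ping
+//   GET    /items        -> list_items
+//   POST   /items        -> create_item
+//   GET    /items/{name} -> get_item
+//   PUT    /items/{name} -> update_item
+//   DELETE /items/{name} -> delete_item
+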
+async fn run() -> Result<(), Error> {
+    // we first have to configure the api environment (basedir etc.)
+
+    let config = ApiConfig::new(
+        "/var/tmp/",
+        &ROUTER,
+        RpcEnvironmentType::PUBLIC,
+        MinimalServer,
+    )?;
+    let rest_server = RestServer::new(config);
+
+    // then we have to create a daemon that listens, accepts and serves the api to clients
+    proxmox_rest_server::daemon::create_daemon(
+        ([127, 0, 0, 1], 65000).into(),
+        move |listener| {
+            let incoming = hyper::server::conn::AddrIncoming::from_listener(listener)?;
+
+            Ok(async move {
+                hyper::Server::builder(incoming).serve(rest_server).await?;
+
+                Ok(())
+            })
+        },
+        None,
+    )
+    .await?;
+
+    Ok(())
+}
+
+fn main() -> Result<(), Error> {
+    let rt = tokio::runtime::Runtime::new()?;
+    rt.block_on(async { run().await })
+}
diff --git a/proxmox-rest-server/src/api_config.rs b/proxmox-rest-server/src/api_config.rs
new file mode 100644
index 00000000..b05e06d0
--- /dev/null
+++ b/proxmox-rest-server/src/api_config.rs
@@ -0,0 +1,287 @@
+use std::collections::HashMap;
+use std::fs::metadata;
+use std::path::PathBuf;
+use std::pin::Pin;
+use std::sync::{Arc, Mutex, RwLock};
+use std::time::SystemTime;
+
+use anyhow::{bail, format_err, Error};
+use hyper::http::request::Parts;
+use hyper::{Body, Method, Response};
+
+use handlebars::Handlebars;
+use serde::Serialize;
+
+use proxmox_router::{ApiMethod, Router, RpcEnvironmentType, UserInformation};
+use proxmox_sys::fs::{create_path, CreateOptions};
+
+use crate::{AuthError, CommandSocket, FileLogOptions, FileLogger, RestEnvironment, ServerAdapter};
+
+/// REST server configuration
+pub struct ApiConfig {
+    basedir: PathBuf,
+    router: &'static Router,
+    aliases: HashMap<String, PathBuf>,
+    env_type: RpcEnvironmentType,
+    templates: RwLock<Handlebars<'static>>,
+    template_files: RwLock<HashMap<String, (SystemTime, PathBuf)>>,
+    request_log: Option<Arc<Mutex<FileLogger>>>,
+    auth_log: Option<Arc<Mutex<FileLogger>>>,
+    adapter: Pin<Box<dyn ServerAdapter + Send + Sync>>,
+}
+
+impl ApiConfig {
+    /// Creates a new instance
+    ///
+    /// `basedir` - File lookups are relative to this directory.
+    ///
+    /// `router` - The REST API definition.
+    ///
+    /// `env_type` - The environment type.
+    ///
+    /// `adapter` - Handles user authentication and the generation of the
+    /// root page (index). Please note that its methods get a reference to
+    /// the [ApiConfig], so they can use [Handlebars] templates
+    /// ([render_template](Self::render_template)) to generate pages.
+    pub fn new<B: Into<PathBuf>>(
+        basedir: B,
+        router: &'static Router,
+        env_type: RpcEnvironmentType,
+        adapter: impl ServerAdapter + 'static,
+    ) -> Result<Self, Error> {
+        Ok(Self {
+            basedir: basedir.into(),
+            router,
+            aliases: HashMap::new(),
+            env_type,
+            templates: RwLock::new(Handlebars::new()),
+            template_files: RwLock::new(HashMap::new()),
+            request_log: None,
+            auth_log: None,
+            adapter: Box::pin(adapter),
+        })
+    }
+
+    pub(crate) async fn get_index(
+        &self,
+        rest_env: RestEnvironment,
+        parts: Parts,
+    ) -> Response<Body> {
+        self.adapter.get_index(rest_env, parts).await
+    }
+
+    pub(crate) async fn check_auth(
+        &self,
+        headers: &http::HeaderMap,
+        method: &hyper::Method,
+    ) -> Result<(String, Box<dyn UserInformation + Sync + Send>), AuthError> {
+        self.adapter.check_auth(headers, method).await
+    }
+
+    pub(crate) fn find_method(
+        &self,
+        components: &[&str],
+        method: Method,
+        uri_param: &mut HashMap<String, String>,
+    ) -> Option<&'static ApiMethod> {
+        self.router.find_method(components, method, uri_param)
+    }
+
+    pub(crate) fn find_alias(&self, components: &[&str]) -> PathBuf {
+        let mut prefix = String::new();
+        let mut filename = self.basedir.clone();
+        let comp_len = components.len();
+        if comp_len >= 1 {
+            prefix.push_str(components[0]);
+            if let Some(subdir) = self.aliases.get(&prefix) {
+                filename.push(subdir);
+                components
+                    .iter()
+                    .skip(1)
+                    .for_each(|comp| filename.push(comp));
+            } else {
+                components.iter().for_each(|comp| filename.push(comp));
+            }
+        }
+        filename
+    }
+
+    /// Register a path alias
+    ///
+    /// This can be used to redirect file lookups to a specific
+    /// directory, e.g.:
+    ///
+    /// ```
+    /// use proxmox_rest_server::ApiConfig;
+    /// // let mut config = ApiConfig::new(...);
+    /// # fn fake(config: &mut ApiConfig) {
+    /// config.add_alias("extjs", "/usr/share/javascript/extjs");
+    /// # }
+    /// ```
+    pub fn add_alias<S, P>(&mut self, alias: S, path: P)
+    where
+        S: Into<String>,
+        P: Into<PathBuf>,
+    {
+        self.aliases.insert(alias.into(), path.into());
+    }
+
+    pub(crate) fn env_type(&self) -> RpcEnvironmentType {
+        self.env_type
+    }
+
+    /// Register a [Handlebars] template file
+    ///
+    /// Those templates can be used with [render_template](Self::render_template) to generate pages.
+    pub fn register_template<P>(&self, name: &str, path: P) -> Result<(), Error>
+    where
+        P: Into<PathBuf>,
+    {
+        if self.template_files.read().unwrap().contains_key(name) {
+            bail!("template already registered");
+        }
+
+        let path: PathBuf = path.into();
+        let metadata = metadata(&path)?;
+        let mtime = metadata.modified()?;
+
+        self.templates
+            .write()
+            .unwrap()
+            .register_template_file(name, &path)?;
+        self.template_files
+            .write()
+            .unwrap()
+            .insert(name.to_string(), (mtime, path));
+
+        Ok(())
+    }
+
+    /// Checks if the template was modified since the last rendering;
+    /// if yes, it loads the new version of the template
+    pub fn render_template<T>(&self, name: &str, data: &T) -> Result<String, Error>
+    where
+        T: Serialize,
+    {
+        let path;
+        let mtime;
+        {
+            let template_files = self.template_files.read().unwrap();
+            let (old_mtime, old_path) = template_files
+                .get(name)
+                .ok_or_else(|| format_err!("template not found"))?;
+
+            mtime = metadata(old_path)?.modified()?;
+            if mtime <= *old_mtime {
+                return self
+                    .templates
+                    .read()
+                    .unwrap()
+                    .render(name, data)
+                    .map_err(|err| format_err!("{}", err));
+            }
+            path = old_path.to_path_buf();
+        }
+
+        {
+            let mut template_files = self.template_files.write().unwrap();
+            let mut templates = self.templates.write().unwrap();
+
+            templates.register_template_file(name, &path)?;
+            template_files.insert(name.to_string(), (mtime, path));
+
+            templates
+                .render(name, data)
+                .map_err(|err| format_err!("{}", err))
+        }
+    }
+
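+    // Typical template flow (sketch, not part of the original patch; the
+    // template name and path are made up for illustration):
+    //
+    //   config.register_template("index", "/usr/share/my-app/index.hbs")?;
+    //   let page = config.render_template("index", &serde_json::json!({"title": "hi"}))?;
+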
+    /// Enable the access log feature
+    ///
+    /// When enabled, all requests are logged to the specified file.
+    /// This function also registers an `api-access-log-reopen`
+    /// command on the [CommandSocket].
+    pub fn enable_access_log<P>(
+        &mut self,
+        path: P,
+        dir_opts: Option<CreateOptions>,
+        file_opts: Option<CreateOptions>,
+        commando_sock: &mut CommandSocket,
+    ) -> Result<(), Error>
+    where
+        P: Into<PathBuf>,
+    {
+        let path: PathBuf = path.into();
+        if let Some(base) = path.parent() {
+            if !base.exists() {
+                create_path(base, None, dir_opts).map_err(|err| format_err!("{}", err))?;
+            }
+        }
+
+        let logger_options = FileLogOptions {
+            append: true,
+            file_opts: file_opts.unwrap_or_default(),
+            ..Default::default()
+        };
+        let request_log = Arc::new(Mutex::new(FileLogger::new(&path, logger_options)?));
+        self.request_log = Some(Arc::clone(&request_log));
+
+        commando_sock.register_command("api-access-log-reopen".into(), move |_args| {
+            log::info!("re-opening access-log file");
+            request_log.lock().unwrap().reopen()?;
+            Ok(serde_json::Value::Null)
+        })?;
+
+        Ok(())
+    }
+
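+    // The command registered above can later be triggered over the control
+    // socket (sketch, not part of the original patch):
+    //
+    //   proxmox_rest_server::send_command(&sock_path, &serde_json::json!({
+    //       "command": "api-access-log-reopen",
+    //   })).await?;
+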
+    /// Enable the authentication log feature
+    ///
+    /// When enabled, all authentication requests are logged to the
+    /// specified file. This function also registers an
+    /// `api-auth-log-reopen` command on the [CommandSocket].
+    pub fn enable_auth_log<P>(
+        &mut self,
+        path: P,
+        dir_opts: Option<CreateOptions>,
+        file_opts: Option<CreateOptions>,
+        commando_sock: &mut CommandSocket,
+    ) -> Result<(), Error>
+    where
+        P: Into<PathBuf>,
+    {
+        let path: PathBuf = path.into();
+        if let Some(base) = path.parent() {
+            if !base.exists() {
+                create_path(base, None, dir_opts).map_err(|err| format_err!("{}", err))?;
+            }
+        }
+
+        let logger_options = FileLogOptions {
+            append: true,
+            prefix_time: true,
+            file_opts: file_opts.unwrap_or_default(),
+            ..Default::default()
+        };
+        let auth_log = Arc::new(Mutex::new(FileLogger::new(&path, logger_options)?));
+        self.auth_log = Some(Arc::clone(&auth_log));
+
+        commando_sock.register_command("api-auth-log-reopen".into(), move |_args| {
+            log::info!("re-opening auth-log file");
+            auth_log.lock().unwrap().reopen()?;
+            Ok(serde_json::Value::Null)
+        })?;
+
+        Ok(())
+    }
+
+    pub(crate) fn get_access_log(&self) -> Option<&Arc<Mutex<FileLogger>>> {
+        self.request_log.as_ref()
+    }
+
+    pub(crate) fn get_auth_log(&self) -> Option<&Arc<Mutex<FileLogger>>> {
+        self.auth_log.as_ref()
+    }
+}
diff --git a/proxmox-rest-server/src/command_socket.rs b/proxmox-rest-server/src/command_socket.rs
new file mode 100644
index 00000000..bfa42b01
--- /dev/null
+++ b/proxmox-rest-server/src/command_socket.rs
@@ -0,0 +1,241 @@
+use anyhow::{bail, format_err, Error};
+
+use std::collections::HashMap;
+use std::os::unix::io::AsRawFd;
+use std::path::{Path, PathBuf};
+use std::sync::Arc;
+
+use futures::*;
+use nix::sys::socket;
+use nix::unistd::Gid;
+use serde::Serialize;
+use serde_json::Value;
+use tokio::net::UnixListener;
+
+// Listens on a Unix Socket to handle simple commands asynchronously
+fn create_control_socket<P, F>(
+    path: P,
+    gid: Gid,
+    func: F,
+) -> Result<impl Future<Output = ()>, Error>
+where
+    P: Into<PathBuf>,
+    F: Fn(Value) -> Result<Value, Error> + Send + Sync + 'static,
+{
+    let path: PathBuf = path.into();
+
+    let gid = gid.as_raw();
+
+    let socket = UnixListener::bind(&path)?;
+
+    let func = Arc::new(func);
+
+    let control_future = async move {
+        loop {
+            let (conn, _addr) = match socket.accept().await {
+                Ok(data) => data,
+                Err(err) => {
+                    log::error!("failed to accept on control socket {:?}: {}", path, err);
+                    continue;
+                }
+            };
+
+            let opt = socket::sockopt::PeerCredentials {};
+            let cred = match socket::getsockopt(conn.as_raw_fd(), opt) {
+                Ok(cred) => cred,
+                Err(err) => {
+                    log::error!("no permissions - unable to read peer credential - {}", err);
+                    continue;
+                }
+            };
+
+            // check permissions (same gid, root user, or backup group)
+            let mygid = unsafe { libc::getgid() };
+            if !(cred.uid() == 0 || cred.gid() == mygid || cred.gid() == gid) {
+                log::error!("no permissions for {:?}", cred);
+                continue;
+            }
+
+            let (rx, mut tx) = tokio::io::split(conn);
+
+            let abort_future = super::last_worker_future().map(|_| ());
+
+            use tokio::io::{AsyncBufReadExt, AsyncWriteExt};
+            let func = Arc::clone(&func);
+            let path = path.clone();
+            tokio::spawn(
+                futures::future::select(
+                    async move {
+                        let mut rx = tokio::io::BufReader::new(rx);
+                        let mut line = String::new();
+                        loop {
+                            line.clear();
+                            match rx.read_line(&mut line).await {
+                                Ok(0) => break,
+                                Ok(_) => (),
+                                Err(err) => {
+                                    log::error!("control socket {:?} read error: {}", path, err);
+                                    return;
+                                }
+                            }
+
+                            let response = match line.parse::<Value>() {
+                                Ok(param) => match func(param) {
+                                    Ok(res) => format!("OK: {}\n", res),
+                                    Err(err) => format!("ERROR: {}\n", err),
+                                },
+                                Err(err) => format!("ERROR: {}\n", err),
+                            };
+
+                            if let Err(err) = tx.write_all(response.as_bytes()).await {
+                                log::error!(
+                                    "control socket {:?} write response error: {}",
+                                    path,
+                                    err
+                                );
+                                return;
+                            }
+                        }
+                    }
+                    .boxed(),
+                    abort_future,
+                )
+                .map(|_| ()),
+            );
+        }
+    }
+    .boxed();
+
+    let abort_future = crate::last_worker_future().map_err(|_| {});
+    let task = futures::future::select(control_future, abort_future)
+        .map(|_: futures::future::Either<(Result<(), Error>, _), _>| ());
+
+    Ok(task)
+}
+
+/// Send a command to the specified socket
+pub async fn send_command<P, T>(path: P, params: &T) -> Result<Value, Error>
+where
+    P: AsRef<Path>,
+    T: ?Sized + Serialize,
+{
+    let mut command_string = serde_json::to_string(params)?;
+    command_string.push('\n');
+    send_raw_command(path.as_ref(), &command_string).await
+}
+
+/// Send a raw command (string) to the specified socket
+pub async fn send_raw_command<P>(path: P, command_string: &str) -> Result<Value, Error>
+where
+    P: AsRef<Path>,
+{
+    use tokio::io::{AsyncBufReadExt, AsyncWriteExt};
+
+    let mut conn = tokio::net::UnixStream::connect(path)
+        .map_err(move |err| format_err!("control socket connect failed - {}", err))
+        .await?;
+
+    conn.write_all(command_string.as_bytes()).await?;
+    if !command_string.as_bytes().ends_with(b"\n") {
+        conn.write_all(b"\n").await?;
+    }
+
+    AsyncWriteExt::shutdown(&mut conn).await?;
+    let mut rx = tokio::io::BufReader::new(conn);
+    let mut data = String::new();
+    if rx.read_line(&mut data).await? == 0 {
+        bail!("no response");
+    }
+    if let Some(res) = data.strip_prefix("OK: ") {
+        match res.parse::<Value>() {
+            Ok(v) => Ok(v),
+            Err(err) => bail!("unable to parse json response - {}", err),
+        }
+    } else if let Some(err) = data.strip_prefix("ERROR: ") {
+        bail!("{}", err);
+    } else {
+        bail!("unable to parse response: {}", data);
+    }
+}
+
+// A callback for a specific command socket.
+type CommandSocketFn =
+    Box<(dyn Fn(Option<&Value>) -> Result<Value, Error> + Send + Sync + 'static)>;
+
+/// Tooling to get a single control command socket where one can
+/// register multiple commands dynamically.
+///
+/// The socket is activated by calling [spawn](CommandSocket::spawn),
+/// which spawns an async tokio task to process the commands.
+pub struct CommandSocket {
+    socket: PathBuf,
+    gid: Gid,
+    commands: HashMap<String, CommandSocketFn>,
+}
+
+impl CommandSocket {
+    /// Creates a new instance.
+    pub fn new<P>(path: P, gid: Gid) -> Self
+    where
+        P: Into<PathBuf>,
+    {
+        CommandSocket {
+            socket: path.into(),
+            gid,
+            commands: HashMap::new(),
+        }
+    }
+
+    /// Spawn the socket and consume self, meaning you cannot register commands anymore after
+    /// calling this.
+    pub fn spawn(self) -> Result<(), Error> {
+        let control_future =
+            create_control_socket(self.socket.to_owned(), self.gid, move |param| {
+                let param = param.as_object().ok_or_else(|| {
+                    format_err!("unable to parse parameters (expected json object)")
+                })?;
+
+                let command = match param.get("command") {
+                    Some(Value::String(command)) => command.as_str(),
+                    None => bail!("no command"),
+                    _ => bail!("unable to parse command"),
+                };
+
+                match self.commands.get(command) {
+                    None => bail!("got unknown command '{}'", command),
+                    Some(handler) => {
+                        let args = param.get("args"); //.unwrap_or(&Value::Null);
+                        (handler)(args)
+                    }
+                }
+            })?;
+
+        tokio::spawn(control_future);
+
+        Ok(())
+    }
+
+    /// Register a new command with a callback.
+    pub fn register_command<F>(&mut self, command: String, handler: F) -> Result<(), Error>
+    where
+        F: Fn(Option<&Value>) -> Result<Value, Error> + Send + Sync + 'static,
+    {
+        if self.commands.contains_key(&command) {
+            bail!("command '{}' already exists!", command);
+        }
+
+        self.commands.insert(command, Box::new(handler));
+
+        Ok(())
+    }
+}
diff --git a/proxmox-rest-server/src/compression.rs b/proxmox-rest-server/src/compression.rs
new file mode 100644
index 00000000..189d7041
--- /dev/null
+++ b/proxmox-rest-server/src/compression.rs
@@ -0,0 +1,39 @@
+use anyhow::{bail, Error};
+use hyper::header;
+
+/// Possible Compression Methods, order determines preference (later is preferred)
+#[derive(Eq, Ord, PartialEq, PartialOrd, Debug)]
+pub enum CompressionMethod {
+    Deflate,
+    // Gzip,
+    // Brotli,
+}
+
+impl CompressionMethod {
+    pub fn content_encoding(&self) -> header::HeaderValue {
+        header::HeaderValue::from_static(self.extension())
+    }
+
+    pub fn extension(&self) -> &'static str {
+        match *self {
+            // CompressionMethod::Brotli => "br",
+            // CompressionMethod::Gzip => "gzip",
+            CompressionMethod::Deflate => "deflate",
+        }
+    }
+}
+
+impl std::str::FromStr for CompressionMethod {
+    type Err = Error;
+
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        match s {
+            // "br" => Ok(CompressionMethod::Brotli),
+            // "gzip" => Ok(CompressionMethod::Gzip),
+            "deflate" => Ok(CompressionMethod::Deflate),
+            // http accept-encoding allows giving weights with ';q='
+            other if other.starts_with("deflate;q=") => Ok(CompressionMethod::Deflate),
+            _ => bail!("unknown compression format"),
+        }
+    }
+}
diff --git a/proxmox-rest-server/src/daemon.rs b/proxmox-rest-server/src/daemon.rs
new file mode 100644
index 00000000..4a5806bd
--- /dev/null
+++ b/proxmox-rest-server/src/daemon.rs
@@ -0,0 +1,379 @@
+//! Helpers to implement restartable daemons/services.
+
+use std::ffi::CString;
+use std::future::Future;
+use std::io::{Read, Write};
+use std::os::raw::{c_char, c_int, c_uchar};
+use std::os::unix::ffi::OsStrExt;
+use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
+use std::panic::UnwindSafe;
+use std::path::PathBuf;
+
+use anyhow::{bail, format_err, Error};
+use futures::future::{self, Either};
+use nix::unistd::{fork, ForkResult};
+
+use proxmox_io::{ReadExt, WriteExt};
+use proxmox_sys::fd::{fd_change_cloexec, Fd};
+use proxmox_sys::fs::CreateOptions;
+
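+// Reload handshake implemented by Reloader::fork_restart below (sketch, not
+// part of the original patch):
+//
+//   old daemon --fork()--> intermediate --fork()--> new daemon
+//     1. new daemon sends its PID to the old one over a socketpair
+//     2. old daemon tells systemd about the new MAINPID (+ barrier)
+//     3. old daemon writes a single "go ahead" byte to the new daemon
+//     4. new daemon re-execs itself with the stored environment
+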
+// Unfortunately FnBox is nightly-only and Box<FnOnce> is unusable, so just use Box<FnMut>...
+type BoxedStoreFunc = Box<dyn FnMut() -> Result<String, Error> + UnwindSafe + Send>;
+
+// Helper trait to "store" something in the environment to be re-used after re-executing the
+// service on a reload.
+trait Reloadable: Sized {
+    fn restore(var: &str) -> Result<Self, Error>;
+    fn get_store_func(&self) -> Result<BoxedStoreFunc, Error>;
+}
+
+// Manages things to be stored and reloaded upon reexec.
+// Anything which should be restorable should be instantiated via this struct's `restore` method.
+#[derive(Default)]
+struct Reloader {
+    pre_exec: Vec<PreExecEntry>,
+    self_exe: PathBuf,
+}
+
+// Currently we only need environment variables for storage, but in theory we could also add
+// variants which need temporary files or pipes...
+struct PreExecEntry {
+    name: &'static str, // Feel free to change to String if necessary...
+    store_fn: BoxedStoreFunc,
+}
+
+impl Reloader {
+    pub fn new() -> Result<Self, Error> {
+        Ok(Self {
+            pre_exec: Vec::new(),
+
+            // Get the path to our executable as PathBuf
+            self_exe: std::fs::read_link("/proc/self/exe")?,
+        })
+    }
+
+    /// Restore an object from an environment variable of the given name, or, if none exists, use
+    /// the function provided in the `or_create` parameter to instantiate the new "first" instance.
+    ///
+    /// Values created via this method will be remembered for later re-execution.
+    pub async fn restore<T, F, U>(&mut self, name: &'static str, or_create: F) -> Result<T, Error>
+    where
+        T: Reloadable,
+        F: FnOnce() -> U,
+        U: Future<Output = Result<T, Error>>,
+    {
+        let res = match std::env::var(name) {
+            Ok(varstr) => T::restore(&varstr)?,
+            Err(std::env::VarError::NotPresent) => or_create().await?,
+            Err(_) => bail!("variable {} has invalid value", name),
+        };
+
+        self.pre_exec.push(PreExecEntry {
+            name,
+            store_fn: res.get_store_func()?,
+        });
+        Ok(res)
+    }
+
+    fn pre_exec(self) -> Result<(), Error> {
+        for mut item in self.pre_exec {
+            std::env::set_var(item.name, (item.store_fn)()?);
+        }
+        Ok(())
+    }
+
+    pub fn fork_restart(self, pid_fn: Option<&str>) -> Result<(), Error> {
+        // Get our parameters as Vec<CString>
+        let args = std::env::args_os();
+        let mut new_args = Vec::with_capacity(args.len());
+        for arg in args {
+            new_args.push(CString::new(arg.as_bytes())?);
+        }
+
+        // Synchronisation pipe:
+        let (pold, pnew) = super::socketpair()?;
+
+        // Start ourselves in the background:
+        match unsafe { fork() } {
+            Ok(ForkResult::Child) => {
+                // Double fork so systemd can supervise us without nagging...
+                match unsafe { fork() } {
+                    Ok(ForkResult::Child) => {
+                        std::mem::drop(pold);
+                        // At this point we call pre-exec helpers. We must be certain that if they fail for
+                        // whatever reason we can still call `_exit()`, so use catch_unwind.
+                        match std::panic::catch_unwind(move || {
+                            let mut pnew =
+                                unsafe { std::fs::File::from_raw_fd(pnew.into_raw_fd()) };
+                            let pid = nix::unistd::Pid::this();
+                            if let Err(e) = unsafe { pnew.write_host_value(pid.as_raw()) } {
+                                log::error!("failed to send new server PID to parent: {}", e);
+                                unsafe {
+                                    libc::_exit(-1);
+                                }
+                            }
+
+                            let mut ok = [0u8];
+                            if let Err(e) = pnew.read_exact(&mut ok) {
+                                log::error!("parent vanished before notifying systemd: {}", e);
+                                unsafe {
+                                    libc::_exit(-1);
+                                }
+                            }
+                            assert_eq!(ok[0], 1, "reload handshake should have sent a 1 byte");
+
+                            std::mem::drop(pnew);
+
+                            // Try to reopen STDOUT/STDERR journald streams to get correct PID in logs
+                            let ident = CString::new(self.self_exe.file_name().unwrap().as_bytes())
+                                .unwrap();
+                            let ident = ident.as_bytes();
+                            let fd =
+                                unsafe { sd_journal_stream_fd(ident.as_ptr(), libc::LOG_INFO, 1) };
+                            if fd >= 0 && fd != 1 {
+                                let fd = proxmox_sys::fd::Fd(fd); // add drop handler
+                                nix::unistd::dup2(fd.as_raw_fd(), 1)?;
+                            } else {
+                                log::error!("failed to update STDOUT journal redirection ({})", fd);
+                            }
+                            let fd =
+                                unsafe { sd_journal_stream_fd(ident.as_ptr(), libc::LOG_ERR, 1) };
+                            if fd >= 0 && fd != 2 {
+                                let fd = proxmox_sys::fd::Fd(fd); // add drop handler
+                                nix::unistd::dup2(fd.as_raw_fd(), 2)?;
+                            } else {
+                                log::error!("failed to update STDERR journal redirection ({})", fd);
+                            }
+
+                            self.do_reexec(new_args)
+                        }) {
+                            Ok(Ok(())) => log::error!("do_reexec returned!"),
+                            Ok(Err(err)) => log::error!("do_reexec failed: {}", err),
+                            Err(_) => log::error!("panic in re-exec"),
+                        }
+                    }
+                    Ok(ForkResult::Parent { child }) => {
+                        std::mem::drop((pold, pnew));
+                        log::debug!("forked off a new server (second pid: {})", child);
+                    }
+                    Err(e) => log::error!("fork() failed, restart delayed: {}", e),
+                }
+                // No matter how we managed to get here, this is the time where we bail out quickly:
+                unsafe { libc::_exit(-1) }
+            }
+            Ok(ForkResult::Parent { child }) => {
+                log::debug!(
+                    "forked off a new server (first pid: {}), waiting for 2nd pid",
+                    child
+                );
+                std::mem::drop(pnew);
+                let mut pold = unsafe { std::fs::File::from_raw_fd(pold.into_raw_fd()) };
+                let child = nix::unistd::Pid::from_raw(match unsafe { pold.read_le_value() } {
+                    Ok(v) => v,
+                    Err(e) => {
+                        log::error!(
+                            "failed to receive pid of double-forked child process: {}",
+                            e
+                        );
+                        // systemd will complain but won't kill the service...
+                        return Ok(());
+                    }
+                });
+
+                if let Some(pid_fn) = pid_fn {
+                    let pid_str = format!("{}\n", child);
+                    proxmox_sys::fs::replace_file(
+                        pid_fn,
+                        pid_str.as_bytes(),
+                        CreateOptions::new(),
+                        false,
+                    )?;
+                }
+
+                if let Err(e) = systemd_notify(SystemdNotify::MainPid(child)) {
+                    log::error!("failed to notify systemd about the new main pid: {}", e);
+                }
+                // ensure systemd got the message about the new main PID before continuing, else it
+                // will get confused if the new main process sends its READY signal before that
+                if let Err(e) = systemd_notify_barrier(u64::MAX) {
+                    log::error!("failed to wait on systemd-processing: {}", e);
+                }
+
+                // notify child that it is now the new main process:
+                if let Err(e) = pold.write_all(&[1u8]) {
+                    log::error!("child vanished during reload: {}", e);
+                }
+
+                Ok(())
+            }
+            Err(e) => {
+                log::error!("fork() failed, restart delayed: {}", e);
+                Ok(())
+            }
+        }
+    }
+
+    fn do_reexec(self, args: Vec<CString>) -> Result<(), Error> {
+        let exe = CString::new(self.self_exe.as_os_str().as_bytes())?;
+        self.pre_exec()?;
+        nix::unistd::setsid()?;
+        let args: Vec<&std::ffi::CStr> = args.iter().map(|s| s.as_ref()).collect();
+        nix::unistd::execvp(&exe, &args)?;
+        panic!("exec misbehaved");
+    }
+}
+
+// For now all we need to do is store and reuse a tcp listening socket:
+impl Reloadable for tokio::net::TcpListener {
+    // NOTE: The socket must not be closed when the store-function is called:
+    // FIXME: We could become "independent" of the TcpListener and its reference to the file
+    // descriptor by `dup()`ing it (and check if the listener still exists via kcmp()?)
+    fn get_store_func(&self) -> Result<BoxedStoreFunc, Error> {
+        let mut fd_opt = Some(Fd(nix::fcntl::fcntl(
+            self.as_raw_fd(),
+            nix::fcntl::FcntlArg::F_DUPFD_CLOEXEC(0),
+        )?));
+        Ok(Box::new(move || {
+            let fd = fd_opt.take().unwrap();
+            fd_change_cloexec(fd.as_raw_fd(), false)?;
+            Ok(fd.into_raw_fd().to_string())
+        }))
+    }
+
+    fn restore(var: &str) -> Result<Self, Error> {
+        let fd = var
+            .parse::<u32>()
+            .map_err(|e| format_err!("invalid file descriptor: {}", e))? as RawFd;
+        fd_change_cloexec(fd, true)?;
+        Ok(Self::from_std(unsafe {
+            std::net::TcpListener::from_raw_fd(fd)
+        })?)
+    }
+}
+
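+// Reload round trip (sketch, not part of the original patch): get_store_func()
+// above dup()s the listener fd, clears O_CLOEXEC and stores the raw fd number
+// in an environment variable (PROXMOX_BACKUP_LISTEN_FD, see create_daemon
+// below); after re-exec, restore() parses that number, re-sets O_CLOEXEC and
+// rebuilds the tokio TcpListener from the inherited descriptor.
+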
+/// This creates a future representing a daemon which reloads itself when receiving a SIGHUP.
+/// If this is started regularly, a listening socket is created. In this case, the file descriptor
+/// number will be remembered in `PROXMOX_BACKUP_LISTEN_FD`.
+/// If the variable already exists, its contents will instead be used to restore the listening
+/// socket. The finished listening socket is then passed to the `create_service` function which
+/// can be used to set up the TLS and the HTTP daemon. The returned future has to call
+/// [systemd_notify] with [SystemdNotify::Ready] when the service is ready.
+pub async fn create_daemon<F, S>(
+    address: std::net::SocketAddr,
+    create_service: F,
+    pidfn: Option<&str>,
+) -> Result<(), Error>
+where
+    F: FnOnce(tokio::net::TcpListener) -> Result<S, Error>,
+    S: Future<Output = Result<(), Error>>,
+{
+    let mut reloader = Reloader::new()?;
+
+    let listener: tokio::net::TcpListener = reloader
+        .restore("PROXMOX_BACKUP_LISTEN_FD", move || async move {
+            Ok(tokio::net::TcpListener::bind(&address).await?)
+        })
+        .await?;
+
+    let service = create_service(listener)?;
+
+    let service = async move {
+        if let Err(err) = service.await {
+            log::error!("server error: {}", err);
+        }
+    };
+
+    let server_future = Box::pin(service);
+    let shutdown_future = crate::shutdown_future();
+
+    let finish_future = match future::select(server_future, shutdown_future).await {
+        Either::Left((_, _)) => {
+            if !crate::shutdown_requested() {
+                crate::request_shutdown(); // make sure we are in shutdown mode
+            }
+            None
+        }
+        Either::Right((_, server_future)) => Some(server_future),
+    };
+
+    let mut reloader = Some(reloader);
+
+    if crate::is_reload_request() {
+        log::info!("daemon reload...");
+        if let Err(e) = systemd_notify(SystemdNotify::Reloading) {
+            log::error!("failed to notify systemd about the state change: {}", e);
+        }
+        if let Err(e) = systemd_notify_barrier(u64::MAX) {
+            log::error!("failed to wait on systemd-processing: {}", e);
+        }
+
+        if let Err(e) = reloader.take().unwrap().fork_restart(pidfn) {
+            log::error!("error during reload: {}", e);
+            let _ = systemd_notify(SystemdNotify::Status("error during reload".to_string()));
+        }
+    } else {
+        log::info!("daemon shutting down...");
+    }
+
+    if let Some(future) = finish_future {
+        future.await;
+    }
+
+    log::info!("daemon shut down.");
+    Ok(())
+}
+
+#[link(name = "systemd")]
+extern "C" {
+    fn sd_journal_stream_fd(
+        identifier: *const c_uchar,
+        priority: c_int,
+        level_prefix: c_int,
+    ) -> c_int;
+    fn sd_notify(unset_environment: c_int, state: *const c_char) -> c_int;
+    fn sd_notify_barrier(unset_environment: c_int, timeout: u64) -> c_int;
+}
+
+/// Systemd service startup states (see: ``man sd_notify``)
+pub enum SystemdNotify {
+    Ready,
+    Reloading,
+    Stopping,
+    Status(String),
+    MainPid(nix::unistd::Pid),
+}
+
+/// Tells systemd the startup state of the service (see: ``man sd_notify``)
+pub fn systemd_notify(state: SystemdNotify) -> Result<(), Error> {
+    let message = match state {
+        SystemdNotify::Ready => {
+            log::info!("service is ready");
+            CString::new("READY=1")
+        }
+        SystemdNotify::Reloading => CString::new("RELOADING=1"),
+        SystemdNotify::Stopping => CString::new("STOPPING=1"),
+        SystemdNotify::Status(msg) => CString::new(format!("STATUS={}", msg)),
+        SystemdNotify::MainPid(pid) => CString::new(format!("MAINPID={}", pid)),
+    }?;
+    let rc = unsafe { sd_notify(0, message.as_ptr()) };
+    if rc < 0 {
+        bail!(
+            "systemd_notify failed: {}",
+            std::io::Error::from_raw_os_error(-rc)
+        );
+    }
+
+    Ok(())
+}
+
+/// Waits until all previously sent messages with sd_notify are processed
+pub fn systemd_notify_barrier(timeout: u64) -> Result<(), Error> {
+    let rc = unsafe { sd_notify_barrier(0, timeout) };
+    if rc < 0 {
+        bail!(
+            "systemd_notify_barrier failed: {}",
+            std::io::Error::from_raw_os_error(-rc)
+        );
+    }
+    Ok(())
+}
diff --git a/proxmox-rest-server/src/environment.rs b/proxmox-rest-server/src/environment.rs
new file mode 100644
index 00000000..b4dff76b
--- /dev/null
+++ b/proxmox-rest-server/src/environment.rs
@@ -0,0 +1,98 @@
+use std::net::SocketAddr;
+use std::sync::Arc;
+
+use serde_json::{json, Value};
+
+use proxmox_router::{RpcEnvironment, RpcEnvironmentType};
+
+use crate::ApiConfig;
+
+/// Encapsulates information about the runtime environment
+pub struct RestEnvironment {
+    env_type: RpcEnvironmentType,
+    result_attributes: Value,
+    auth_id: Option<String>,
+    client_ip: Option<SocketAddr>,
+    api: Arc<ApiConfig>,
+}
+
+impl RestEnvironment {
+    pub fn new(env_type: RpcEnvironmentType, api: Arc<ApiConfig>) -> Self {
+        Self {
+            result_attributes: json!({}),
+            auth_id: None,
+            client_ip: None,
+            env_type,
+            api,
+        }
+    }
+
+    pub fn api_config(&self) -> &ApiConfig {
+        &self.api
+    }
+
+    pub fn log_auth(&self, auth_id: &str) {
+        let msg = format!("successful auth for user '{}'", auth_id);
+        log::debug!("{}", msg); // avoid noisy syslog, admins can already check the auth log
+        if let Some(auth_logger) = self.api.get_auth_log() {
+            auth_logger.lock().unwrap().log(&msg);
+        }
+    }
+
+    pub fn log_failed_auth(&self, failed_auth_id: Option<String>, msg: &str) {
+        let msg = match (self.client_ip, failed_auth_id) {
+            (Some(peer), Some(user)) => {
+                format!(
+                    "authentication failure; rhost={} user={} msg={}",
+                    peer, user, msg
+                )
+            }
+            (Some(peer), None) => {
+                format!("authentication failure; rhost={} msg={}", peer, msg)
+            }
+            (None, Some(user)) => {
+                format!(
+                    "authentication failure; rhost=unknown user={} msg={}",
+                    user, msg
+                )
+            }
+            (None, None) => {
+                format!("authentication failure; rhost=unknown msg={}", msg)
+            }
+        };
+        log::error!("{}", msg);
+        if let Some(auth_logger) = self.api.get_auth_log() {
+            auth_logger.lock().unwrap().log(&msg);
+        }
+    }
+}
+
+impl RpcEnvironment for RestEnvironment {
+    fn result_attrib_mut(&mut self) -> &mut Value {
+        &mut self.result_attributes
+    }
+
+    fn result_attrib(&self) -> &Value {
+        &self.result_attributes
+    }
+
+    fn env_type(&self) -> RpcEnvironmentType {
+        self.env_type
+    }
+
+    fn set_auth_id(&mut self, auth_id: Option<String>) {
+        self.auth_id = auth_id;
+    }
+
+    fn get_auth_id(&self) -> Option<String> {
+        self.auth_id.clone()
+    }
+
+    fn set_client_ip(&mut self, client_ip: Option<SocketAddr>) {
+        self.client_ip = client_ip;
+    }
+
+    fn get_client_ip(&self) -> Option<SocketAddr> {
+        self.client_ip
+    }
+}
diff --git a/proxmox-rest-server/src/file_logger.rs b/proxmox-rest-server/src/file_logger.rs
new file mode 100644
index 00000000..2bb1fac6
--- /dev/null
+++ b/proxmox-rest-server/src/file_logger.rs
@@ -0,0 +1,147 @@
+use std::io::Write;
+
+use anyhow::Error;
+use nix::fcntl::OFlag;
+
+use proxmox_sys::fs::{atomic_open_or_create_file, CreateOptions};
+
+/// Options to control the behavior of a [FileLogger] instance
+#[derive(Default)]
+pub struct FileLogOptions {
+    /// Open underlying log file in append mode, useful when multiple concurrent processes
+    /// want to log to the same file (e.g., HTTP access log). Note that it is only atomic
+    /// for writes smaller than the PIPE_BUF (4k on Linux).
+    /// Inside the same process you may still need to use a mutex for shared access.
+    pub append: bool,
+    /// Open underlying log file as readable
+    pub read: bool,
+    /// If set, ensure that the file is newly created or error out if already existing.
+    pub exclusive: bool,
+    /// Duplicate logged messages to STDOUT, like tee
+    pub to_stdout: bool,
+    /// Prefix messages logged to the file with the current local time as RFC 3339
+    pub prefix_time: bool,
+    /// File owner/group and mode
+    pub file_opts: CreateOptions,
+}
+
+/// Log messages with optional automatically added timestamps into files
+///
+/// #### Example:
+/// ```
+/// # use anyhow::{bail, format_err, Error};
+/// use proxmox_rest_server::{flog, FileLogger, FileLogOptions};
+///
+/// # std::fs::remove_file("test.log");
+/// let options = FileLogOptions {
+///     to_stdout: true,
+///     exclusive: true,
+///     ..Default::default()
+/// };
+/// let mut log = FileLogger::new("test.log", options).unwrap();
+/// flog!(log, "A simple log: {}", "Hello!");
+/// # std::fs::remove_file("test.log");
+/// ```
+pub struct FileLogger {
+    file: std::fs::File,
+    file_name: std::path::PathBuf,
+    options: FileLogOptions,
+}
+
+/// Log messages to [FileLogger] - ``println`` like macro
+#[macro_export]
+macro_rules! flog {
+    ($log:expr, $($arg:tt)*) => ({
+        $log.log(format!($($arg)*));
+    })
+}
+
+impl FileLogger {
+    pub fn new<P: AsRef<std::path::Path>>(
+        file_name: P,
+        options: FileLogOptions,
+    ) -> Result<Self, Error> {
+        let file = Self::open(&file_name, &options)?;
+
+        let file_name: std::path::PathBuf = file_name.as_ref().to_path_buf();
+
+        Ok(Self {
+            file,
+            file_name,
+            options,
+        })
+    }
+
+    pub fn reopen(&mut self) -> Result<&Self, Error> {
+        let file = Self::open(&self.file_name, &self.options)?;
+        self.file = file;
+        Ok(self)
+    }
+
+    fn open<P: AsRef<std::path::Path>>(
+        file_name: P,
+        options: &FileLogOptions,
+    ) -> Result<std::fs::File, Error> {
+        let mut flags = OFlag::O_CLOEXEC;
+
+        if options.read {
+            flags |= OFlag::O_RDWR;
+        } else {
+            flags |= OFlag::O_WRONLY;
+        }
+
+        if options.append {
+            flags |= OFlag::O_APPEND;
+        }
+        if options.exclusive {
+            flags |= OFlag::O_EXCL;
+        }
+
+        let file =
+            atomic_open_or_create_file(&file_name, flags, &[], options.file_opts.clone(), false)?;
+
+        Ok(file)
+    }
+
+    pub fn log<S: AsRef<str>>(&mut self, msg: S) {
+        let msg = msg.as_ref();
+
+        if self.options.to_stdout {
+            let mut stdout = std::io::stdout();
+            stdout.write_all(msg.as_bytes()).unwrap();
+            stdout.write_all(b"\n").unwrap();
+        }
+
+        let line = if self.options.prefix_time {
+            let now = proxmox_time::epoch_i64();
+            let rfc3339 = match proxmox_time::epoch_to_rfc3339(now) {
+                Ok(rfc3339) => rfc3339,
+                Err(_) => "1970-01-01T00:00:00Z".into(), // for safety, should really not happen!
+            };
+            format!("{}: {}\n", rfc3339, msg)
+        } else {
+            format!("{}\n", msg)
+        };
+        if let Err(err) = self.file.write_all(line.as_bytes()) {
+            // avoid panicking, log methods should not do that
+            // FIXME: or, return result???
+            log::error!("error writing to log file - {}", err);
+        }
+    }
+}
+
+impl std::io::Write for FileLogger {
+    fn write(&mut self, buf: &[u8]) -> Result<usize, std::io::Error> {
+        if self.options.to_stdout {
+            let _ = std::io::stdout().write(buf);
+        }
+        self.file.write(buf)
+    }
+
+    fn flush(&mut self) -> Result<(), std::io::Error> {
+        if self.options.to_stdout {
+            let _ = std::io::stdout().flush();
+        }
+        self.file.flush()
+    }
+}
diff --git a/proxmox-rest-server/src/formatter.rs b/proxmox-rest-server/src/formatter.rs
new file mode 100644
index 00000000..2e9a01fa
--- /dev/null
+++ b/proxmox-rest-server/src/formatter.rs
@@ -0,0 +1,235 @@
+//! Helpers to format response data
+
+use std::collections::HashMap;
+
+use anyhow::Error;
+use serde_json::{json, Value};
+
+use hyper::header;
+use hyper::{Body, Response, StatusCode};
+
+use proxmox_router::{HttpError, RpcEnvironment, SerializableReturn};
+use proxmox_schema::ParameterError;
+
+/// Extension to set error message for server side logging
+pub(crate) struct ErrorMessageExtension(pub String);
+
+/// Methods to format data and errors
+pub trait OutputFormatter: Send + Sync {
+    /// Transform json data into a http response
+    fn format_data(&self, data: Value, rpcenv: &dyn RpcEnvironment) -> Response<Body>;
+
+    /// Transform serializable data into a streaming http response
+    fn format_data_streaming(
+        &self,
+        data: Box<dyn SerializableReturn + Send>,
+        rpcenv: &dyn RpcEnvironment,
+    ) -> Result<Response<Body>, Error>;
+
+    /// Transform errors into a http response
+    fn format_error(&self, err: Error) -> Response<Body>;
+
+    /// Transform a [Result] into a http response
+    fn format_result(
+        &self,
+        result: Result<Value, Error>,
+        rpcenv: &dyn RpcEnvironment,
+    ) -> Response<Body> {
+        match result {
+            Ok(data) => self.format_data(data, rpcenv),
+            Err(err) => self.format_error(err),
+        }
+    }
+}
+
+static JSON_CONTENT_TYPE: &str = "application/json;charset=UTF-8";
+
+fn json_data_response(data: Value) -> Response<Body> {
+    let json_str = data.to_string();
+
+    let raw = json_str.into_bytes();
+
+    let mut response = Response::new(raw.into());
+    response.headers_mut().insert(
+        header::CONTENT_TYPE,
+        header::HeaderValue::from_static(JSON_CONTENT_TYPE),
+    );
+
+    response
+}
+
+fn json_data_response_streaming(body: Body) -> Result<Response<Body>, Error> {
+    let response = Response::builder()
+        .header(
+            header::CONTENT_TYPE,
+            header::HeaderValue::from_static(JSON_CONTENT_TYPE),
+        )
+        .body(body)?;
+    Ok(response)
+}
+
+fn add_result_attributes(result: &mut Value, rpcenv: &dyn RpcEnvironment) {
+    let attributes = match rpcenv.result_attrib().as_object() {
+        Some(attr) => attr,
+        None => return,
+    };
+
+    for (key, value) in attributes {
+        result[key] = value.clone();
+    }
+}
+
+fn start_data_streaming(
+    value: Value,
+    data: Box<dyn SerializableReturn + Send>,
+) -> tokio::sync::mpsc::Receiver<Result<Vec<u8>, Error>> {
+    let (writer, reader) = tokio::sync::mpsc::channel(1);
+
+    tokio::task::spawn_blocking(move || {
+        let output = proxmox_async::blocking::SenderWriter::from_sender(writer);
+        let mut output = std::io::BufWriter::new(output);
+        let mut serializer = serde_json::Serializer::new(&mut output);
+        let _ = data.sender_serialize(&mut serializer, value);
+    });
+
+    reader
+}
+
+struct JsonFormatter();
+
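+// Example envelope produced by JSON_FORMATTER below for a successful call
+// (illustration, not part of the original patch): a handler returning "pong"
+// yields
+//
+//   {"data": "pong"}
+//
+// plus any attributes set via rpcenv.result_attrib_mut(), e.g. {"data": [...], "total": 42}.
+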
+/// Format data as ``application/json``
+///
+/// The returned json object contains the following properties:
+///
+/// * ``data``: The result data (on success)
+///
+/// Any result attributes set on ``rpcenv`` are also added to the object.
+///
+/// Errors generate a BAD_REQUEST response containing the error
+/// message as a string.
+pub static JSON_FORMATTER: &'static dyn OutputFormatter = &JsonFormatter();
+
+impl OutputFormatter for JsonFormatter {
+    fn format_data(&self, data: Value, rpcenv: &dyn RpcEnvironment) -> Response<Body> {
+        let mut result = json!({ "data": data });
+
+        add_result_attributes(&mut result, rpcenv);
+
+        json_data_response(result)
+    }
+
+    fn format_data_streaming(
+        &self,
+        data: Box<dyn SerializableReturn + Send>,
+        rpcenv: &dyn RpcEnvironment,
+    ) -> Result<Response<Body>, Error> {
+        let mut value = json!({});
+
+        add_result_attributes(&mut value, rpcenv);
+
+        let reader = start_data_streaming(value, data);
+        let stream = tokio_stream::wrappers::ReceiverStream::new(reader);
+
+        json_data_response_streaming(Body::wrap_stream(stream))
+    }
+
+    fn format_error(&self, err: Error) -> Response<Body> {
+        let mut response = if let Some(apierr) = err.downcast_ref::<HttpError>() {
+            let mut resp = Response::new(Body::from(apierr.message.clone()));
+            *resp.status_mut() = apierr.code;
+            resp
+        } else {
+            let mut resp = Response::new(Body::from(err.to_string()));
+            *resp.status_mut() = StatusCode::BAD_REQUEST;
+            resp
+        };
+
+        response.headers_mut().insert(
+            header::CONTENT_TYPE,
+            header::HeaderValue::from_static(JSON_CONTENT_TYPE),
+        );
+
+        response
+            .extensions_mut()
+            .insert(ErrorMessageExtension(err.to_string()));
+
+        response
+    }
+}
+
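+// Example envelopes produced by EXTJS_FORMATTER below (illustration, not part
+// of the original patch):
+//
+//   success: {"success": true, "data": "pong"}
+//   failure: {"success": false, "message": "no such item 'foo'", "errors": {}}
+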
+/// Format data as ExtJS compatible ``application/json``
+///
+/// The returned json object contains the following properties:
+///
+/// * ``success``: boolean attribute indicating the success.
+///
+/// * ``data``: The result data (on success)
+///
+/// * ``message``: The error message (on failure)
+///
+/// * ``errors``: detailed list of errors (if available)
+///
+/// Any result attributes set on ``rpcenv`` are also added to the object.
+///
+/// Please note that errors are returned with status code OK, but with
+/// ``success`` set to false.
+pub static EXTJS_FORMATTER: &'static dyn OutputFormatter = &ExtJsFormatter();
+
+struct ExtJsFormatter();
+
+impl OutputFormatter for ExtJsFormatter {
+    fn format_data(&self, data: Value, rpcenv: &dyn RpcEnvironment) -> Response<Body> {
+        let mut result = json!({
+            "data": data,
+            "success": true
+        });
+
+        add_result_attributes(&mut result, rpcenv);
+
+        json_data_response(result)
+    }
+
+    fn format_data_streaming(
+        &self,
+        data: Box<dyn SerializableReturn + Send>,
+        rpcenv: &dyn RpcEnvironment,
+    ) -> Result<Response<Body>, Error> {
+        let mut value = json!({
+            "success": true,
+        });
+
+        add_result_attributes(&mut value, rpcenv);
+
+        let reader = start_data_streaming(value, data);
+        let stream = tokio_stream::wrappers::ReceiverStream::new(reader);
+
+        json_data_response_streaming(Body::wrap_stream(stream))
+    }
+
+    fn format_error(&self, err: Error) -> Response<Body> {
+        let mut errors = HashMap::new();
+
+        let message: String = match err.downcast::<ParameterError>() {
+            Ok(param_err) => {
+                for (name, err) in param_err {
+                    errors.insert(name, err.to_string());
+                }
+                String::from("parameter verification errors")
+            }
+            Err(err) => err.to_string(),
+        };
+
+        let result = json!({
+            "message": message,
+            "errors": errors,
+            "success": false
+        });
+
+        let mut response = json_data_response(result);
+
+        response
+            .extensions_mut()
+            .insert(ErrorMessageExtension(message));
+
+        response
+    }
+}
diff --git a/proxmox-rest-server/src/h2service.rs b/proxmox-rest-server/src/h2service.rs
new file mode 100644
index 00000000..3f90c178
--- /dev/null
+++ b/proxmox-rest-server/src/h2service.rs
@@ -0,0 +1,147 @@
+use anyhow::Error;
+
+use std::collections::HashMap;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use futures::*;
+use hyper::{Body, Request, Response, StatusCode};
+
+use proxmox_router::http_err;
+use proxmox_router::{ApiResponseFuture, HttpError, Router, RpcEnvironment};
+
+use crate::formatter::*;
+use crate::{normalize_uri_path, WorkerTask};
+
+/// Hyper Service implementation to handle stateful H2 connections.
+///
+/// We use this kind of service to handle backup protocol
+/// connections. State is stored inside the generic ``rpcenv``. Logs
+/// go into the ``WorkerTask`` log.
+pub struct H2Service<E> {
+    router: &'static Router,
+    rpcenv: E,
+    worker: Arc<WorkerTask>,
+    debug: bool,
+}
+
+impl<E: RpcEnvironment + Clone> H2Service<E> {
+    pub fn new(rpcenv: E, worker: Arc<WorkerTask>, router: &'static Router, debug: bool) -> Self {
+        Self {
+            rpcenv,
+            worker,
+            router,
+            debug,
+        }
+    }
+
+    pub fn debug<S: AsRef<str>>(&self, msg: S) {
+        if self.debug {
+            self.worker.log_message(msg);
+        }
+    }
+
+    fn handle_request(&self, req: Request<Body>) -> ApiResponseFuture {
+        let (parts, body) = req.into_parts();
+
+        let method = parts.method.clone();
+
+        let (path, components) = match normalize_uri_path(parts.uri.path()) {
+            Ok((p, c)) => (p, c),
+            Err(err) => return future::err(http_err!(BAD_REQUEST, "{}", err)).boxed(),
+        };
+
+        self.debug(format!("{} {}", method, path));
+
+        let mut uri_param = HashMap::new();
+
+        let formatter = JSON_FORMATTER;
+
+        match self.router.find_method(&components, method, &mut uri_param) {
+            None => {
+                let err = http_err!(NOT_FOUND, "Path '{}' not found.", path);
+                future::ok(formatter.format_error(err)).boxed()
+            }
+            Some(api_method) => crate::rest::handle_api_request(
+                self.rpcenv.clone(),
+                api_method,
+                formatter,
+                parts,
+                body,
+                uri_param,
+            )
+            .boxed(),
+        }
+    }
+
+    fn log_response(
+        worker: Arc<WorkerTask>,
+        method: hyper::Method,
+        path: &str,
+        resp: &Response<Body>,
+    ) {
+        let status = resp.status();
+
+        if !status.is_success() {
+            let reason = status.canonical_reason().unwrap_or("unknown reason");
+
+            let mut message = "request failed";
+            if let Some(data) = resp.extensions().get::<ErrorMessageExtension>() {
+                message = &data.0;
+            }
+
+            worker.log_message(format!(
+                "{} {}: {} {}: {}",
+                method.as_str(),
+                path,
+                status.as_str(),
+                reason,
+                message
+            ));
+        }
+    }
+}
+
+impl<E: RpcEnvironment + Clone> tower_service::Service<Request<Body>> for H2Service<E> {
+    type Response = Response<Body>;
+    type Error = Error;
+    #[allow(clippy::type_complexity)]
+    type Future = Pin<Box<dyn Future<Output = Result<Response<Body>, Error>> + Send>>;
+
+    fn poll_ready(&mut self, _cx: &mut Context) -> Poll<Result<(), Self::Error>> {
+        Poll::Ready(Ok(()))
+    }
+
+    fn call(&mut self, req: Request<Body>) -> Self::Future {
+        let path = req.uri().path().to_owned();
+        let method = req.method().clone();
+        let worker = self.worker.clone();
+
+        self.handle_request(req)
+            .map(move |result| match result {
+                Ok(res) => {
+                    Self::log_response(worker, method, &path, &res);
+                    Ok::<_, Error>(res)
+                }
+                Err(err) => {
+                    if let Some(apierr) = err.downcast_ref::<HttpError>() {
+                        let mut resp = Response::new(Body::from(apierr.message.clone()));
+                        resp.extensions_mut()
+                            .insert(ErrorMessageExtension(apierr.message.clone()));
+                        *resp.status_mut() = apierr.code;
+                        Self::log_response(worker, method, &path, &resp);
+                        Ok(resp)
+                    } else {
+                        let mut resp = Response::new(Body::from(err.to_string()));
+                        resp.extensions_mut()
+                            .insert(ErrorMessageExtension(err.to_string()));
+                        *resp.status_mut() = StatusCode::BAD_REQUEST;
+                        Self::log_response(worker, method, &path, &resp);
+                        Ok(resp)
+                    }
+                }
+            })
+            .boxed()
+    }
+}
diff --git a/proxmox-rest-server/src/lib.rs b/proxmox-rest-server/src/lib.rs
new file mode 100644
index 00000000..b8a73e35
--- /dev/null
+++ b/proxmox-rest-server/src/lib.rs
@@ -0,0 +1,244 @@
+//! # Proxmox REST server
+//!
+//! This module provides convenient building blocks to implement a
+//! REST server.
+//!
+//! ## Features
+//!
+//! * highly threaded code, uses Rust async
+//! * static API definitions using schemas
+//! * restartable systemd daemons using `systemd_notify`
+//! * support for long running worker tasks (threads or async tokio tasks)
+//! * supports separate access and authentication log files
+//! * extra control socket to trigger management operations
+//!   - logfile rotation
+//!   - worker task management
+//! * generic interface to authenticate user
+
+use std::future::Future;
+use std::pin::Pin;
+use std::sync::atomic::{AtomicBool, Ordering};
+
+use anyhow::{bail, format_err, Error};
+use http::request::Parts;
+use http::HeaderMap;
+use hyper::{Body, Method, Response};
+use nix::unistd::Pid;
+
+use proxmox_router::UserInformation;
+use proxmox_sys::fd::Fd;
+use proxmox_sys::fs::CreateOptions;
+use proxmox_sys::linux::procfs::PidStat;
+
+mod compression;
+pub use compression::*;
+
+pub mod daemon;
+
+pub mod formatter;
+
+mod environment;
+pub use environment::*;
+
+mod state;
+pub use state::*;
+
+mod command_socket;
+pub use command_socket::*;
+
+mod file_logger;
+pub use file_logger::{FileLogOptions, FileLogger};
+
+mod api_config;
+pub use api_config::ApiConfig;
+
+mod rest;
+pub use rest::RestServer;
+
+mod worker_task;
+pub use worker_task::*;
+
+mod h2service;
+pub use h2service::*;
+
+/// Authentication Error
+pub enum AuthError {
+    Generic(Error),
+    NoData,
+}
+
+impl From<Error> for AuthError {
+    fn from(err: Error) -> Self {
+        AuthError::Generic(err)
+    }
+}
+
+/// Result of [`ServerAdapter::check_auth`].
+pub type ServerAdapterCheckAuth<'a> = Pin<
+    Box<
+        dyn Future<Output = Result<(String, Box<dyn UserInformation + Sync + Send>), AuthError>>
+            + Send
+            + 'a,
+    >,
+>;
+
+/// User Authentication and index/root page generation methods
+pub trait ServerAdapter: Send + Sync {
+    /// Returns the index/root page
+    fn get_index(
+        &self,
+        rest_env: RestEnvironment,
+        parts: Parts,
+    ) -> Pin<Box<dyn Future<Output = Response<Body>> + Send>>;
+
+    /// Extract user credentials from headers and check them.
+    ///
+    /// If credentials are valid, returns the username and a
+    /// [UserInformation] object to query additional user data.
+    fn check_auth<'a>(
+        &'a self,
+        headers: &'a HeaderMap,
+        method: &'a Method,
+    ) -> ServerAdapterCheckAuth<'a>;
+}
+
+lazy_static::lazy_static! {
+    static ref PID: i32 = unsafe { libc::getpid() };
+    static ref PSTART: u64 = PidStat::read_from_pid(Pid::from_raw(*PID)).unwrap().starttime;
+}
+
+/// Returns the current process ID (see [libc::getpid])
+///
+/// The value is cached at startup (so it is invalid after a fork)
+pub(crate) fn pid() -> i32 {
+    *PID
+}
+
+/// Returns the starttime of the process (see [PidStat])
+///
+/// The value is cached at startup (so it is invalid after a fork)
+pub(crate) fn pstart() -> u64 {
+    *PSTART
+}
+
+/// Helper to write the PID into a file
+pub fn write_pid(pid_fn: &str) -> Result<(), Error> {
+    let pid_str = format!("{}\n", *PID);
+    proxmox_sys::fs::replace_file(pid_fn, pid_str.as_bytes(), CreateOptions::new(), false)
+}
+
+/// Helper to read the PID from a file
+pub fn read_pid(pid_fn: &str) -> Result<i32, Error> {
+    let pid = proxmox_sys::fs::file_get_contents(pid_fn)?;
+    let pid = std::str::from_utf8(&pid)?.trim();
+    pid.parse()
+        .map_err(|err| format_err!("could not parse pid - {}", err))
+}
+
+/// Returns the control socket path for a specific process ID.
+///
+/// Note: The control socket always uses @/run/proxmox-backup/ as
+/// prefix for historic reasons. This does not matter because the
+/// generated path is unique for each ``pid`` anyway.
+pub fn ctrl_sock_from_pid(pid: i32) -> String {
+    format!("\0{}/control-{}.sock", "/run/proxmox-backup", pid)
+}
+
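+// For example (illustration, not part of the original patch): for pid 1234
+// the function above yields "\0/run/proxmox-backup/control-1234.sock", i.e. a
+// socket in the Linux abstract namespace (leading NUL byte).
+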
+/// Returns the control socket path for this server.
+pub fn our_ctrl_sock() -> String {
+    ctrl_sock_from_pid(*PID)
+}
+
+static SHUTDOWN_REQUESTED: AtomicBool = AtomicBool::new(false);
+
+/// Request a server shutdown (usually called from [catch_shutdown_signal])
+pub fn request_shutdown() {
+    SHUTDOWN_REQUESTED.store(true, Ordering::SeqCst);
+    crate::server_shutdown();
+}
+
+/// Returns true if there was a shutdown request.
+#[inline(always)]
+pub fn shutdown_requested() -> bool {
+    SHUTDOWN_REQUESTED.load(Ordering::SeqCst)
+}
+
+/// Raise an error if there was a shutdown request.
+pub fn fail_on_shutdown() -> Result<(), Error> {
+    if shutdown_requested() {
+        bail!("Server shutdown requested - aborting task");
+    }
+    Ok(())
+}
+
+/// Safe wrapper for `nix::sys::socket::socketpair` defaulting to `O_CLOEXEC` and guarding the file
+/// descriptors.
+pub fn socketpair() -> Result<(Fd, Fd), Error> {
+    use nix::sys::socket;
+    let (pa, pb) = socket::socketpair(
+        socket::AddressFamily::Unix,
+        socket::SockType::Stream,
+        None,
+        socket::SockFlag::SOCK_CLOEXEC,
+    )?;
+    Ok((Fd(pa), Fd(pb)))
+}
+
+/// Extract a specific cookie from a cookie header.
+/// We assume cookie_name is already url encoded.
+pub fn extract_cookie(cookie: &str, cookie_name: &str) -> Option<String> {
+    for pair in cookie.split(';') {
+        let (name, value) = match pair.find('=') {
+            Some(i) => (pair[..i].trim(), pair[(i + 1)..].trim()),
+            None => return None, // Cookie format error
+        };
+
+        if name == cookie_name {
+            use percent_encoding::percent_decode;
+            if let Ok(value) = percent_decode(value.as_bytes()).decode_utf8() {
+                return Some(value.into());
+            } else {
+                return None; // Cookie format error
+            }
+        }
+    }
+
+    None
+}
+
+/// Extract a specific cookie from a HeaderMap's "COOKIE" entry.
+/// We assume cookie_name is already url encoded.
+pub fn cookie_from_header(headers: &http::HeaderMap, cookie_name: &str) -> Option<String> {
+    if let Some(Ok(cookie)) = headers.get("COOKIE").map(|v| v.to_str()) {
+        extract_cookie(cookie, cookie_name)
+    } else {
+        None
+    }
+}
+
+/// Normalize a uri path
+///
+/// Do not allow ".", "..", or hidden files ".XXXX".
+/// Also remove empty path components.
+pub fn normalize_uri_path(path: &str) -> Result<(String, Vec<&str>), Error> {
+    let items = path.split('/');
+
+    let mut path = String::new();
+    let mut components = vec![];
+
+    for name in items {
+        if name.is_empty() {
+            continue;
+        }
+        if name.starts_with('.') {
+            bail!("Path contains illegal components.");
+        }
+        path.push('/');
+        path.push_str(name);
+        components.push(name);
+    }
+
+    Ok((path, components))
+}
diff --git a/proxmox-rest-server/src/rest.rs b/proxmox-rest-server/src/rest.rs
new file mode 100644
index 00000000..96c35f09
--- /dev/null
+++ b/proxmox-rest-server/src/rest.rs
@@ -0,0 +1,789 @@
+use std::collections::HashMap;
+use std::future::Future;
+use std::hash::BuildHasher;
+use std::path::{Path, PathBuf};
+use std::pin::Pin;
+use std::sync::{Arc, Mutex};
+use std::task::{Context, Poll};
+
+use anyhow::{bail, format_err, Error};
+use futures::future::{self, FutureExt, TryFutureExt};
+use futures::stream::TryStreamExt;
+use hyper::body::HttpBody;
+use hyper::header::{self, HeaderMap};
+use hyper::http::request::Parts;
+use hyper::{Body, Request, Response, StatusCode};
+use lazy_static::lazy_static;
+use regex::Regex;
+use serde_json::Value;
+use tokio::fs::File;
+use tokio::time::Instant;
+use tower_service::Service;
+use url::form_urlencoded;
+
+use proxmox_router::http_err;
+use proxmox_router::{
+    check_api_permission, ApiHandler, ApiMethod, HttpError, Permission, RpcEnvironment,
+    RpcEnvironmentType, UserInformation,
UserInformation, +}; +use proxmox_schema::{ObjectSchemaType, ParameterSchema}; + +use proxmox_http::client::RateLimitedStream; + +use proxmox_async::stream::AsyncReaderStream; +use proxmox_compression::{DeflateEncoder, Level}; + +use crate::{ + formatter::*, normalize_uri_path, ApiConfig, AuthError, CompressionMethod, FileLogger, + RestEnvironment, +}; + +extern "C" { + fn tzset(); +} + +struct AuthStringExtension(String); + +struct EmptyUserInformation {} + +impl UserInformation for EmptyUserInformation { + fn is_superuser(&self, _userid: &str) -> bool { + false + } + fn is_group_member(&self, _userid: &str, _group: &str) -> bool { + false + } + fn lookup_privs(&self, _userid: &str, _path: &[&str]) -> u64 { + 0 + } +} + +/// REST server implementation (configured with [ApiConfig]) +/// +/// This struct implements the [Service] trait in order to use it with +/// [hyper::server::Builder::serve]. +pub struct RestServer { + api_config: Arc, +} + +const MAX_URI_QUERY_LENGTH: usize = 3072; +const CHUNK_SIZE_LIMIT: u64 = 32 * 1024; + +impl RestServer { + /// Creates a new instance. + pub fn new(api_config: ApiConfig) -> Self { + Self { + api_config: Arc::new(api_config), + } + } +} + +impl Service<&Pin>>>> + for RestServer +{ + type Response = ApiService; + type Error = Error; + type Future = Pin> + Send>>; + + fn poll_ready(&mut self, _cx: &mut Context) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call( + &mut self, + ctx: &Pin>>>, + ) -> Self::Future { + match ctx.get_ref().peer_addr() { + Err(err) => future::err(format_err!("unable to get peer address - {}", err)).boxed(), + Ok(peer) => future::ok(ApiService { + peer, + api_config: self.api_config.clone(), + }) + .boxed(), + } + } +} + +impl Service<&Pin>>> for RestServer { + type Response = ApiService; + type Error = Error; + type Future = Pin> + Send>>; + + fn poll_ready(&mut self, _cx: &mut Context) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call( + &mut self, + ctx: &Pin>>, + ) -> Self::Future { + match ctx.get_ref().peer_addr() { + Err(err) => future::err(format_err!("unable to get peer address - {}", err)).boxed(), + Ok(peer) => future::ok(ApiService { + peer, + api_config: self.api_config.clone(), + }) + .boxed(), + } + } +} + +impl Service<&hyper::server::conn::AddrStream> for RestServer { + type Response = ApiService; + type Error = Error; + type Future = Pin> + Send>>; + + fn poll_ready(&mut self, _cx: &mut Context) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, ctx: &hyper::server::conn::AddrStream) -> Self::Future { + let peer = ctx.remote_addr(); + future::ok(ApiService { + peer, + api_config: self.api_config.clone(), + }) + .boxed() + } +} + +impl Service<&tokio::net::UnixStream> for RestServer { + type Response = ApiService; + type Error = Error; + type Future = Pin> + Send>>; + + fn poll_ready(&mut self, _cx: &mut Context) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, _ctx: &tokio::net::UnixStream) -> Self::Future { + // TODO: Find a way to actually represent the vsock peer in the ApiService struct - for now + // it doesn't really matter, so just use a fake IP address + let fake_peer = "0.0.0.0:807".parse().unwrap(); + future::ok(ApiService { + peer: fake_peer, + api_config: self.api_config.clone(), + }) + .boxed() + } +} + +// Helper [Service] containing the peer Address +// +// The lower level connection [Service] implementation on +// [RestServer] extracts the peer address and return an [ApiService]. 
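+//
+// Editor's sketch (illustrative, not part of the original patch): with a plain
+// TCP listener, the stack above is wired up roughly as
+//
+//     let rest_server = RestServer::new(api_config);
+//     hyper::Server::bind(&([127, 0, 0, 1], 8007).into())
+//         .serve(rest_server)
+//         .await?;
+//
+// since the `Service<&AddrStream>` impl above acts as hyper's make-service,
+// yielding one ApiService per accepted connection.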
+//
+// Rust wants this type 'pub' here (else we get 'private type `ApiService`
+// in public interface'). The type is still private because the crate does
+// not export it.
+pub struct ApiService {
+    pub peer: std::net::SocketAddr,
+    pub api_config: Arc<ApiConfig>,
+}
+
+fn log_response(
+    logfile: Option<&Arc<Mutex<FileLogger>>>,
+    peer: &std::net::SocketAddr,
+    method: hyper::Method,
+    path_query: &str,
+    resp: &Response<Body>,
+    user_agent: Option<String>,
+) {
+    if resp.extensions().get::<NoLogExtension>().is_some() {
+        return;
+    };
+
+    // we also log URL-too-long requests, so avoid messages bigger than PIPE_BUF (4k on Linux)
+    // to profit from atomicity guarantees for O_APPEND opened logfiles
+    let path = &path_query[..MAX_URI_QUERY_LENGTH.min(path_query.len())];
+
+    let status = resp.status();
+    if !(status.is_success() || status.is_informational()) {
+        let reason = status.canonical_reason().unwrap_or("unknown reason");
+
+        let message = match resp.extensions().get::<ErrorMessageExtension>() {
+            Some(data) => &data.0,
+            None => "request failed",
+        };
+
+        log::error!(
+            "{} {}: {} {}: [client {}] {}",
+            method.as_str(),
+            path,
+            status.as_str(),
+            reason,
+            peer,
+            message
+        );
+    }
+    if let Some(logfile) = logfile {
+        let auth_id = match resp.extensions().get::<AuthStringExtension>() {
+            Some(AuthStringExtension(auth_id)) => auth_id.clone(),
+            None => "-".to_string(),
+        };
+        let now = proxmox_time::epoch_i64();
+        // time format which apache/nginx use (by default), copied from pve-http-server
+        let datetime = proxmox_time::strftime_local("%d/%m/%Y:%H:%M:%S %z", now)
+            .unwrap_or_else(|_| "-".to_string());
+
+        logfile.lock().unwrap().log(format!(
+            "{} - {} [{}] \"{} {}\" {} {} {}",
+            peer.ip(),
+            auth_id,
+            datetime,
+            method.as_str(),
+            path,
+            status.as_str(),
+            resp.body().size_hint().lower(),
+            user_agent.unwrap_or_else(|| "-".to_string()),
+        ));
+    }
+}
+
+fn get_proxied_peer(headers: &HeaderMap) -> Option<std::net::SocketAddr> {
+    lazy_static! {
+        static ref RE: Regex = Regex::new(r#"for="([^"]+)""#).unwrap();
+    }
+    let forwarded = headers.get(header::FORWARDED)?.to_str().ok()?;
+    let capture = RE.captures(forwarded)?;
+    let rhost = capture.get(1)?.as_str();
+
+    rhost.parse().ok()
+}
+
+fn get_user_agent(headers: &HeaderMap) -> Option<String> {
+    let agent = headers.get(header::USER_AGENT)?.to_str();
+    agent
+        .map(|s| {
+            let mut s = s.to_owned();
+            s.truncate(128);
+            s
+        })
+        .ok()
+}
+
+impl Service<Request<Body>> for ApiService {
+    type Response = Response<Body>;
+    type Error = Error;
+    #[allow(clippy::type_complexity)]
+    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
+
+    fn poll_ready(&mut self, _cx: &mut Context) -> Poll<Result<(), Self::Error>> {
+        Poll::Ready(Ok(()))
+    }
+
+    fn call(&mut self, req: Request<Body>) -> Self::Future {
+        let path = req.uri().path_and_query().unwrap().as_str().to_owned();
+        let method = req.method().clone();
+        let user_agent = get_user_agent(req.headers());
+
+        let config = Arc::clone(&self.api_config);
+        let peer = match get_proxied_peer(req.headers()) {
+            Some(proxied_peer) => proxied_peer,
+            None => self.peer,
+        };
+        async move {
+            let response = match handle_request(Arc::clone(&config), req, &peer).await {
+                Ok(response) => response,
+                Err(err) => {
+                    let (err, code) = match err.downcast_ref::<HttpError>() {
+                        Some(apierr) => (apierr.message.clone(), apierr.code),
+                        _ => (err.to_string(), StatusCode::BAD_REQUEST),
+                    };
+                    Response::builder()
+                        .status(code)
+                        .extension(ErrorMessageExtension(err.to_string()))
+                        .body(err.into())?
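+                    // Editor's note (values illustrative, assuming an access
+                    // log is configured): log_response() below renders
+                    // apache/nginx style lines such as
+                    //   192.168.1.10 - root@pam [27/09/2022:10:00:00 +0200] "GET /api2/json/ping" 200 25 curl/7.79.1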
+                }
+            };
+            let logger = config.get_access_log();
+            log_response(logger, &peer, method, &path, &response, user_agent);
+            Ok(response)
+        }
+        .boxed()
+    }
+}
+
+fn parse_query_parameters<S: BuildHasher>(
+    param_schema: ParameterSchema,
+    form: &str, // x-www-form-urlencoded body data
+    parts: &Parts,
+    uri_param: &HashMap<String, String, S>,
+) -> Result<Value, Error> {
+    let mut param_list: Vec<(String, String)> = vec![];
+
+    if !form.is_empty() {
+        for (k, v) in form_urlencoded::parse(form.as_bytes()).into_owned() {
+            param_list.push((k, v));
+        }
+    }
+
+    if let Some(query_str) = parts.uri.query() {
+        for (k, v) in form_urlencoded::parse(query_str.as_bytes()).into_owned() {
+            if k == "_dc" {
+                continue;
+            } // skip extjs "disable cache" parameter
+            param_list.push((k, v));
+        }
+    }
+
+    for (k, v) in uri_param {
+        param_list.push((k.clone(), v.clone()));
+    }
+
+    let params = param_schema.parse_parameter_strings(&param_list, true)?;
+
+    Ok(params)
+}
+
+async fn get_request_parameters<S: 'static + BuildHasher + Send>(
+    param_schema: ParameterSchema,
+    parts: Parts,
+    req_body: Body,
+    uri_param: HashMap<String, String, S>,
+) -> Result<Value, Error> {
+    let mut is_json = false;
+
+    if let Some(value) = parts.headers.get(header::CONTENT_TYPE) {
+        match value.to_str().map(|v| v.split(';').next()) {
+            Ok(Some("application/x-www-form-urlencoded")) => {
+                is_json = false;
+            }
+            Ok(Some("application/json")) => {
+                is_json = true;
+            }
+            _ => bail!("unsupported content type {:?}", value.to_str()),
+        }
+    }
+
+    let body = TryStreamExt::map_err(req_body, |err| {
+        http_err!(BAD_REQUEST, "Problems reading request body: {}", err)
+    })
+    .try_fold(Vec::new(), |mut acc, chunk| async move {
+        // FIXME: max request body size?
+        if acc.len() + chunk.len() < 64 * 1024 {
+            acc.extend_from_slice(&chunk);
+            Ok(acc)
+        } else {
+            Err(http_err!(BAD_REQUEST, "Request body too large"))
+        }
+    })
+    .await?;
+
+    let utf8_data =
+        std::str::from_utf8(&body).map_err(|err| format_err!("Request body not utf8: {}", err))?;
+
+    if is_json {
+        let mut params: Value = serde_json::from_str(utf8_data)?;
+        for (k, v) in uri_param {
+            if let Some((_optional, prop_schema)) = param_schema.lookup(&k) {
+                params[&k] = prop_schema.parse_simple_value(&v)?;
+            }
+        }
+        param_schema.verify_json(&params)?;
+        Ok(params)
+    } else {
+        parse_query_parameters(param_schema, utf8_data, &parts, &uri_param)
+    }
+}
+
+struct NoLogExtension();
+
+async fn proxy_protected_request(
+    info: &'static ApiMethod,
+    mut parts: Parts,
+    req_body: Body,
+    peer: &std::net::SocketAddr,
+) -> Result<Response<Body>, Error> {
+    let mut uri_parts = parts.uri.clone().into_parts();
+
+    uri_parts.scheme = Some(http::uri::Scheme::HTTP);
+    uri_parts.authority = Some(http::uri::Authority::from_static("127.0.0.1:82"));
+    let new_uri = http::Uri::from_parts(uri_parts).unwrap();
+
+    parts.uri = new_uri;
+
+    let mut request = Request::from_parts(parts, req_body);
+    request.headers_mut().insert(
+        header::FORWARDED,
+        format!("for=\"{}\";", peer).parse().unwrap(),
+    );
+
+    let reload_timezone = info.reload_timezone;
+
+    let resp = hyper::client::Client::new()
+        .request(request)
+        .map_err(Error::from)
+        .map_ok(|mut resp| {
+            resp.extensions_mut().insert(NoLogExtension());
+            resp
+        })
+        .await?;
+
+    if reload_timezone {
+        unsafe {
+            tzset();
+        }
+    }
+
+    Ok(resp)
+}
+
+pub(crate) async fn handle_api_request<Env: RpcEnvironment, S: 'static + BuildHasher + Send>(
+    mut rpcenv: Env,
+    info: &'static ApiMethod,
+    formatter: &'static dyn OutputFormatter,
+    parts: Parts,
+    req_body: Body,
+    uri_param: HashMap<String, String, S>,
+) -> Result<Response<Body>, Error> {
+    let delay_unauth_time = std::time::Instant::now() + std::time::Duration::from_millis(3000);
+    let compression =
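+        // Editor's note: the helper below picks the first supported entry of
+        // the Accept-Encoding header; e.g. "Accept-Encoding: deflate, gzip"
+        // (illustrative) yields Some(CompressionMethod::Deflate), the only
+        // method the code here handles.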
+        extract_compression_method(&parts.headers);
+
+    let result = match info.handler {
+        ApiHandler::AsyncHttp(handler) => {
+            let params = parse_query_parameters(info.parameters, "", &parts, &uri_param)?;
+            (handler)(parts, req_body, params, info, Box::new(rpcenv)).await
+        }
+        ApiHandler::StreamingSync(handler) => {
+            let params =
+                get_request_parameters(info.parameters, parts, req_body, uri_param).await?;
+            (handler)(params, info, &mut rpcenv)
+                .and_then(|data| formatter.format_data_streaming(data, &rpcenv))
+        }
+        ApiHandler::StreamingAsync(handler) => {
+            let params =
+                get_request_parameters(info.parameters, parts, req_body, uri_param).await?;
+            (handler)(params, info, &mut rpcenv)
+                .await
+                .and_then(|data| formatter.format_data_streaming(data, &rpcenv))
+        }
+        ApiHandler::Sync(handler) => {
+            let params =
+                get_request_parameters(info.parameters, parts, req_body, uri_param).await?;
+            (handler)(params, info, &mut rpcenv).map(|data| formatter.format_data(data, &rpcenv))
+        }
+        ApiHandler::Async(handler) => {
+            let params =
+                get_request_parameters(info.parameters, parts, req_body, uri_param).await?;
+            (handler)(params, info, &mut rpcenv)
+                .await
+                .map(|data| formatter.format_data(data, &rpcenv))
+        }
+        _ => {
+            bail!("Unknown API handler type");
+        }
+    };
+
+    let mut resp = match result {
+        Ok(resp) => resp,
+        Err(err) => {
+            if let Some(httperr) = err.downcast_ref::<HttpError>() {
+                if httperr.code == StatusCode::UNAUTHORIZED {
+                    tokio::time::sleep_until(Instant::from_std(delay_unauth_time)).await;
+                }
+            }
+            formatter.format_error(err)
+        }
+    };
+
+    let resp = match compression {
+        Some(CompressionMethod::Deflate) => {
+            resp.headers_mut().insert(
+                header::CONTENT_ENCODING,
+                CompressionMethod::Deflate.content_encoding(),
+            );
+            resp.map(|body| {
+                Body::wrap_stream(DeflateEncoder::with_quality(
+                    TryStreamExt::map_err(body, |err| {
+                        proxmox_lang::io_format_err!("error during compression: {}", err)
+                    }),
+                    Level::Default,
+                ))
+            })
+        }
+        None => resp,
+    };
+
+    if info.reload_timezone {
+        unsafe {
+            tzset();
+        }
+    }
+
+    Ok(resp)
+}
+
+fn extension_to_content_type(filename: &Path) -> (&'static str, bool) {
+    if let Some(ext) = filename.extension().and_then(|osstr| osstr.to_str()) {
+        return match ext {
+            "css" => ("text/css", false),
+            "html" => ("text/html", false),
+            "js" => ("application/javascript", false),
+            "json" => ("application/json", false),
+            "map" => ("application/json", false),
+            "png" => ("image/png", true),
+            "ico" => ("image/x-icon", true),
+            "gif" => ("image/gif", true),
+            "svg" => ("image/svg+xml", false),
+            "jar" => ("application/java-archive", true),
+            "woff" => ("application/font-woff", true),
+            "woff2" => ("application/font-woff2", true),
+            "ttf" => ("application/font-sfnt", true),
+            "pdf" => ("application/pdf", true),
+            "epub" => ("application/epub+zip", true),
+            "mp3" => ("audio/mpeg", true),
+            "oga" => ("audio/ogg", true),
+            "tgz" => ("application/x-compressed-tar", true),
+            _ => ("application/octet-stream", false),
+        };
+    }
+
+    ("application/octet-stream", false)
+}
+
+async fn simple_static_file_download(
+    filename: PathBuf,
+    content_type: &'static str,
+    compression: Option<CompressionMethod>,
+) -> Result<Response<Body>, Error> {
+    use tokio::io::AsyncReadExt;
+
+    let mut file = File::open(filename)
+        .await
+        .map_err(|err| http_err!(BAD_REQUEST, "File open failed: {}", err))?;
+
+    let mut data: Vec<u8> = Vec::new();
+
+    let mut response = match compression {
+        Some(CompressionMethod::Deflate) => {
+            let mut enc = DeflateEncoder::with_quality(data, Level::Default);
+            enc.compress_vec(&mut file, CHUNK_SIZE_LIMIT as usize)
+                .await?;
+            let mut response = Response::new(enc.into_inner().into());
+            response.headers_mut().insert(
+                header::CONTENT_ENCODING,
+                CompressionMethod::Deflate.content_encoding(),
+            );
+            response
+        }
+        None => {
+            file.read_to_end(&mut data)
+                .await
+                .map_err(|err| http_err!(BAD_REQUEST, "File read failed: {}", err))?;
+            Response::new(data.into())
+        }
+    };
+
+    response.headers_mut().insert(
+        header::CONTENT_TYPE,
+        header::HeaderValue::from_static(content_type),
+    );
+
+    Ok(response)
+}
+
+async fn chunked_static_file_download(
+    filename: PathBuf,
+    content_type: &'static str,
+    compression: Option<CompressionMethod>,
+) -> Result<Response<Body>, Error> {
+    let mut resp = Response::builder()
+        .status(StatusCode::OK)
+        .header(header::CONTENT_TYPE, content_type);
+
+    let file = File::open(filename)
+        .await
+        .map_err(|err| http_err!(BAD_REQUEST, "File open failed: {}", err))?;
+
+    let body = match compression {
+        Some(CompressionMethod::Deflate) => {
+            resp = resp.header(
+                header::CONTENT_ENCODING,
+                CompressionMethod::Deflate.content_encoding(),
+            );
+            Body::wrap_stream(DeflateEncoder::with_quality(
+                AsyncReaderStream::new(file),
+                Level::Default,
+            ))
+        }
+        None => Body::wrap_stream(AsyncReaderStream::new(file)),
+    };
+
+    Ok(resp.body(body).unwrap())
+}
+
+async fn handle_static_file_download(
+    filename: PathBuf,
+    compression: Option<CompressionMethod>,
+) -> Result<Response<Body>, Error> {
+    let metadata = tokio::fs::metadata(filename.clone())
+        .map_err(|err| http_err!(BAD_REQUEST, "File access problems: {}", err))
+        .await?;
+
+    let (content_type, nocomp) = extension_to_content_type(&filename);
+    let compression = if nocomp { None } else { compression };
+
+    if metadata.len() < CHUNK_SIZE_LIMIT {
+        simple_static_file_download(filename, content_type, compression).await
+    } else {
+        chunked_static_file_download(filename, content_type, compression).await
+    }
+}
+
+// FIXME: support handling multiple compression methods
+fn extract_compression_method(headers: &http::HeaderMap) -> Option<CompressionMethod> {
+    if let Some(Ok(encodings)) = headers.get(header::ACCEPT_ENCODING).map(|v| v.to_str()) {
+        for encoding in encodings.split(&[',', ' '][..]) {
+            if let Ok(method) = encoding.parse() {
+                return Some(method);
+            }
+        }
+    }
+    None
+}
+
+async fn handle_request(
+    api: Arc<ApiConfig>,
+    req: Request<Body>,
+    peer: &std::net::SocketAddr,
+) -> Result<Response<Body>, Error> {
+    let (parts, body) = req.into_parts();
+    let method = parts.method.clone();
+    let (path, components) = normalize_uri_path(parts.uri.path())?;
+
+    let comp_len = components.len();
+
+    let query = parts.uri.query().unwrap_or_default();
+    if path.len() + query.len() > MAX_URI_QUERY_LENGTH {
+        return Ok(Response::builder()
+            .status(StatusCode::URI_TOO_LONG)
+            .body("".into())
+            .unwrap());
+    }
+
+    let env_type = api.env_type();
+    let mut rpcenv = RestEnvironment::new(env_type, Arc::clone(&api));
+
+    rpcenv.set_client_ip(Some(*peer));
+
+    let delay_unauth_time = std::time::Instant::now() + std::time::Duration::from_millis(3000);
+    let access_forbidden_time = std::time::Instant::now() + std::time::Duration::from_millis(500);
+
+    if comp_len >= 1 && components[0] == "api2" {
+        if comp_len >= 2 {
+            let format = components[1];
+
+            let formatter: &dyn OutputFormatter = match format {
+                "json" => JSON_FORMATTER,
+                "extjs" => EXTJS_FORMATTER,
+                _ => bail!("Unsupported output format '{}'.", format),
+            };
+
+            let mut uri_param = HashMap::new();
+            let api_method = api.find_method(&components[2..], method.clone(), &mut uri_param);
+
+            let mut auth_required = true;
+            if let Some(api_method) = api_method {
+                if let
Permission::World = *api_method.access.permission { + auth_required = false; // no auth for endpoints with World permission + } + } + + let mut user_info: Box = + Box::new(EmptyUserInformation {}); + + if auth_required { + match api.check_auth(&parts.headers, &method).await { + Ok((authid, info)) => { + rpcenv.set_auth_id(Some(authid)); + user_info = info; + } + Err(auth_err) => { + let err = match auth_err { + AuthError::Generic(err) => err, + AuthError::NoData => { + format_err!("no authentication credentials provided.") + } + }; + // fixme: log Username?? + rpcenv.log_failed_auth(None, &err.to_string()); + + // always delay unauthorized calls by 3 seconds (from start of request) + let err = http_err!(UNAUTHORIZED, "authentication failed - {}", err); + tokio::time::sleep_until(Instant::from_std(delay_unauth_time)).await; + return Ok(formatter.format_error(err)); + } + } + } + + match api_method { + None => { + let err = http_err!(NOT_FOUND, "Path '{}' not found.", path); + return Ok(formatter.format_error(err)); + } + Some(api_method) => { + let auth_id = rpcenv.get_auth_id(); + let user_info = user_info; + + if !check_api_permission( + api_method.access.permission, + auth_id.as_deref(), + &uri_param, + user_info.as_ref(), + ) { + let err = http_err!(FORBIDDEN, "permission check failed"); + tokio::time::sleep_until(Instant::from_std(access_forbidden_time)).await; + return Ok(formatter.format_error(err)); + } + + let result = if api_method.protected && env_type == RpcEnvironmentType::PUBLIC { + proxy_protected_request(api_method, parts, body, peer).await + } else { + handle_api_request(rpcenv, api_method, formatter, parts, body, uri_param) + .await + }; + + let mut response = match result { + Ok(resp) => resp, + Err(err) => formatter.format_error(err), + }; + + if let Some(auth_id) = auth_id { + response + .extensions_mut() + .insert(AuthStringExtension(auth_id)); + } + + return Ok(response); + } + } + } + } else { + // not Auth required for accessing files! + + if method != hyper::Method::GET { + bail!("Unsupported HTTP method {}", method); + } + + if comp_len == 0 { + match api.check_auth(&parts.headers, &method).await { + Ok((auth_id, _user_info)) => { + rpcenv.set_auth_id(Some(auth_id)); + return Ok(api.get_index(rpcenv, parts).await); + } + Err(AuthError::Generic(_)) => { + tokio::time::sleep_until(Instant::from_std(delay_unauth_time)).await; + } + Err(AuthError::NoData) => {} + } + return Ok(api.get_index(rpcenv, parts).await); + } else { + let filename = api.find_alias(&components); + let compression = extract_compression_method(&parts.headers); + return handle_static_file_download(filename, compression).await; + } + } + + Err(http_err!(NOT_FOUND, "Path '{}' not found.", path)) +} diff --git a/proxmox-rest-server/src/state.rs b/proxmox-rest-server/src/state.rs new file mode 100644 index 00000000..c3b81627 --- /dev/null +++ b/proxmox-rest-server/src/state.rs @@ -0,0 +1,161 @@ +use anyhow::Error; +use lazy_static::lazy_static; +use std::sync::Mutex; + +use futures::*; + +use tokio::signal::unix::{signal, SignalKind}; + +use proxmox_async::broadcast_future::BroadcastData; + +use crate::request_shutdown; + +#[derive(PartialEq, Copy, Clone, Debug)] +enum ServerMode { + Normal, + Shutdown, +} + +struct ServerState { + mode: ServerMode, + shutdown_listeners: BroadcastData<()>, + last_worker_listeners: BroadcastData<()>, + worker_count: usize, + internal_task_count: usize, + reload_request: bool, +} + +lazy_static! 
{ + static ref SERVER_STATE: Mutex = Mutex::new(ServerState { + mode: ServerMode::Normal, + shutdown_listeners: BroadcastData::new(), + last_worker_listeners: BroadcastData::new(), + worker_count: 0, + internal_task_count: 0, + reload_request: false, + }); +} + +/// Listen to ``SIGINT`` for server shutdown +/// +/// This calls [request_shutdown] when receiving the signal. +pub fn catch_shutdown_signal() -> Result<(), Error> { + let mut stream = signal(SignalKind::interrupt())?; + + let future = async move { + while stream.recv().await.is_some() { + log::info!("got shutdown request (SIGINT)"); + SERVER_STATE.lock().unwrap().reload_request = false; + request_shutdown(); + } + } + .boxed(); + + let abort_future = last_worker_future().map_err(|_| {}); + let task = futures::future::select(future, abort_future); + + tokio::spawn(task.map(|_| ())); + + Ok(()) +} + +/// Listen to ``SIGHUP`` for server reload +/// +/// This calls [request_shutdown] when receiving the signal, and tries +/// to restart the server. +pub fn catch_reload_signal() -> Result<(), Error> { + let mut stream = signal(SignalKind::hangup())?; + + let future = async move { + while stream.recv().await.is_some() { + log::info!("got reload request (SIGHUP)"); + SERVER_STATE.lock().unwrap().reload_request = true; + crate::request_shutdown(); + } + } + .boxed(); + + let abort_future = last_worker_future().map_err(|_| {}); + let task = futures::future::select(future, abort_future); + + tokio::spawn(task.map(|_| ())); + + Ok(()) +} + +pub(crate) fn is_reload_request() -> bool { + let data = SERVER_STATE.lock().unwrap(); + + data.mode == ServerMode::Shutdown && data.reload_request +} + +pub(crate) fn server_shutdown() { + let mut data = SERVER_STATE.lock().unwrap(); + + log::info!("request_shutdown"); + + data.mode = ServerMode::Shutdown; + + data.shutdown_listeners.notify_listeners(Ok(())); + + drop(data); // unlock + + check_last_worker(); +} + +/// Future to signal server shutdown +pub fn shutdown_future() -> impl Future { + let mut data = SERVER_STATE.lock().unwrap(); + data.shutdown_listeners.listen().map(|_| ()) +} + +/// Future to signal when last worker task finished +pub fn last_worker_future() -> impl Future> { + let mut data = SERVER_STATE.lock().unwrap(); + data.last_worker_listeners.listen() +} + +pub(crate) fn set_worker_count(count: usize) { + SERVER_STATE.lock().unwrap().worker_count = count; + + check_last_worker(); +} + +pub(crate) fn check_last_worker() { + let mut data = SERVER_STATE.lock().unwrap(); + + if !(data.mode == ServerMode::Shutdown + && data.worker_count == 0 + && data.internal_task_count == 0) + { + return; + } + + data.last_worker_listeners.notify_listeners(Ok(())); +} + +/// Spawns a tokio task that will be tracked for reload +/// and if it is finished, notify the [last_worker_future] if we +/// are in shutdown mode. 
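+///
+/// Usage sketch (editor's example, assuming a tokio runtime; the future body
+/// and `do_cleanup` are illustrative):
+///
+/// ```ignore
+/// spawn_internal_task(async move {
+///     // internal maintenance that should finish before a reload completes
+///     do_cleanup().await;
+/// });
+/// ```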
+pub fn spawn_internal_task(task: T) +where + T: Future + Send + 'static, + T::Output: Send + 'static, +{ + let mut data = SERVER_STATE.lock().unwrap(); + data.internal_task_count += 1; + + tokio::spawn(async move { + let _ = tokio::spawn(task).await; // ignore errors + + { + // drop mutex + let mut data = SERVER_STATE.lock().unwrap(); + if data.internal_task_count > 0 { + data.internal_task_count -= 1; + } + } + + check_last_worker(); + }); +} diff --git a/proxmox-rest-server/src/worker_task.rs b/proxmox-rest-server/src/worker_task.rs new file mode 100644 index 00000000..44d81119 --- /dev/null +++ b/proxmox-rest-server/src/worker_task.rs @@ -0,0 +1,1058 @@ +use std::collections::{HashMap, VecDeque}; +use std::fs::File; +use std::io::{BufRead, BufReader, Read, Write}; +use std::panic::UnwindSafe; +use std::path::PathBuf; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::{Arc, Mutex}; +use std::time::{Duration, SystemTime}; + +use anyhow::{bail, format_err, Error}; +use futures::*; +use lazy_static::lazy_static; +use nix::fcntl::OFlag; +use once_cell::sync::OnceCell; +use serde::{Deserialize, Serialize}; +use serde_json::{json, Value}; +use tokio::sync::oneshot; + +use proxmox_lang::try_block; +use proxmox_schema::upid::UPID; +use proxmox_sys::fs::{atomic_open_or_create_file, create_path, replace_file, CreateOptions}; +use proxmox_sys::linux::procfs; +use proxmox_sys::task_warn; + +use proxmox_sys::logrotate::{LogRotate, LogRotateFiles}; +use proxmox_sys::WorkerTaskContext; + +use crate::{CommandSocket, FileLogOptions, FileLogger}; + +struct TaskListLockGuard(File); + +struct WorkerTaskSetup { + file_opts: CreateOptions, + taskdir: PathBuf, + task_lock_fn: PathBuf, + active_tasks_fn: PathBuf, + task_index_fn: PathBuf, + task_archive_fn: PathBuf, +} + +static WORKER_TASK_SETUP: OnceCell = OnceCell::new(); + +fn worker_task_setup() -> Result<&'static WorkerTaskSetup, Error> { + WORKER_TASK_SETUP + .get() + .ok_or_else(|| format_err!("WorkerTask library is not initialized")) +} + +impl WorkerTaskSetup { + fn new(basedir: PathBuf, file_opts: CreateOptions) -> Self { + let mut taskdir = basedir; + taskdir.push("tasks"); + + let mut task_lock_fn = taskdir.clone(); + task_lock_fn.push(".active.lock"); + + let mut active_tasks_fn = taskdir.clone(); + active_tasks_fn.push("active"); + + let mut task_index_fn = taskdir.clone(); + task_index_fn.push("index"); + + let mut task_archive_fn = taskdir.clone(); + task_archive_fn.push("archive"); + + Self { + file_opts, + taskdir, + task_lock_fn, + active_tasks_fn, + task_index_fn, + task_archive_fn, + } + } + + fn lock_task_list_files(&self, exclusive: bool) -> Result { + let options = self + .file_opts + .clone() + .perm(nix::sys::stat::Mode::from_bits_truncate(0o660)); + + let timeout = std::time::Duration::new(10, 0); + + let file = + proxmox_sys::fs::open_file_locked(&self.task_lock_fn, timeout, exclusive, options)?; + + Ok(TaskListLockGuard(file)) + } + + fn log_path(&self, upid: &UPID) -> std::path::PathBuf { + let mut path = self.taskdir.clone(); + path.push(format!("{:02X}", upid.pstart % 256)); + path.push(upid.to_string()); + path + } + + // atomically read/update the task list, update status of finished tasks + // new_upid is added to the list when specified. 
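+    // Each line in these files uses the format produced by render_task_line()
+    // further below: running tasks are serialized as "{upid}\n", finished ones
+    // as "{upid} {endtime:08X} {status}\n", e.g. (values illustrative):
+    //   UPID:node1:000028B7:0001C44F:00000003:63D2F5A1:example::root@pam: 63D2F5B2 OK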
+ fn update_active_workers(&self, new_upid: Option<&UPID>) -> Result<(), Error> { + let lock = self.lock_task_list_files(true)?; + + // TODO remove with 1.x + let mut finish_list: Vec = read_task_file_from_path(&self.task_index_fn)?; + let had_index_file = !finish_list.is_empty(); + + // We use filter_map because one negative case wants to *move* the data into `finish_list`, + // clippy doesn't quite catch this! + #[allow(clippy::unnecessary_filter_map)] + let mut active_list: Vec = read_task_file_from_path(&self.active_tasks_fn)? + .into_iter() + .filter_map(|info| { + if info.state.is_some() { + // this can happen when the active file still includes finished tasks + finish_list.push(info); + return None; + } + + if !worker_is_active_local(&info.upid) { + // println!("Detected stopped task '{}'", &info.upid_str); + let now = proxmox_time::epoch_i64(); + let status = + upid_read_status(&info.upid).unwrap_or(TaskState::Unknown { endtime: now }); + finish_list.push(TaskListInfo { + upid: info.upid, + upid_str: info.upid_str, + state: Some(status), + }); + return None; + } + + Some(info) + }) + .collect(); + + if let Some(upid) = new_upid { + active_list.push(TaskListInfo { + upid: upid.clone(), + upid_str: upid.to_string(), + state: None, + }); + } + + let active_raw = render_task_list(&active_list); + + let options = self + .file_opts + .clone() + .perm(nix::sys::stat::Mode::from_bits_truncate(0o660)); + + replace_file(&self.active_tasks_fn, active_raw.as_bytes(), options, false)?; + + finish_list.sort_unstable_by(|a, b| match (&a.state, &b.state) { + (Some(s1), Some(s2)) => s1.cmp(s2), + (Some(_), None) => std::cmp::Ordering::Less, + (None, Some(_)) => std::cmp::Ordering::Greater, + _ => a.upid.starttime.cmp(&b.upid.starttime), + }); + + if !finish_list.is_empty() { + let options = self + .file_opts + .clone() + .perm(nix::sys::stat::Mode::from_bits_truncate(0o660)); + + let mut writer = atomic_open_or_create_file( + &self.task_archive_fn, + OFlag::O_APPEND | OFlag::O_RDWR, + &[], + options, + false, + )?; + for info in &finish_list { + writer.write_all(render_task_line(info).as_bytes())?; + } + } + + // TODO Remove with 1.x + // for compatibility, if we had an INDEX file, we do not need it anymore + if had_index_file { + let _ = nix::unistd::unlink(&self.task_index_fn); + } + + drop(lock); + + Ok(()) + } + + // Create task log directory with correct permissions + fn create_task_log_dirs(&self) -> Result<(), Error> { + try_block!({ + let dir_opts = self + .file_opts + .clone() + .perm(nix::sys::stat::Mode::from_bits_truncate(0o755)); + + create_path(&self.taskdir, Some(dir_opts.clone()), Some(dir_opts))?; + // fixme:??? create_path(pbs_buildcfg::PROXMOX_BACKUP_RUN_DIR, None, Some(opts))?; + Ok(()) + }) + .map_err(|err: Error| format_err!("unable to create task log dir - {}", err)) + } +} + +/// Initialize the WorkerTask library +pub fn init_worker_tasks(basedir: PathBuf, file_opts: CreateOptions) -> Result<(), Error> { + let setup = WorkerTaskSetup::new(basedir, file_opts); + setup.create_task_log_dirs()?; + WORKER_TASK_SETUP + .set(setup) + .map_err(|_| format_err!("init_worker_tasks failed - already initialized")) +} + +/// Optionally rotates and/or cleans up the task archive depending on its size and age. +/// +/// Check if the Task Archive is bigger than 'size_threshold' bytes, and rotate in that case. +/// Keeps either only up to 'max_files' if 'max_days' is not given. 
Else, 'max_files' is ignored, and all archive files older than the first one
+/// containing only tasks from before 'now - max_days' will be deleted.
+pub fn rotate_task_log_archive(
+    size_threshold: u64,
+    compress: bool,
+    max_files: Option<usize>,
+    max_days: Option<usize>,
+    options: Option<CreateOptions>,
+) -> Result<bool, Error> {
+    let setup = worker_task_setup()?;
+
+    let _lock = setup.lock_task_list_files(true)?;
+
+    let mut logrotate = LogRotate::new(
+        &setup.task_archive_fn,
+        compress,
+        if max_days.is_none() { max_files } else { None },
+        options,
+    )?;
+
+    let mut rotated = logrotate.rotate(size_threshold)?;
+
+    if let Some(max_days) = max_days {
+        // NOTE: not on exact day-boundary but close enough for what's done here
+        let cutoff_time = proxmox_time::epoch_i64() - (max_days * 24 * 60 * 60) as i64;
+        let mut cutoff = false;
+        let mut files = logrotate.files();
+        // task archives have task-logs sorted by endtime, with the oldest at the start of the
+        // file. So, peek into every archive and see if the first listed tasks' endtime would be
+        // cut off. If that's the case we know that the next (older) task archive rotation surely
+        // falls behind the cut-off line. We cannot say the same for the current archive without
+        // checking its last (newest) line, but that's more complex, expensive and rather unlikely.
+        for file_name in logrotate.file_names() {
+            if !cutoff {
+                let reader = match files.next() {
+                    Some(file) => BufReader::new(file),
+                    None => bail!("unexpected error: files do not match file_names"),
+                };
+                if let Some(Ok(line)) = reader.lines().next() {
+                    if let Ok((_, _, Some(state))) = parse_worker_status_line(&line) {
+                        if state.endtime() < cutoff_time {
+                            // found first file with the oldest entry being cut-off, so next older
+                            // ones are all up for deletion.
+                            cutoff = true;
+                            rotated = true;
+                        }
+                    }
+                }
+            } else if let Err(err) = std::fs::remove_file(&file_name) {
+                log::error!("could not remove {:?}: {}", file_name, err);
+            }
+        }
+    }
+
+    Ok(rotated)
+}
+
+/// Removes all task logs that are older than the oldest task entry in the
+/// task archive.
+pub fn cleanup_old_tasks(worker: &dyn WorkerTaskContext, compressed: bool) -> Result<(), Error> {
+    let setup = worker_task_setup()?;
+
+    let _lock = setup.lock_task_list_files(true)?;
+
+    let logrotate = LogRotate::new(&setup.task_archive_fn, compressed, None, None)?;
+
+    let mut timestamp = None;
+    if let Some(last_file) = logrotate.files().last() {
+        let reader = BufReader::new(last_file);
+        for line in reader.lines() {
+            let line = line?;
+            if let Ok((_, _, Some(state))) = parse_worker_status_line(&line) {
+                timestamp = Some(state.endtime());
+                break;
+            }
+        }
+    }
+
+    fn get_modified(entry: std::fs::DirEntry) -> Result<SystemTime, std::io::Error> {
+        entry.metadata()?.modified()
+    }
+
+    if let Some(timestamp) = timestamp {
+        let cutoff_time = if timestamp > 0 {
+            SystemTime::UNIX_EPOCH.checked_add(Duration::from_secs(timestamp as u64))
+        } else {
+            SystemTime::UNIX_EPOCH.checked_sub(Duration::from_secs(-timestamp as u64))
+        }
+        .ok_or_else(|| format_err!("could not calculate cutoff time"))?;
+
+        for i in 0..256 {
+            let mut path = setup.taskdir.clone();
+            path.push(format!("{:02X}", i));
+            let files = match std::fs::read_dir(path) {
+                Ok(files) => files,
+                Err(err) if err.kind() == std::io::ErrorKind::NotFound => continue,
+                Err(err) => {
+                    task_warn!(worker, "could not check task logs in '{:02X}': {}", i, err);
+                    continue;
+                }
+            };
+            for file in files {
+                let file = match file {
+                    Ok(file) => file,
+                    Err(err) => {
+                        task_warn!(
+                            worker,
+                            "could not check some task log in '{:02X}': {}",
{}", + i, + err + ); + continue; + } + }; + let path = file.path(); + + let modified = match get_modified(file) { + Ok(modified) => modified, + Err(err) => { + task_warn!(worker, "error getting mtime for '{:?}': {}", path, err); + continue; + } + }; + + if modified < cutoff_time { + match std::fs::remove_file(&path) { + Ok(()) => {} + Err(err) if err.kind() == std::io::ErrorKind::NotFound => {} + Err(err) => { + task_warn!(worker, "could not remove file '{:?}': {}", path, err) + } + } + } + } + } + } + + Ok(()) +} + +/// Path to the worker log file +pub fn upid_log_path(upid: &UPID) -> Result { + let setup = worker_task_setup()?; + Ok(setup.log_path(upid)) +} + +/// Read endtime (time of last log line) and exitstatus from task log file +/// If there is not a single line with at valid datetime, we assume the +/// starttime to be the endtime +pub fn upid_read_status(upid: &UPID) -> Result { + let setup = worker_task_setup()?; + + let mut status = TaskState::Unknown { + endtime: upid.starttime, + }; + + let path = setup.log_path(upid); + + let mut file = File::open(path)?; + + /// speedup - only read tail + use std::io::Seek; + use std::io::SeekFrom; + let _ = file.seek(SeekFrom::End(-8192)); // ignore errors + + let mut data = Vec::with_capacity(8192); + file.read_to_end(&mut data)?; + + // strip newlines at the end of the task logs + while data.last() == Some(&b'\n') { + data.pop(); + } + + let last_line = match data.iter().rposition(|c| *c == b'\n') { + Some(start) if data.len() > (start + 1) => &data[start + 1..], + Some(_) => &data, // should not happen, since we removed all trailing newlines + None => &data, + }; + + let last_line = std::str::from_utf8(last_line) + .map_err(|err| format_err!("upid_read_status: utf8 parse failed: {}", err))?; + + let mut iter = last_line.splitn(2, ": "); + if let Some(time_str) = iter.next() { + if let Ok(endtime) = proxmox_time::parse_rfc3339(time_str) { + // set the endtime even if we cannot parse the state + status = TaskState::Unknown { endtime }; + if let Some(rest) = iter.next().and_then(|rest| rest.strip_prefix("TASK ")) { + if let Ok(state) = TaskState::from_endtime_and_message(endtime, rest) { + status = state; + } + } + } + } + + Ok(status) +} + +lazy_static! { + static ref WORKER_TASK_LIST: Mutex>> = + Mutex::new(HashMap::new()); +} + +/// checks if the task UPID refers to a worker from this process +fn is_local_worker(upid: &UPID) -> bool { + upid.pid == crate::pid() && upid.pstart == crate::pstart() +} + +/// Test if the task is still running +pub async fn worker_is_active(upid: &UPID) -> Result { + if is_local_worker(upid) { + return Ok(WORKER_TASK_LIST.lock().unwrap().contains_key(&upid.task_id)); + } + + if procfs::check_process_running_pstart(upid.pid, upid.pstart).is_none() { + return Ok(false); + } + + let sock = crate::ctrl_sock_from_pid(upid.pid); + let cmd = json!({ + "command": "worker-task-status", + "args": { + "upid": upid.to_string(), + }, + }); + let status = crate::send_command(sock, &cmd).await?; + + if let Some(active) = status.as_bool() { + Ok(active) + } else { + bail!("got unexpected result {:?} (expected bool)", status); + } +} + +/// Test if the task is still running (fast but inaccurate implementation) +/// +/// If the task is spawned from a different process, we simply return if +/// that process is still running. This information is good enough to detect +/// stale tasks... 
+pub fn worker_is_active_local(upid: &UPID) -> bool {
+    if is_local_worker(upid) {
+        WORKER_TASK_LIST.lock().unwrap().contains_key(&upid.task_id)
+    } else {
+        procfs::check_process_running_pstart(upid.pid, upid.pstart).is_some()
+    }
+}
+
+/// Register task control commands on a [CommandSocket].
+///
+/// This creates two commands:
+///
+/// * ``worker-task-abort <UPID>``: calls [abort_local_worker]
+///
+/// * ``worker-task-status <UPID>``: returns true or false, depending on
+///   whether the worker is running or stopped.
+pub fn register_task_control_commands(commando_sock: &mut CommandSocket) -> Result<(), Error> {
+    fn get_upid(args: Option<&Value>) -> Result<UPID, Error> {
+        let args = if let Some(args) = args {
+            args
+        } else {
+            bail!("missing args")
+        };
+        let upid = match args.get("upid") {
+            Some(Value::String(upid)) => upid.parse::<UPID>()?,
+            None => bail!("no upid in args"),
+            _ => bail!("unable to parse upid"),
+        };
+        if !is_local_worker(&upid) {
+            bail!("upid does not belong to this process");
+        }
+        Ok(upid)
+    }
+
+    commando_sock.register_command("worker-task-abort".into(), move |args| {
+        let upid = get_upid(args)?;
+
+        abort_local_worker(upid);
+
+        Ok(Value::Null)
+    })?;
+    commando_sock.register_command("worker-task-status".into(), move |args| {
+        let upid = get_upid(args)?;
+
+        let active = WORKER_TASK_LIST.lock().unwrap().contains_key(&upid.task_id);
+
+        Ok(active.into())
+    })?;
+
+    Ok(())
+}
+
+/// Try to abort a worker task, but do not wait
+///
+/// Errors (if any) are simply logged.
+pub fn abort_worker_nowait(upid: UPID) {
+    tokio::spawn(async move {
+        if let Err(err) = abort_worker(upid).await {
+            log::error!("abort worker task failed - {}", err);
+        }
+    });
+}
+
+/// Abort a worker task
+///
+/// By sending ``worker-task-abort`` to the control socket.
+pub async fn abort_worker(upid: UPID) -> Result<(), Error> {
+    let sock = crate::ctrl_sock_from_pid(upid.pid);
+    let cmd = json!({
+        "command": "worker-task-abort",
+        "args": {
+            "upid": upid.to_string(),
+        },
+    });
+    crate::send_command(sock, &cmd).map_ok(|_| ()).await
+}
+
+fn parse_worker_status_line(line: &str) -> Result<(String, UPID, Option<TaskState>), Error> {
+    let data = line.splitn(3, ' ').collect::<Vec<&str>>();
+
+    let len = data.len();
+
+    match len {
+        1 => Ok((data[0].to_owned(), data[0].parse::<UPID>()?, None)),
+        3 => {
+            let endtime = i64::from_str_radix(data[1], 16)?;
+            let state = TaskState::from_endtime_and_message(endtime, data[2])?;
+            Ok((data[0].to_owned(), data[0].parse::<UPID>()?, Some(state)))
+        }
+        _ => bail!("wrong number of components"),
+    }
+}
+
+/// Task State
+#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
+pub enum TaskState {
+    /// The Task ended with an undefined state
+    Unknown { endtime: i64 },
+    /// The Task ended and there were no errors or warnings
+    OK { endtime: i64 },
+    /// The Task had 'count' warnings and no errors
+    Warning { count: u64, endtime: i64 },
+    /// The Task ended with the error described in 'message'
+    Error { message: String, endtime: i64 },
+}
+
+impl TaskState {
+    pub fn endtime(&self) -> i64 {
+        match *self {
+            TaskState::Unknown { endtime } => endtime,
+            TaskState::OK { endtime } => endtime,
+            TaskState::Warning { endtime, .. } => endtime,
+            TaskState::Error { endtime, .. } => endtime,
+        }
+    }
+
+    fn result_text(&self) -> String {
+        match self {
+            TaskState::Error { message, ..
} => format!("TASK ERROR: {}", message), + other => format!("TASK {}", other), + } + } + + fn from_endtime_and_message(endtime: i64, s: &str) -> Result { + if s == "unknown" { + Ok(TaskState::Unknown { endtime }) + } else if s == "OK" { + Ok(TaskState::OK { endtime }) + } else if let Some(warnings) = s.strip_prefix("WARNINGS: ") { + let count: u64 = warnings.parse()?; + Ok(TaskState::Warning { count, endtime }) + } else if !s.is_empty() { + let message = if let Some(err) = s.strip_prefix("ERROR: ") { + err + } else { + s + } + .to_string(); + Ok(TaskState::Error { message, endtime }) + } else { + bail!("unable to parse Task Status '{}'", s); + } + } +} + +impl std::cmp::PartialOrd for TaskState { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.endtime().cmp(&other.endtime())) + } +} + +impl std::cmp::Ord for TaskState { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.endtime().cmp(&other.endtime()) + } +} + +impl std::fmt::Display for TaskState { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + TaskState::Unknown { .. } => write!(f, "unknown"), + TaskState::OK { .. } => write!(f, "OK"), + TaskState::Warning { count, .. } => write!(f, "WARNINGS: {}", count), + TaskState::Error { message, .. } => write!(f, "{}", message), + } + } +} + +/// Task details including parsed UPID +/// +/// If there is no `state`, the task is still running. +#[derive(Debug)] +pub struct TaskListInfo { + /// The parsed UPID + pub upid: UPID, + /// UPID string representation + pub upid_str: String, + /// Task `(endtime, status)` if already finished + pub state: Option, // endtime, status +} + +fn render_task_line(info: &TaskListInfo) -> String { + let mut raw = String::new(); + if let Some(status) = &info.state { + use std::fmt::Write as _; + + let _ = writeln!(raw, "{} {:08X} {}", info.upid_str, status.endtime(), status); + } else { + raw.push_str(&info.upid_str); + raw.push('\n'); + } + + raw +} + +fn render_task_list(list: &[TaskListInfo]) -> String { + let mut raw = String::new(); + for info in list { + raw.push_str(&render_task_line(info)); + } + raw +} + +// note this is not locked, caller has to make sure it is +// this will skip (and log) lines that are not valid status lines +fn read_task_file(reader: R) -> Result, Error> { + let reader = BufReader::new(reader); + let mut list = Vec::new(); + for line in reader.lines() { + let line = line?; + match parse_worker_status_line(&line) { + Ok((upid_str, upid, state)) => list.push(TaskListInfo { + upid_str, + upid, + state, + }), + Err(err) => { + log::warn!("unable to parse worker status '{}' - {}", line, err); + continue; + } + }; + } + + Ok(list) +} + +// note this is not locked, caller has to make sure it is +fn read_task_file_from_path

(path: P) -> Result, Error> +where + P: AsRef + std::fmt::Debug, +{ + let file = match File::open(&path) { + Ok(f) => f, + Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()), + Err(err) => bail!("unable to open task list {:?} - {}", path, err), + }; + + read_task_file(file) +} + +/// Iterate over existing/active worker tasks +pub struct TaskListInfoIterator { + list: VecDeque, + end: bool, + archive: Option, + lock: Option, +} + +impl TaskListInfoIterator { + /// Creates a new iterator instance. + pub fn new(active_only: bool) -> Result { + let setup = worker_task_setup()?; + + let (read_lock, active_list) = { + let lock = setup.lock_task_list_files(false)?; + let active_list = read_task_file_from_path(&setup.active_tasks_fn)?; + + let needs_update = active_list + .iter() + .any(|info| info.state.is_some() || !worker_is_active_local(&info.upid)); + + // TODO remove with 1.x + let index_exists = setup.task_index_fn.is_file(); + + if needs_update || index_exists { + drop(lock); + setup.update_active_workers(None)?; + let lock = setup.lock_task_list_files(false)?; + let active_list = read_task_file_from_path(&setup.active_tasks_fn)?; + (lock, active_list) + } else { + (lock, active_list) + } + }; + + let archive = if active_only { + None + } else { + let logrotate = LogRotate::new(&setup.task_archive_fn, true, None, None)?; + Some(logrotate.files()) + }; + + let lock = if active_only { None } else { Some(read_lock) }; + + Ok(Self { + list: active_list.into(), + end: active_only, + archive, + lock, + }) + } +} + +impl Iterator for TaskListInfoIterator { + type Item = Result; + + fn next(&mut self) -> Option { + loop { + if let Some(element) = self.list.pop_back() { + return Some(Ok(element)); + } else if self.end { + return None; + } else { + if let Some(mut archive) = self.archive.take() { + if let Some(file) = archive.next() { + let list = match read_task_file(file) { + Ok(list) => list, + Err(err) => return Some(Err(err)), + }; + self.list.append(&mut list.into()); + self.archive = Some(archive); + continue; + } + } + + self.end = true; + self.lock.take(); + } + } + } +} + +/// Launch long running worker tasks. +/// +/// A worker task can either be a whole thread, or a simply tokio +/// task/future. Each task can `log()` messages, which are stored +/// persistently to files. Task should poll the `abort_requested` +/// flag, and stop execution when requested. 
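+///
+/// Usage sketch (editor's example; worker type and auth id are illustrative):
+///
+/// ```ignore
+/// let upid_str = WorkerTask::spawn("example", None, "root@pam".to_string(), false,
+///     |worker| async move {
+///         worker.log_message("doing work");
+///         Ok(())
+///     },
+/// )?;
+/// ```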
+pub struct WorkerTask { + setup: &'static WorkerTaskSetup, + upid: UPID, + data: Mutex, + abort_requested: AtomicBool, +} + +impl std::fmt::Display for WorkerTask { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + self.upid.fmt(f) + } +} + +struct WorkerTaskData { + logger: FileLogger, + progress: f64, // 0..1 + warn_count: u64, + pub abort_listeners: Vec>, +} + +impl WorkerTask { + pub fn new( + worker_type: &str, + worker_id: Option, + auth_id: String, + to_stdout: bool, + ) -> Result, Error> { + let setup = worker_task_setup()?; + + let upid = UPID::new(worker_type, worker_id, auth_id)?; + let task_id = upid.task_id; + + let mut path = setup.taskdir.clone(); + + path.push(format!("{:02X}", upid.pstart & 255)); + + let dir_opts = setup + .file_opts + .clone() + .perm(nix::sys::stat::Mode::from_bits_truncate(0o755)); + + create_path(&path, None, Some(dir_opts))?; + + path.push(upid.to_string()); + + let logger_options = FileLogOptions { + to_stdout, + exclusive: true, + prefix_time: true, + read: true, + file_opts: setup.file_opts.clone(), + ..Default::default() + }; + let logger = FileLogger::new(&path, logger_options)?; + + let worker = Arc::new(Self { + setup, + upid: upid.clone(), + abort_requested: AtomicBool::new(false), + data: Mutex::new(WorkerTaskData { + logger, + progress: 0.0, + warn_count: 0, + abort_listeners: vec![], + }), + }); + + // scope to drop the lock again after inserting + { + let mut hash = WORKER_TASK_LIST.lock().unwrap(); + hash.insert(task_id, worker.clone()); + crate::set_worker_count(hash.len()); + } + + setup.update_active_workers(Some(&upid))?; + + Ok(worker) + } + + /// Spawn a new tokio task/future. + pub fn spawn( + worker_type: &str, + worker_id: Option, + auth_id: String, + to_stdout: bool, + f: F, + ) -> Result + where + F: Send + 'static + FnOnce(Arc) -> T, + T: Send + 'static + Future>, + { + let worker = WorkerTask::new(worker_type, worker_id, auth_id, to_stdout)?; + let upid_str = worker.upid.to_string(); + let f = f(worker.clone()); + tokio::spawn(async move { + let result = f.await; + worker.log_result(&result); + }); + + Ok(upid_str) + } + + /// Create a new worker thread. 
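+    ///
+    /// Editor's note: same calling convention as [Self::spawn] above, except
+    /// that `f` is a blocking closure run on a named std thread, e.g.
+    /// `|worker| { worker.log_message("step 1"); Ok(()) }` (illustrative).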
+    pub fn new_thread<F>(
+        worker_type: &str,
+        worker_id: Option<String>,
+        auth_id: String,
+        to_stdout: bool,
+        f: F,
+    ) -> Result<String, Error>
+    where
+        F: Send + UnwindSafe + 'static + FnOnce(Arc<WorkerTask>) -> Result<(), Error>,
+    {
+        let worker = WorkerTask::new(worker_type, worker_id, auth_id, to_stdout)?;
+        let upid_str = worker.upid.to_string();
+
+        let _child = std::thread::Builder::new()
+            .name(upid_str.clone())
+            .spawn(move || {
+                let worker1 = worker.clone();
+                let result = match std::panic::catch_unwind(move || f(worker1)) {
+                    Ok(r) => r,
+                    Err(panic) => match panic.downcast::<&str>() {
+                        Ok(panic_msg) => Err(format_err!("worker panicked: {}", panic_msg)),
+                        Err(_) => Err(format_err!("worker panicked: unknown type.")),
+                    },
+                };
+
+                worker.log_result(&result);
+            });
+
+        Ok(upid_str)
+    }
+
+    /// Create state from self and a result
+    pub fn create_state(&self, result: &Result<(), Error>) -> TaskState {
+        let warn_count = self.data.lock().unwrap().warn_count;
+
+        let endtime = proxmox_time::epoch_i64();
+
+        if let Err(err) = result {
+            TaskState::Error {
+                message: err.to_string(),
+                endtime,
+            }
+        } else if warn_count > 0 {
+            TaskState::Warning {
+                count: warn_count,
+                endtime,
+            }
+        } else {
+            TaskState::OK { endtime }
+        }
+    }
+
+    /// Log task result, remove task from running list
+    pub fn log_result(&self, result: &Result<(), Error>) {
+        let state = self.create_state(result);
+        self.log_message(state.result_text());
+
+        WORKER_TASK_LIST.lock().unwrap().remove(&self.upid.task_id);
+        let _ = self.setup.update_active_workers(None);
+        crate::set_worker_count(WORKER_TASK_LIST.lock().unwrap().len());
+    }
+
+    /// Log a message.
+    pub fn log_message<S: AsRef<str>>(&self, msg: S) {
+        let mut data = self.data.lock().unwrap();
+        data.logger.log(msg);
+    }
+
+    /// Log a message as warning.
+    pub fn log_warning<S: AsRef<str>>(&self, msg: S) {
+        let mut data = self.data.lock().unwrap();
+        data.logger.log(format!("WARN: {}", msg.as_ref()));
+        data.warn_count += 1;
+    }
+
+    /// Set progress indicator
+    pub fn progress(&self, progress: f64) {
+        if (0.0..=1.0).contains(&progress) {
+            let mut data = self.data.lock().unwrap();
+            data.progress = progress;
+        } else {
+            // fixme: log!("task '{}': ignoring strange value for progress '{}'", self.upid, progress);
+        }
+    }
+
+    /// Request abort
+    pub fn request_abort(&self) {
+        let prev_abort = self.abort_requested.swap(true, Ordering::SeqCst);
+        if !prev_abort {
+            self.log_message("received abort request ..."); // log abort only once
+        }
+        // notify listeners
+        let mut data = self.data.lock().unwrap();
+        loop {
+            match data.abort_listeners.pop() {
+                None => {
+                    break;
+                }
+                Some(ch) => {
+                    let _ = ch.send(()); // ignore errors here
+                }
+            }
+        }
+    }
+
+    /// Get a future which resolves on task abort
+    pub fn abort_future(&self) -> oneshot::Receiver<()> {
+        let (tx, rx) = oneshot::channel::<()>();
+
+        let mut data = self.data.lock().unwrap();
+        if self.abort_requested() {
+            let _ = tx.send(());
+        } else {
+            data.abort_listeners.push(tx);
+        }
+        rx
+    }
+
+    pub fn upid(&self) -> &UPID {
+        &self.upid
+    }
+}
+
+impl WorkerTaskContext for WorkerTask {
+    fn abort_requested(&self) -> bool {
+        self.abort_requested.load(Ordering::SeqCst)
+    }
+
+    fn shutdown_requested(&self) -> bool {
+        crate::shutdown_requested()
+    }
+
+    fn fail_on_shutdown(&self) -> Result<(), Error> {
+        crate::fail_on_shutdown()
+    }
+
+    fn log(&self, level: log::Level, message: &std::fmt::Arguments) {
+        match level {
+            log::Level::Error => self.log_warning(&message.to_string()),
+            log::Level::Warn => self.log_warning(&message.to_string()),
+            log::Level::Info => self.log_message(&message.to_string()),
+            log::Level::Debug => self.log_message(&format!("DEBUG: {}", message)),
+            log::Level::Trace => self.log_message(&format!("TRACE: {}", message)),
+        }
+    }
+}
+
+/// Wait for a locally spawned worker task
+///
+/// Note: local workers should print logs to stdout, so there is no
+/// need to fetch/display logs. We just wait for the worker to finish.
+pub async fn wait_for_local_worker(upid_str: &str) -> Result<(), Error> {
+    let upid: UPID = upid_str.parse()?;
+
+    let sleep_duration = core::time::Duration::new(0, 100_000_000);
+
+    loop {
+        if worker_is_active_local(&upid) {
+            tokio::time::sleep(sleep_duration).await;
+        } else {
+            break;
+        }
+    }
+    Ok(())
+}
+
+/// Request abort of a local worker (if existing and running)
+pub fn abort_local_worker(upid: UPID) {
+    if let Some(worker) = WORKER_TASK_LIST.lock().unwrap().get(&upid.task_id) {
+        worker.request_abort();
+    }
+}
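+
+// Editor's note - how the pieces above fit together (a sketch, not part of
+// the original patch): a daemon typically calls init_worker_tasks() once at
+// startup, registers the task control commands on its CommandSocket via
+// register_task_control_commands(), and then starts workers with
+// WorkerTask::spawn() or WorkerTask::new_thread(). abort_worker(upid) reaches
+// a worker owned by another process through that control socket, while
+// abort_local_worker() short-circuits for workers of the current process.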