From 938dd9278e34b807d54c8821e9d0e564dec35ea4 Mon Sep 17 00:00:00 2001 From: Thomas Lamprecht Date: Tue, 11 Oct 2022 15:27:47 +0200 Subject: [PATCH] move proxmox-rest-server to proxmox-rs as separate package Lives now in the common proxmox rs repo[0] for better reuse with other projects. [0]: https://git.proxmox.com/?p=proxmox.git;a=tree;f=proxmox-rest-server;h=8035b65a00271604c229590d0109aba3f75ee784 Signed-off-by: Thomas Lamprecht --- Cargo.toml | 4 +- Makefile | 1 - debian/control | 1 + proxmox-rest-server/Cargo.toml | 43 - .../examples/minimal-rest-server.rs | 232 ---- proxmox-rest-server/src/api_config.rs | 287 ----- proxmox-rest-server/src/command_socket.rs | 241 ---- proxmox-rest-server/src/compression.rs | 39 - proxmox-rest-server/src/daemon.rs | 379 ------ proxmox-rest-server/src/environment.rs | 98 -- proxmox-rest-server/src/file_logger.rs | 147 --- proxmox-rest-server/src/formatter.rs | 235 ---- proxmox-rest-server/src/h2service.rs | 147 --- proxmox-rest-server/src/lib.rs | 244 ---- proxmox-rest-server/src/rest.rs | 789 ------------ proxmox-rest-server/src/state.rs | 161 --- proxmox-rest-server/src/worker_task.rs | 1058 ----------------- proxmox-restore-daemon/Cargo.toml | 2 +- 18 files changed, 4 insertions(+), 4104 deletions(-) delete mode 100644 proxmox-rest-server/Cargo.toml delete mode 100644 proxmox-rest-server/examples/minimal-rest-server.rs delete mode 100644 proxmox-rest-server/src/api_config.rs delete mode 100644 proxmox-rest-server/src/command_socket.rs delete mode 100644 proxmox-rest-server/src/compression.rs delete mode 100644 proxmox-rest-server/src/daemon.rs delete mode 100644 proxmox-rest-server/src/environment.rs delete mode 100644 proxmox-rest-server/src/file_logger.rs delete mode 100644 proxmox-rest-server/src/formatter.rs delete mode 100644 proxmox-rest-server/src/h2service.rs delete mode 100644 proxmox-rest-server/src/lib.rs delete mode 100644 proxmox-rest-server/src/rest.rs delete mode 100644 proxmox-rest-server/src/state.rs delete mode 100644 proxmox-rest-server/src/worker_task.rs diff --git a/Cargo.toml b/Cargo.toml index aa301a96..5fa0d486 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,7 +25,6 @@ members = [ "pbs-config", "pbs-datastore", "pbs-fuse-loop", - "proxmox-rest-server", "proxmox-rrd", "pbs-tape", "pbs-tools", @@ -108,6 +107,7 @@ proxmox-shared-memory = "0.2" proxmox-subscription = { version = "0.3", features = [ "api-types" ] } proxmox-sys = { version = "0.4", features = [ "sortable-macro", "timer" ] } proxmox-compression = "0.1" +proxmox-rest-server = "0.2" proxmox-acme-rs = "0.4" @@ -120,7 +120,6 @@ pbs-buildcfg = { path = "pbs-buildcfg" } pbs-client = { path = "pbs-client" } pbs-config = { path = "pbs-config" } pbs-datastore = { path = "pbs-datastore" } -proxmox-rest-server = { path = "proxmox-rest-server" } proxmox-rrd = { path = "proxmox-rrd" } pbs-tools = { path = "pbs-tools" } pbs-tape = { path = "pbs-tape" } @@ -138,6 +137,7 @@ pbs-tape = { path = "pbs-tape" } #proxmox-io = { path = "../proxmox/proxmox-io" } #proxmox-lang = { path = "../proxmox/proxmox-lang" } #proxmox-openid = { path = "../proxmox-openid-rs" } +#proxmox-router = { path = "../proxmox/proxmox-rest-server" } #proxmox-router = { path = "../proxmox/proxmox-router" } #proxmox-schema = { path = "../proxmox/proxmox-schema" } #proxmox-section-config = { path = "../proxmox/proxmox-section-config" } diff --git a/Makefile b/Makefile index a29cb2df..87b5ca01 100644 --- a/Makefile +++ b/Makefile @@ -38,7 +38,6 @@ SUBCRATES := \ pbs-config \ pbs-datastore \ pbs-fuse-loop \ - proxmox-rest-server \ proxmox-rrd \ pbs-tape \ pbs-tools \ diff --git a/debian/control b/debian/control index 30f5d1a5..fd835cc0 100644 --- a/debian/control +++ b/debian/control @@ -58,6 +58,7 @@ Build-Depends: debhelper (>= 12), librust-proxmox-lang-1+default-dev (>= 1.1-~~), librust-proxmox-metrics-0.2+default-dev, librust-proxmox-openid-0.9+default-dev, + librust-proxmox-rest-server-dev, librust-proxmox-router-1+cli-dev (>= 1.3.0-~~), librust-proxmox-router-1+default-dev (>= 1.3.0-~~), librust-proxmox-router-1+server-dev (>= 1.3.0-~~), diff --git a/proxmox-rest-server/Cargo.toml b/proxmox-rest-server/Cargo.toml deleted file mode 100644 index cfd1af9b..00000000 --- a/proxmox-rest-server/Cargo.toml +++ /dev/null @@ -1,43 +0,0 @@ -[package] -name = "proxmox-rest-server" -version = "0.1.0" -authors = ["Proxmox Support Team "] -edition = "2018" -description = "REST server implementation" - -# for example -[dev-dependencies] -proxmox-schema = { version = "1.3.1", features = [ "api-macro" ] } -tokio = { version = "1.6", features = [ "rt-multi-thread", "signal", "process" ] } - -[dependencies] -anyhow = "1.0" -futures = "0.3" -handlebars = "3.0" -http = "0.2" -hyper = { version = "0.14.5", features = [ "full" ] } -lazy_static = "1.4" -libc = "0.2" -log = "0.4.17" -nix = "0.24" -once_cell = "1.3.1" -percent-encoding = "2.1" -regex = "1.5" -serde = { version = "1.0", features = [ "derive" ] } -serde_json = "1.0" -tokio = { version = "1.6", features = ["signal", "process"] } -tokio-openssl = "0.6.1" -tokio-stream = "0.1.0" -tower-service = "0.3.0" -url = "2.1" - -#proxmox = "0.15.3" -proxmox-async = "0.4" -proxmox-compression = "0.1.1" -proxmox-io = "1" -proxmox-lang = "1.1" -proxmox-http = { version = "0.7", features = [ "client" ] } -proxmox-router = "1.3.0" -proxmox-schema = { version = "1.3.1", features = [ "api-macro", "upid-api-impl" ] } -proxmox-time = "1" -proxmox-sys = { version = "0.4", features = [ "logrotate", "timer" ] } diff --git a/proxmox-rest-server/examples/minimal-rest-server.rs b/proxmox-rest-server/examples/minimal-rest-server.rs deleted file mode 100644 index 91cb9738..00000000 --- a/proxmox-rest-server/examples/minimal-rest-server.rs +++ /dev/null @@ -1,232 +0,0 @@ -use std::collections::HashMap; -use std::future::Future; -use std::pin::Pin; -use std::sync::Mutex; - -use anyhow::{bail, format_err, Error}; -use http::request::Parts; -use http::HeaderMap; -use hyper::{Body, Method, Response}; -use lazy_static::lazy_static; - -use proxmox_router::{ - list_subdirs_api_method, Router, RpcEnvironmentType, SubdirMap, UserInformation, -}; -use proxmox_schema::api; - -use proxmox_rest_server::{ApiConfig, AuthError, RestEnvironment, RestServer, ServerAdapter}; - -// Create a Dummy User information system -struct DummyUserInfo; - -impl UserInformation for DummyUserInfo { - fn is_superuser(&self, _userid: &str) -> bool { - // Always return true here, so we have access to everything - true - } - fn is_group_member(&self, _userid: &str, group: &str) -> bool { - group == "Group" - } - fn lookup_privs(&self, _userid: &str, _path: &[&str]) -> u64 { - u64::MAX - } -} - -struct MinimalServer; - -// implement the server adapter -impl ServerAdapter for MinimalServer { - // normally this would check and authenticate the user - fn check_auth( - &self, - _headers: &HeaderMap, - _method: &Method, - ) -> Pin< - Box< - dyn Future), AuthError>> - + Send, - >, - > { - Box::pin(async move { - // get some global/cached userinfo - let userinfo: Box = Box::new(DummyUserInfo); - // Do some user checks, e.g. cookie/csrf - Ok(("User".to_string(), userinfo)) - }) - } - - // this should return the index page of the webserver, iow. what the user browses to - fn get_index( - &self, - _env: RestEnvironment, - _parts: Parts, - ) -> Pin> + Send>> { - Box::pin(async move { - // build an index page - http::Response::builder() - .body("hello world".into()) - .unwrap() - }) - } -} - -// a few examples on how to do api calls with the Router - -#[api] -/// A simple ping method. returns "pong" -fn ping() -> Result { - Ok("pong".to_string()) -} - -lazy_static! { - static ref ITEM_MAP: Mutex> = Mutex::new(HashMap::new()); -} - -#[api] -/// Lists all current items -fn list_items() -> Result, Error> { - Ok(ITEM_MAP.lock().unwrap().keys().cloned().collect()) -} - -#[api( - input: { - properties: { - name: { - type: String, - description: "The name", - }, - value: { - type: String, - description: "The value", - }, - }, - }, -)] -/// creates a new item -fn create_item(name: String, value: String) -> Result<(), Error> { - let mut map = ITEM_MAP.lock().unwrap(); - if map.contains_key(&name) { - bail!("{} already exists", name); - } - - map.insert(name, value); - - Ok(()) -} - -#[api( - input: { - properties: { - name: { - type: String, - description: "The name", - }, - }, - }, -)] -/// returns the value of an item -fn get_item(name: String) -> Result { - ITEM_MAP - .lock() - .unwrap() - .get(&name) - .map(|s| s.to_string()) - .ok_or_else(|| format_err!("no such item '{}'", name)) -} - -#[api( - input: { - properties: { - name: { - type: String, - description: "The name", - }, - value: { - type: String, - description: "The value", - }, - }, - }, -)] -/// updates an item -fn update_item(name: String, value: String) -> Result<(), Error> { - if let Some(val) = ITEM_MAP.lock().unwrap().get_mut(&name) { - *val = value; - } else { - bail!("no such item '{}'", name); - } - Ok(()) -} - -#[api( - input: { - properties: { - name: { - type: String, - description: "The name", - }, - }, - }, -)] -/// deletes an item -fn delete_item(name: String) -> Result<(), Error> { - if ITEM_MAP.lock().unwrap().remove(&name).is_none() { - bail!("no such item '{}'", name); - } - Ok(()) -} - -const ITEM_ROUTER: Router = Router::new() - .get(&API_METHOD_GET_ITEM) - .put(&API_METHOD_UPDATE_ITEM) - .delete(&API_METHOD_DELETE_ITEM); - -const SUBDIRS: SubdirMap = &[ - ( - "items", - &Router::new() - .get(&API_METHOD_LIST_ITEMS) - .post(&API_METHOD_CREATE_ITEM) - .match_all("name", &ITEM_ROUTER), - ), - ("ping", &Router::new().get(&API_METHOD_PING)), -]; - -const ROUTER: Router = Router::new() - .get(&list_subdirs_api_method!(SUBDIRS)) - .subdirs(SUBDIRS); - -async fn run() -> Result<(), Error> { - // we first have to configure the api environment (basedir etc.) - - let config = ApiConfig::new( - "/var/tmp/", - &ROUTER, - RpcEnvironmentType::PUBLIC, - MinimalServer, - )?; - let rest_server = RestServer::new(config); - - // then we have to create a daemon that listens, accepts and serves the api to clients - proxmox_rest_server::daemon::create_daemon( - ([127, 0, 0, 1], 65000).into(), - move |listener| { - let incoming = hyper::server::conn::AddrIncoming::from_listener(listener)?; - - Ok(async move { - hyper::Server::builder(incoming).serve(rest_server).await?; - - Ok(()) - }) - }, - None, - ) - .await?; - - Ok(()) -} - -fn main() -> Result<(), Error> { - let rt = tokio::runtime::Runtime::new()?; - rt.block_on(async { run().await }) -} diff --git a/proxmox-rest-server/src/api_config.rs b/proxmox-rest-server/src/api_config.rs deleted file mode 100644 index b05e06d0..00000000 --- a/proxmox-rest-server/src/api_config.rs +++ /dev/null @@ -1,287 +0,0 @@ -use std::collections::HashMap; -use std::fs::metadata; -use std::path::PathBuf; -use std::pin::Pin; -use std::sync::{Arc, Mutex, RwLock}; -use std::time::SystemTime; - -use anyhow::{bail, format_err, Error}; -use hyper::http::request::Parts; -use hyper::{Body, Method, Response}; - -use handlebars::Handlebars; -use serde::Serialize; - -use proxmox_router::{ApiMethod, Router, RpcEnvironmentType, UserInformation}; -use proxmox_sys::fs::{create_path, CreateOptions}; - -use crate::{AuthError, CommandSocket, FileLogOptions, FileLogger, RestEnvironment, ServerAdapter}; - -/// REST server configuration -pub struct ApiConfig { - basedir: PathBuf, - router: &'static Router, - aliases: HashMap, - env_type: RpcEnvironmentType, - templates: RwLock>, - template_files: RwLock>, - request_log: Option>>, - auth_log: Option>>, - adapter: Pin>, -} - -impl ApiConfig { - /// Creates a new instance - /// - /// `basedir` - File lookups are relative to this directory. - /// - /// `router` - The REST API definition. - /// - /// `env_type` - The environment type. - /// - /// `api_auth` - The Authentication handler - /// - /// `get_index_fn` - callback to generate the root page - /// (index). Please note that this functions gets a reference to - /// the [ApiConfig], so it can use [Handlebars] templates - /// ([render_template](Self::render_template) to generate pages. - pub fn new>( - basedir: B, - router: &'static Router, - env_type: RpcEnvironmentType, - adapter: impl ServerAdapter + 'static, - ) -> Result { - Ok(Self { - basedir: basedir.into(), - router, - aliases: HashMap::new(), - env_type, - templates: RwLock::new(Handlebars::new()), - template_files: RwLock::new(HashMap::new()), - request_log: None, - auth_log: None, - adapter: Box::pin(adapter), - }) - } - - pub(crate) async fn get_index( - &self, - rest_env: RestEnvironment, - parts: Parts, - ) -> Response { - self.adapter.get_index(rest_env, parts).await - } - - pub(crate) async fn check_auth( - &self, - headers: &http::HeaderMap, - method: &hyper::Method, - ) -> Result<(String, Box), AuthError> { - self.adapter.check_auth(headers, method).await - } - - pub(crate) fn find_method( - &self, - components: &[&str], - method: Method, - uri_param: &mut HashMap, - ) -> Option<&'static ApiMethod> { - self.router.find_method(components, method, uri_param) - } - - pub(crate) fn find_alias(&self, components: &[&str]) -> PathBuf { - let mut prefix = String::new(); - let mut filename = self.basedir.clone(); - let comp_len = components.len(); - if comp_len >= 1 { - prefix.push_str(components[0]); - if let Some(subdir) = self.aliases.get(&prefix) { - filename.push(subdir); - components - .iter() - .skip(1) - .for_each(|comp| filename.push(comp)); - } else { - components.iter().for_each(|comp| filename.push(comp)); - } - } - filename - } - - /// Register a path alias - /// - /// This can be used to redirect file lookups to a specific - /// directory, e.g.: - /// - /// ``` - /// use proxmox_rest_server::ApiConfig; - /// // let mut config = ApiConfig::new(...); - /// # fn fake(config: &mut ApiConfig) { - /// config.add_alias("extjs", "/usr/share/javascript/extjs"); - /// # } - /// ``` - pub fn add_alias(&mut self, alias: S, path: P) - where - S: Into, - P: Into, - { - self.aliases.insert(alias.into(), path.into()); - } - - pub(crate) fn env_type(&self) -> RpcEnvironmentType { - self.env_type - } - - /// Register a [Handlebars] template file - /// - /// Those templates cane be use with [render_template](Self::render_template) to generate pages. - pub fn register_template

(&self, name: &str, path: P) -> Result<(), Error> - where - P: Into, - { - if self.template_files.read().unwrap().contains_key(name) { - bail!("template already registered"); - } - - let path: PathBuf = path.into(); - let metadata = metadata(&path)?; - let mtime = metadata.modified()?; - - self.templates - .write() - .unwrap() - .register_template_file(name, &path)?; - self.template_files - .write() - .unwrap() - .insert(name.to_string(), (mtime, path)); - - Ok(()) - } - - /// Checks if the template was modified since the last rendering - /// if yes, it loads a the new version of the template - pub fn render_template(&self, name: &str, data: &T) -> Result - where - T: Serialize, - { - let path; - let mtime; - { - let template_files = self.template_files.read().unwrap(); - let (old_mtime, old_path) = template_files - .get(name) - .ok_or_else(|| format_err!("template not found"))?; - - mtime = metadata(old_path)?.modified()?; - if mtime <= *old_mtime { - return self - .templates - .read() - .unwrap() - .render(name, data) - .map_err(|err| format_err!("{}", err)); - } - path = old_path.to_path_buf(); - } - - { - let mut template_files = self.template_files.write().unwrap(); - let mut templates = self.templates.write().unwrap(); - - templates.register_template_file(name, &path)?; - template_files.insert(name.to_string(), (mtime, path)); - - templates - .render(name, data) - .map_err(|err| format_err!("{}", err)) - } - } - - /// Enable the access log feature - /// - /// When enabled, all requests are logged to the specified file. - /// This function also registers a `api-access-log-reopen` - /// command one the [CommandSocket]. - pub fn enable_access_log

( - &mut self, - path: P, - dir_opts: Option, - file_opts: Option, - commando_sock: &mut CommandSocket, - ) -> Result<(), Error> - where - P: Into, - { - let path: PathBuf = path.into(); - if let Some(base) = path.parent() { - if !base.exists() { - create_path(base, None, dir_opts).map_err(|err| format_err!("{}", err))?; - } - } - - let logger_options = FileLogOptions { - append: true, - file_opts: file_opts.unwrap_or_default(), - ..Default::default() - }; - let request_log = Arc::new(Mutex::new(FileLogger::new(&path, logger_options)?)); - self.request_log = Some(Arc::clone(&request_log)); - - commando_sock.register_command("api-access-log-reopen".into(), move |_args| { - log::info!("re-opening access-log file"); - request_log.lock().unwrap().reopen()?; - Ok(serde_json::Value::Null) - })?; - - Ok(()) - } - - /// Enable the authentication log feature - /// - /// When enabled, all authentication requests are logged to the - /// specified file. This function also registers a - /// `api-auth-log-reopen` command one the [CommandSocket]. - pub fn enable_auth_log

( - &mut self, - path: P, - dir_opts: Option, - file_opts: Option, - commando_sock: &mut CommandSocket, - ) -> Result<(), Error> - where - P: Into, - { - let path: PathBuf = path.into(); - if let Some(base) = path.parent() { - if !base.exists() { - create_path(base, None, dir_opts).map_err(|err| format_err!("{}", err))?; - } - } - - let logger_options = FileLogOptions { - append: true, - prefix_time: true, - file_opts: file_opts.unwrap_or_default(), - ..Default::default() - }; - let auth_log = Arc::new(Mutex::new(FileLogger::new(&path, logger_options)?)); - self.auth_log = Some(Arc::clone(&auth_log)); - - commando_sock.register_command("api-auth-log-reopen".into(), move |_args| { - log::info!("re-opening auth-log file"); - auth_log.lock().unwrap().reopen()?; - Ok(serde_json::Value::Null) - })?; - - Ok(()) - } - - pub(crate) fn get_access_log(&self) -> Option<&Arc>> { - self.request_log.as_ref() - } - - pub(crate) fn get_auth_log(&self) -> Option<&Arc>> { - self.auth_log.as_ref() - } -} diff --git a/proxmox-rest-server/src/command_socket.rs b/proxmox-rest-server/src/command_socket.rs deleted file mode 100644 index bfa42b01..00000000 --- a/proxmox-rest-server/src/command_socket.rs +++ /dev/null @@ -1,241 +0,0 @@ -use anyhow::{bail, format_err, Error}; - -use std::collections::HashMap; -use std::os::unix::io::AsRawFd; -use std::path::{Path, PathBuf}; -use std::sync::Arc; - -use futures::*; -use nix::sys::socket; -use nix::unistd::Gid; -use serde::Serialize; -use serde_json::Value; -use tokio::net::UnixListener; - -// Listens on a Unix Socket to handle simple command asynchronously -fn create_control_socket( - path: P, - gid: Gid, - func: F, -) -> Result, Error> -where - P: Into, - F: Fn(Value) -> Result + Send + Sync + 'static, -{ - let path: PathBuf = path.into(); - - let gid = gid.as_raw(); - - let socket = UnixListener::bind(&path)?; - - let func = Arc::new(func); - - let control_future = async move { - loop { - let (conn, _addr) = match socket.accept().await { - Ok(data) => data, - Err(err) => { - log::error!("failed to accept on control socket {:?}: {}", path, err); - continue; - } - }; - - let opt = socket::sockopt::PeerCredentials {}; - let cred = match socket::getsockopt(conn.as_raw_fd(), opt) { - Ok(cred) => cred, - Err(err) => { - log::error!("no permissions - unable to read peer credential - {}", err); - continue; - } - }; - - // check permissions (same gid, root user, or backup group) - let mygid = unsafe { libc::getgid() }; - if !(cred.uid() == 0 || cred.gid() == mygid || cred.gid() == gid) { - log::error!("no permissions for {:?}", cred); - continue; - } - - let (rx, mut tx) = tokio::io::split(conn); - - let abort_future = super::last_worker_future().map(|_| ()); - - use tokio::io::{AsyncBufReadExt, AsyncWriteExt}; - let func = Arc::clone(&func); - let path = path.clone(); - tokio::spawn( - futures::future::select( - async move { - let mut rx = tokio::io::BufReader::new(rx); - let mut line = String::new(); - loop { - line.clear(); - match rx - .read_line({ - line.clear(); - &mut line - }) - .await - { - Ok(0) => break, - Ok(_) => (), - Err(err) => { - log::error!("control socket {:?} read error: {}", path, err); - return; - } - } - - let response = match line.parse::() { - Ok(param) => match func(param) { - Ok(res) => format!("OK: {}\n", res), - Err(err) => format!("ERROR: {}\n", err), - }, - Err(err) => format!("ERROR: {}\n", err), - }; - - if let Err(err) = tx.write_all(response.as_bytes()).await { - log::error!( - "control socket {:?} write response error: {}", - path, - err - ); - return; - } - } - } - .boxed(), - abort_future, - ) - .map(|_| ()), - ); - } - } - .boxed(); - - let abort_future = crate::last_worker_future().map_err(|_| {}); - let task = futures::future::select(control_future, abort_future) - .map(|_: futures::future::Either<(Result<(), Error>, _), _>| ()); - - Ok(task) -} - -/// Send a command to the specified socket -pub async fn send_command(path: P, params: &T) -> Result -where - P: AsRef, - T: ?Sized + Serialize, -{ - let mut command_string = serde_json::to_string(params)?; - command_string.push('\n'); - send_raw_command(path.as_ref(), &command_string).await -} - -/// Send a raw command (string) to the specified socket -pub async fn send_raw_command

(path: P, command_string: &str) -> Result -where - P: AsRef, -{ - use tokio::io::{AsyncBufReadExt, AsyncWriteExt}; - - let mut conn = tokio::net::UnixStream::connect(path) - .map_err(move |err| format_err!("control socket connect failed - {}", err)) - .await?; - - conn.write_all(command_string.as_bytes()).await?; - if !command_string.as_bytes().ends_with(b"\n") { - conn.write_all(b"\n").await?; - } - - AsyncWriteExt::shutdown(&mut conn).await?; - let mut rx = tokio::io::BufReader::new(conn); - let mut data = String::new(); - if rx.read_line(&mut data).await? == 0 { - bail!("no response"); - } - if let Some(res) = data.strip_prefix("OK: ") { - match res.parse::() { - Ok(v) => Ok(v), - Err(err) => bail!("unable to parse json response - {}", err), - } - } else if let Some(err) = data.strip_prefix("ERROR: ") { - bail!("{}", err); - } else { - bail!("unable to parse response: {}", data); - } -} - -// A callback for a specific commando socket. -type CommandSocketFn = - Box<(dyn Fn(Option<&Value>) -> Result + Send + Sync + 'static)>; - -/// Tooling to get a single control command socket where one can -/// register multiple commands dynamically. -/// -/// The socket is activated by calling [spawn](CommandSocket::spawn), -/// which spawns an async tokio task to process the commands. -pub struct CommandSocket { - socket: PathBuf, - gid: Gid, - commands: HashMap, -} - -impl CommandSocket { - /// Creates a new instance. - pub fn new

(path: P, gid: Gid) -> Self - where - P: Into, - { - CommandSocket { - socket: path.into(), - gid, - commands: HashMap::new(), - } - } - - /// Spawn the socket and consume self, meaning you cannot register commands anymore after - /// calling this. - pub fn spawn(self) -> Result<(), Error> { - let control_future = - create_control_socket(self.socket.to_owned(), self.gid, move |param| { - let param = param.as_object().ok_or_else(|| { - format_err!("unable to parse parameters (expected json object)") - })?; - - let command = match param.get("command") { - Some(Value::String(command)) => command.as_str(), - None => bail!("no command"), - _ => bail!("unable to parse command"), - }; - - if !self.commands.contains_key(command) { - bail!("got unknown command '{}'", command); - } - - match self.commands.get(command) { - None => bail!("got unknown command '{}'", command), - Some(handler) => { - let args = param.get("args"); //.unwrap_or(&Value::Null); - (handler)(args) - } - } - })?; - - tokio::spawn(control_future); - - Ok(()) - } - - /// Register a new command with a callback. - pub fn register_command(&mut self, command: String, handler: F) -> Result<(), Error> - where - F: Fn(Option<&Value>) -> Result + Send + Sync + 'static, - { - if self.commands.contains_key(&command) { - bail!("command '{}' already exists!", command); - } - - self.commands.insert(command, Box::new(handler)); - - Ok(()) - } -} diff --git a/proxmox-rest-server/src/compression.rs b/proxmox-rest-server/src/compression.rs deleted file mode 100644 index 189d7041..00000000 --- a/proxmox-rest-server/src/compression.rs +++ /dev/null @@ -1,39 +0,0 @@ -use anyhow::{bail, Error}; -use hyper::header; - -/// Possible Compression Methods, order determines preference (later is preferred) -#[derive(Eq, Ord, PartialEq, PartialOrd, Debug)] -pub enum CompressionMethod { - Deflate, - // Gzip, - // Brotli, -} - -impl CompressionMethod { - pub fn content_encoding(&self) -> header::HeaderValue { - header::HeaderValue::from_static(self.extension()) - } - - pub fn extension(&self) -> &'static str { - match *self { - // CompressionMethod::Brotli => "br", - // CompressionMethod::Gzip => "gzip", - CompressionMethod::Deflate => "deflate", - } - } -} - -impl std::str::FromStr for CompressionMethod { - type Err = Error; - - fn from_str(s: &str) -> Result { - match s { - // "br" => Ok(CompressionMethod::Brotli), - // "gzip" => Ok(CompressionMethod::Gzip), - "deflate" => Ok(CompressionMethod::Deflate), - // http accept-encoding allows to give weights with ';q=' - other if other.starts_with("deflate;q=") => Ok(CompressionMethod::Deflate), - _ => bail!("unknown compression format"), - } - } -} diff --git a/proxmox-rest-server/src/daemon.rs b/proxmox-rest-server/src/daemon.rs deleted file mode 100644 index 4a5806bd..00000000 --- a/proxmox-rest-server/src/daemon.rs +++ /dev/null @@ -1,379 +0,0 @@ -//! Helpers to implement restartable daemons/services. - -use std::ffi::CString; -use std::future::Future; -use std::io::{Read, Write}; -use std::os::raw::{c_char, c_int, c_uchar}; -use std::os::unix::ffi::OsStrExt; -use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; -use std::panic::UnwindSafe; -use std::path::PathBuf; - -use anyhow::{bail, format_err, Error}; -use futures::future::{self, Either}; -use nix::unistd::{fork, ForkResult}; - -use proxmox_io::{ReadExt, WriteExt}; -use proxmox_sys::fd::{fd_change_cloexec, Fd}; -use proxmox_sys::fs::CreateOptions; - -// Unfortunately FnBox is nightly-only and Box is unusable, so just use Box... -type BoxedStoreFunc = Box Result + UnwindSafe + Send>; - -// Helper trait to "store" something in the environment to be re-used after re-executing the -// service on a reload. -trait Reloadable: Sized { - fn restore(var: &str) -> Result; - fn get_store_func(&self) -> Result; -} - -// Manages things to be stored and reloaded upon reexec. -// Anything which should be restorable should be instantiated via this struct's `restore` method, -#[derive(Default)] -struct Reloader { - pre_exec: Vec, - self_exe: PathBuf, -} - -// Currently we only need environment variables for storage, but in theory we could also add -// variants which need temporary files or pipes... -struct PreExecEntry { - name: &'static str, // Feel free to change to String if necessary... - store_fn: BoxedStoreFunc, -} - -impl Reloader { - pub fn new() -> Result { - Ok(Self { - pre_exec: Vec::new(), - - // Get the path to our executable as PathBuf - self_exe: std::fs::read_link("/proc/self/exe")?, - }) - } - - /// Restore an object from an environment variable of the given name, or, if none exists, uses - /// the function provided in the `or_create` parameter to instantiate the new "first" instance. - /// - /// Values created via this method will be remembered for later re-execution. - pub async fn restore(&mut self, name: &'static str, or_create: F) -> Result - where - T: Reloadable, - F: FnOnce() -> U, - U: Future>, - { - let res = match std::env::var(name) { - Ok(varstr) => T::restore(&varstr)?, - Err(std::env::VarError::NotPresent) => or_create().await?, - Err(_) => bail!("variable {} has invalid value", name), - }; - - self.pre_exec.push(PreExecEntry { - name, - store_fn: res.get_store_func()?, - }); - Ok(res) - } - - fn pre_exec(self) -> Result<(), Error> { - for mut item in self.pre_exec { - std::env::set_var(item.name, (item.store_fn)()?); - } - Ok(()) - } - - pub fn fork_restart(self, pid_fn: Option<&str>) -> Result<(), Error> { - // Get our parameters as Vec - let args = std::env::args_os(); - let mut new_args = Vec::with_capacity(args.len()); - for arg in args { - new_args.push(CString::new(arg.as_bytes())?); - } - - // Synchronisation pipe: - let (pold, pnew) = super::socketpair()?; - - // Start ourselves in the background: - match unsafe { fork() } { - Ok(ForkResult::Child) => { - // Double fork so systemd can supervise us without nagging... - match unsafe { fork() } { - Ok(ForkResult::Child) => { - std::mem::drop(pold); - // At this point we call pre-exec helpers. We must be certain that if they fail for - // whatever reason we can still call `_exit()`, so use catch_unwind. - match std::panic::catch_unwind(move || { - let mut pnew = - unsafe { std::fs::File::from_raw_fd(pnew.into_raw_fd()) }; - let pid = nix::unistd::Pid::this(); - if let Err(e) = unsafe { pnew.write_host_value(pid.as_raw()) } { - log::error!("failed to send new server PID to parent: {}", e); - unsafe { - libc::_exit(-1); - } - } - - let mut ok = [0u8]; - if let Err(e) = pnew.read_exact(&mut ok) { - log::error!("parent vanished before notifying systemd: {}", e); - unsafe { - libc::_exit(-1); - } - } - assert_eq!(ok[0], 1, "reload handshake should have sent a 1 byte"); - - std::mem::drop(pnew); - - // Try to reopen STDOUT/STDERR journald streams to get correct PID in logs - let ident = CString::new(self.self_exe.file_name().unwrap().as_bytes()) - .unwrap(); - let ident = ident.as_bytes(); - let fd = - unsafe { sd_journal_stream_fd(ident.as_ptr(), libc::LOG_INFO, 1) }; - if fd >= 0 && fd != 1 { - let fd = proxmox_sys::fd::Fd(fd); // add drop handler - nix::unistd::dup2(fd.as_raw_fd(), 1)?; - } else { - log::error!("failed to update STDOUT journal redirection ({})", fd); - } - let fd = - unsafe { sd_journal_stream_fd(ident.as_ptr(), libc::LOG_ERR, 1) }; - if fd >= 0 && fd != 2 { - let fd = proxmox_sys::fd::Fd(fd); // add drop handler - nix::unistd::dup2(fd.as_raw_fd(), 2)?; - } else { - log::error!("failed to update STDERR journal redirection ({})", fd); - } - - self.do_reexec(new_args) - }) { - Ok(Ok(())) => log::error!("do_reexec returned!"), - Ok(Err(err)) => log::error!("do_reexec failed: {}", err), - Err(_) => log::error!("panic in re-exec"), - } - } - Ok(ForkResult::Parent { child }) => { - std::mem::drop((pold, pnew)); - log::debug!("forked off a new server (second pid: {})", child); - } - Err(e) => log::error!("fork() failed, restart delayed: {}", e), - } - // No matter how we managed to get here, this is the time where we bail out quickly: - unsafe { libc::_exit(-1) } - } - Ok(ForkResult::Parent { child }) => { - log::debug!( - "forked off a new server (first pid: {}), waiting for 2nd pid", - child - ); - std::mem::drop(pnew); - let mut pold = unsafe { std::fs::File::from_raw_fd(pold.into_raw_fd()) }; - let child = nix::unistd::Pid::from_raw(match unsafe { pold.read_le_value() } { - Ok(v) => v, - Err(e) => { - log::error!( - "failed to receive pid of double-forked child process: {}", - e - ); - // systemd will complain but won't kill the service... - return Ok(()); - } - }); - - if let Some(pid_fn) = pid_fn { - let pid_str = format!("{}\n", child); - proxmox_sys::fs::replace_file( - pid_fn, - pid_str.as_bytes(), - CreateOptions::new(), - false, - )?; - } - - if let Err(e) = systemd_notify(SystemdNotify::MainPid(child)) { - log::error!("failed to notify systemd about the new main pid: {}", e); - } - // ensure systemd got the message about the new main PID before continuing, else it - // will get confused if the new main process sends its READY signal before that - if let Err(e) = systemd_notify_barrier(u64::MAX) { - log::error!("failed to wait on systemd-processing: {}", e); - } - - // notify child that it is now the new main process: - if let Err(e) = pold.write_all(&[1u8]) { - log::error!("child vanished during reload: {}", e); - } - - Ok(()) - } - Err(e) => { - log::error!("fork() failed, restart delayed: {}", e); - Ok(()) - } - } - } - - fn do_reexec(self, args: Vec) -> Result<(), Error> { - let exe = CString::new(self.self_exe.as_os_str().as_bytes())?; - self.pre_exec()?; - nix::unistd::setsid()?; - let args: Vec<&std::ffi::CStr> = args.iter().map(|s| s.as_ref()).collect(); - nix::unistd::execvp(&exe, &args)?; - panic!("exec misbehaved"); - } -} - -// For now all we need to do is store and reuse a tcp listening socket: -impl Reloadable for tokio::net::TcpListener { - // NOTE: The socket must not be closed when the store-function is called: - // FIXME: We could become "independent" of the TcpListener and its reference to the file - // descriptor by `dup()`ing it (and check if the listener still exists via kcmp()?) - fn get_store_func(&self) -> Result { - let mut fd_opt = Some(Fd(nix::fcntl::fcntl( - self.as_raw_fd(), - nix::fcntl::FcntlArg::F_DUPFD_CLOEXEC(0), - )?)); - Ok(Box::new(move || { - let fd = fd_opt.take().unwrap(); - fd_change_cloexec(fd.as_raw_fd(), false)?; - Ok(fd.into_raw_fd().to_string()) - })) - } - - fn restore(var: &str) -> Result { - let fd = var - .parse::() - .map_err(|e| format_err!("invalid file descriptor: {}", e))? as RawFd; - fd_change_cloexec(fd, true)?; - Ok(Self::from_std(unsafe { - std::net::TcpListener::from_raw_fd(fd) - })?) - } -} - -/// This creates a future representing a daemon which reloads itself when receiving a SIGHUP. -/// If this is started regularly, a listening socket is created. In this case, the file descriptor -/// number will be remembered in `PROXMOX_BACKUP_LISTEN_FD`. -/// If the variable already exists, its contents will instead be used to restore the listening -/// socket. The finished listening socket is then passed to the `create_service` function which -/// can be used to setup the TLS and the HTTP daemon. The returned future has to call -/// [systemd_notify] with [SystemdNotify::Ready] when the service is ready. -pub async fn create_daemon( - address: std::net::SocketAddr, - create_service: F, - pidfn: Option<&str>, -) -> Result<(), Error> -where - F: FnOnce(tokio::net::TcpListener) -> Result, - S: Future>, -{ - let mut reloader = Reloader::new()?; - - let listener: tokio::net::TcpListener = reloader - .restore("PROXMOX_BACKUP_LISTEN_FD", move || async move { - Ok(tokio::net::TcpListener::bind(&address).await?) - }) - .await?; - - let service = create_service(listener)?; - - let service = async move { - if let Err(err) = service.await { - log::error!("server error: {}", err); - } - }; - - let server_future = Box::pin(service); - let shutdown_future = crate::shutdown_future(); - - let finish_future = match future::select(server_future, shutdown_future).await { - Either::Left((_, _)) => { - if !crate::shutdown_requested() { - crate::request_shutdown(); // make sure we are in shutdown mode - } - None - } - Either::Right((_, server_future)) => Some(server_future), - }; - - let mut reloader = Some(reloader); - - if crate::is_reload_request() { - log::info!("daemon reload..."); - if let Err(e) = systemd_notify(SystemdNotify::Reloading) { - log::error!("failed to notify systemd about the state change: {}", e); - } - if let Err(e) = systemd_notify_barrier(u64::MAX) { - log::error!("failed to wait on systemd-processing: {}", e); - } - - if let Err(e) = reloader.take().unwrap().fork_restart(pidfn) { - log::error!("error during reload: {}", e); - let _ = systemd_notify(SystemdNotify::Status("error during reload".to_string())); - } - } else { - log::info!("daemon shutting down..."); - } - - if let Some(future) = finish_future { - future.await; - } - - log::info!("daemon shut down."); - Ok(()) -} - -#[link(name = "systemd")] -extern "C" { - fn sd_journal_stream_fd( - identifier: *const c_uchar, - priority: c_int, - level_prefix: c_int, - ) -> c_int; - fn sd_notify(unset_environment: c_int, state: *const c_char) -> c_int; - fn sd_notify_barrier(unset_environment: c_int, timeout: u64) -> c_int; -} - -/// Systemd sercice startup states (see: ``man sd_notify``) -pub enum SystemdNotify { - Ready, - Reloading, - Stopping, - Status(String), - MainPid(nix::unistd::Pid), -} - -/// Tells systemd the startup state of the service (see: ``man sd_notify``) -pub fn systemd_notify(state: SystemdNotify) -> Result<(), Error> { - let message = match state { - SystemdNotify::Ready => { - log::info!("service is ready"); - CString::new("READY=1") - } - SystemdNotify::Reloading => CString::new("RELOADING=1"), - SystemdNotify::Stopping => CString::new("STOPPING=1"), - SystemdNotify::Status(msg) => CString::new(format!("STATUS={}", msg)), - SystemdNotify::MainPid(pid) => CString::new(format!("MAINPID={}", pid)), - }?; - let rc = unsafe { sd_notify(0, message.as_ptr()) }; - if rc < 0 { - bail!( - "systemd_notify failed: {}", - std::io::Error::from_raw_os_error(-rc) - ); - } - - Ok(()) -} - -/// Waits until all previously sent messages with sd_notify are processed -pub fn systemd_notify_barrier(timeout: u64) -> Result<(), Error> { - let rc = unsafe { sd_notify_barrier(0, timeout) }; - if rc < 0 { - bail!( - "systemd_notify_barrier failed: {}", - std::io::Error::from_raw_os_error(-rc) - ); - } - Ok(()) -} diff --git a/proxmox-rest-server/src/environment.rs b/proxmox-rest-server/src/environment.rs deleted file mode 100644 index b4dff76b..00000000 --- a/proxmox-rest-server/src/environment.rs +++ /dev/null @@ -1,98 +0,0 @@ -use std::net::SocketAddr; -use std::sync::Arc; - -use serde_json::{json, Value}; - -use proxmox_router::{RpcEnvironment, RpcEnvironmentType}; - -use crate::ApiConfig; - -/// Encapsulates information about the runtime environment -pub struct RestEnvironment { - env_type: RpcEnvironmentType, - result_attributes: Value, - auth_id: Option, - client_ip: Option, - api: Arc, -} - -impl RestEnvironment { - pub fn new(env_type: RpcEnvironmentType, api: Arc) -> Self { - Self { - result_attributes: json!({}), - auth_id: None, - client_ip: None, - env_type, - api, - } - } - - pub fn api_config(&self) -> &ApiConfig { - &self.api - } - - pub fn log_auth(&self, auth_id: &str) { - let msg = format!("successful auth for user '{}'", auth_id); - log::debug!("{}", msg); // avoid noisy syslog, admins can already check the auth log - if let Some(auth_logger) = self.api.get_auth_log() { - auth_logger.lock().unwrap().log(&msg); - } - } - - pub fn log_failed_auth(&self, failed_auth_id: Option, msg: &str) { - let msg = match (self.client_ip, failed_auth_id) { - (Some(peer), Some(user)) => { - format!( - "authentication failure; rhost={} user={} msg={}", - peer, user, msg - ) - } - (Some(peer), None) => { - format!("authentication failure; rhost={} msg={}", peer, msg) - } - (None, Some(user)) => { - format!( - "authentication failure; rhost=unknown user={} msg={}", - user, msg - ) - } - (None, None) => { - format!("authentication failure; rhost=unknown msg={}", msg) - } - }; - log::error!("{}", msg); - if let Some(auth_logger) = self.api.get_auth_log() { - auth_logger.lock().unwrap().log(&msg); - } - } -} - -impl RpcEnvironment for RestEnvironment { - fn result_attrib_mut(&mut self) -> &mut Value { - &mut self.result_attributes - } - - fn result_attrib(&self) -> &Value { - &self.result_attributes - } - - fn env_type(&self) -> RpcEnvironmentType { - self.env_type - } - - fn set_auth_id(&mut self, auth_id: Option) { - self.auth_id = auth_id; - } - - fn get_auth_id(&self) -> Option { - self.auth_id.clone() - } - - fn set_client_ip(&mut self, client_ip: Option) { - self.client_ip = client_ip; - } - - fn get_client_ip(&self) -> Option { - self.client_ip - } -} diff --git a/proxmox-rest-server/src/file_logger.rs b/proxmox-rest-server/src/file_logger.rs deleted file mode 100644 index 2bb1fac6..00000000 --- a/proxmox-rest-server/src/file_logger.rs +++ /dev/null @@ -1,147 +0,0 @@ -use std::io::Write; - -use anyhow::Error; -use nix::fcntl::OFlag; - -use proxmox_sys::fs::{atomic_open_or_create_file, CreateOptions}; - -/// Options to control the behavior of a [FileLogger] instance -#[derive(Default)] -pub struct FileLogOptions { - /// Open underlying log file in append mode, useful when multiple concurrent processes - /// want to log to the same file (e.g., HTTP access log). Note that it is only atomic - /// for writes smaller than the PIPE_BUF (4k on Linux). - /// Inside the same process you may need to still use an mutex, for shared access. - pub append: bool, - /// Open underlying log file as readable - pub read: bool, - /// If set, ensure that the file is newly created or error out if already existing. - pub exclusive: bool, - /// Duplicate logged messages to STDOUT, like tee - pub to_stdout: bool, - /// Prefix messages logged to the file with the current local time as RFC 3339 - pub prefix_time: bool, - /// File owner/group and mode - pub file_opts: CreateOptions, -} - -/// Log messages with optional automatically added timestamps into files -/// -/// #### Example: -/// ``` -/// # use anyhow::{bail, format_err, Error}; -/// use proxmox_rest_server::{flog, FileLogger, FileLogOptions}; -/// -/// # std::fs::remove_file("test.log"); -/// let options = FileLogOptions { -/// to_stdout: true, -/// exclusive: true, -/// ..Default::default() -/// }; -/// let mut log = FileLogger::new("test.log", options).unwrap(); -/// flog!(log, "A simple log: {}", "Hello!"); -/// # std::fs::remove_file("test.log"); -/// ``` -pub struct FileLogger { - file: std::fs::File, - file_name: std::path::PathBuf, - options: FileLogOptions, -} - -/// Log messages to [FileLogger] - ``println`` like macro -#[macro_export] -macro_rules! flog { - ($log:expr, $($arg:tt)*) => ({ - $log.log(format!($($arg)*)); - }) -} - -impl FileLogger { - pub fn new>( - file_name: P, - options: FileLogOptions, - ) -> Result { - let file = Self::open(&file_name, &options)?; - - let file_name: std::path::PathBuf = file_name.as_ref().to_path_buf(); - - Ok(Self { - file, - file_name, - options, - }) - } - - pub fn reopen(&mut self) -> Result<&Self, Error> { - let file = Self::open(&self.file_name, &self.options)?; - self.file = file; - Ok(self) - } - - fn open>( - file_name: P, - options: &FileLogOptions, - ) -> Result { - let mut flags = OFlag::O_CLOEXEC; - - if options.read { - flags |= OFlag::O_RDWR; - } else { - flags |= OFlag::O_WRONLY; - } - - if options.append { - flags |= OFlag::O_APPEND; - } - if options.exclusive { - flags |= OFlag::O_EXCL; - } - - let file = - atomic_open_or_create_file(&file_name, flags, &[], options.file_opts.clone(), false)?; - - Ok(file) - } - - pub fn log>(&mut self, msg: S) { - let msg = msg.as_ref(); - - if self.options.to_stdout { - let mut stdout = std::io::stdout(); - stdout.write_all(msg.as_bytes()).unwrap(); - stdout.write_all(b"\n").unwrap(); - } - - let line = if self.options.prefix_time { - let now = proxmox_time::epoch_i64(); - let rfc3339 = match proxmox_time::epoch_to_rfc3339(now) { - Ok(rfc3339) => rfc3339, - Err(_) => "1970-01-01T00:00:00Z".into(), // for safety, should really not happen! - }; - format!("{}: {}\n", rfc3339, msg) - } else { - format!("{}\n", msg) - }; - if let Err(err) = self.file.write_all(line.as_bytes()) { - // avoid panicking, log methods should not do that - // FIXME: or, return result??? - log::error!("error writing to log file - {}", err); - } - } -} - -impl std::io::Write for FileLogger { - fn write(&mut self, buf: &[u8]) -> Result { - if self.options.to_stdout { - let _ = std::io::stdout().write(buf); - } - self.file.write(buf) - } - - fn flush(&mut self) -> Result<(), std::io::Error> { - if self.options.to_stdout { - let _ = std::io::stdout().flush(); - } - self.file.flush() - } -} diff --git a/proxmox-rest-server/src/formatter.rs b/proxmox-rest-server/src/formatter.rs deleted file mode 100644 index 2e9a01fa..00000000 --- a/proxmox-rest-server/src/formatter.rs +++ /dev/null @@ -1,235 +0,0 @@ -//! Helpers to format response data -use std::collections::HashMap; - -use anyhow::Error; -use serde_json::{json, Value}; - -use hyper::header; -use hyper::{Body, Response, StatusCode}; - -use proxmox_router::{HttpError, RpcEnvironment, SerializableReturn}; -use proxmox_schema::ParameterError; - -/// Extension to set error message for server side logging -pub(crate) struct ErrorMessageExtension(pub String); - -/// Methods to format data and errors -pub trait OutputFormatter: Send + Sync { - /// Transform json data into a http response - fn format_data(&self, data: Value, rpcenv: &dyn RpcEnvironment) -> Response; - - /// Transform serializable data into a streaming http response - fn format_data_streaming( - &self, - data: Box, - rpcenv: &dyn RpcEnvironment, - ) -> Result, Error>; - - /// Transform errors into a http response - fn format_error(&self, err: Error) -> Response; - - /// Transform a [Result] into a http response - fn format_result( - &self, - result: Result, - rpcenv: &dyn RpcEnvironment, - ) -> Response { - match result { - Ok(data) => self.format_data(data, rpcenv), - Err(err) => self.format_error(err), - } - } -} - -static JSON_CONTENT_TYPE: &str = "application/json;charset=UTF-8"; - -fn json_data_response(data: Value) -> Response { - let json_str = data.to_string(); - - let raw = json_str.into_bytes(); - - let mut response = Response::new(raw.into()); - response.headers_mut().insert( - header::CONTENT_TYPE, - header::HeaderValue::from_static(JSON_CONTENT_TYPE), - ); - - response -} - -fn json_data_response_streaming(body: Body) -> Result, Error> { - let response = Response::builder() - .header( - header::CONTENT_TYPE, - header::HeaderValue::from_static(JSON_CONTENT_TYPE), - ) - .body(body)?; - Ok(response) -} - -fn add_result_attributes(result: &mut Value, rpcenv: &dyn RpcEnvironment) { - let attributes = match rpcenv.result_attrib().as_object() { - Some(attr) => attr, - None => return, - }; - - for (key, value) in attributes { - result[key] = value.clone(); - } -} - -fn start_data_streaming( - value: Value, - data: Box, -) -> tokio::sync::mpsc::Receiver, Error>> { - let (writer, reader) = tokio::sync::mpsc::channel(1); - - tokio::task::spawn_blocking(move || { - let output = proxmox_async::blocking::SenderWriter::from_sender(writer); - let mut output = std::io::BufWriter::new(output); - let mut serializer = serde_json::Serializer::new(&mut output); - let _ = data.sender_serialize(&mut serializer, value); - }); - - reader -} - -struct JsonFormatter(); - -/// Format data as ``application/json`` -/// -/// The returned json object contains the following properties: -/// -/// * ``data``: The result data (on success) -/// -/// Any result attributes set on ``rpcenv`` are also added to the object. -/// -/// Errors generates a BAD_REQUEST containing the error -/// message as string. -pub static JSON_FORMATTER: &'static dyn OutputFormatter = &JsonFormatter(); - -impl OutputFormatter for JsonFormatter { - fn format_data(&self, data: Value, rpcenv: &dyn RpcEnvironment) -> Response { - let mut result = json!({ "data": data }); - - add_result_attributes(&mut result, rpcenv); - - json_data_response(result) - } - - fn format_data_streaming( - &self, - data: Box, - rpcenv: &dyn RpcEnvironment, - ) -> Result, Error> { - let mut value = json!({}); - - add_result_attributes(&mut value, rpcenv); - - let reader = start_data_streaming(value, data); - let stream = tokio_stream::wrappers::ReceiverStream::new(reader); - - json_data_response_streaming(Body::wrap_stream(stream)) - } - - fn format_error(&self, err: Error) -> Response { - let mut response = if let Some(apierr) = err.downcast_ref::() { - let mut resp = Response::new(Body::from(apierr.message.clone())); - *resp.status_mut() = apierr.code; - resp - } else { - let mut resp = Response::new(Body::from(err.to_string())); - *resp.status_mut() = StatusCode::BAD_REQUEST; - resp - }; - - response.headers_mut().insert( - header::CONTENT_TYPE, - header::HeaderValue::from_static(JSON_CONTENT_TYPE), - ); - - response - .extensions_mut() - .insert(ErrorMessageExtension(err.to_string())); - - response - } -} - -/// Format data as ExtJS compatible ``application/json`` -/// -/// The returned json object contains the following properties: -/// -/// * ``success``: boolean attribute indicating the success. -/// -/// * ``data``: The result data (on success) -/// -/// * ``message``: The error message (on failure) -/// -/// * ``errors``: detailed list of errors (if available) -/// -/// Any result attributes set on ``rpcenv`` are also added to the object. -/// -/// Please note that errors return status code OK, but setting success -/// to false. -pub static EXTJS_FORMATTER: &'static dyn OutputFormatter = &ExtJsFormatter(); - -struct ExtJsFormatter(); - -impl OutputFormatter for ExtJsFormatter { - fn format_data(&self, data: Value, rpcenv: &dyn RpcEnvironment) -> Response { - let mut result = json!({ - "data": data, - "success": true - }); - - add_result_attributes(&mut result, rpcenv); - - json_data_response(result) - } - - fn format_data_streaming( - &self, - data: Box, - rpcenv: &dyn RpcEnvironment, - ) -> Result, Error> { - let mut value = json!({ - "success": true, - }); - - add_result_attributes(&mut value, rpcenv); - - let reader = start_data_streaming(value, data); - let stream = tokio_stream::wrappers::ReceiverStream::new(reader); - - json_data_response_streaming(Body::wrap_stream(stream)) - } - - fn format_error(&self, err: Error) -> Response { - let mut errors = HashMap::new(); - - let message: String = match err.downcast::() { - Ok(param_err) => { - for (name, err) in param_err { - errors.insert(name, err.to_string()); - } - String::from("parameter verification errors") - } - Err(err) => err.to_string(), - }; - - let result = json!({ - "message": message, - "errors": errors, - "success": false - }); - - let mut response = json_data_response(result); - - response - .extensions_mut() - .insert(ErrorMessageExtension(message)); - - response - } -} diff --git a/proxmox-rest-server/src/h2service.rs b/proxmox-rest-server/src/h2service.rs deleted file mode 100644 index 3f90c178..00000000 --- a/proxmox-rest-server/src/h2service.rs +++ /dev/null @@ -1,147 +0,0 @@ -use anyhow::Error; - -use std::collections::HashMap; -use std::pin::Pin; -use std::sync::Arc; -use std::task::{Context, Poll}; - -use futures::*; -use hyper::{Body, Request, Response, StatusCode}; - -use proxmox_router::http_err; -use proxmox_router::{ApiResponseFuture, HttpError, Router, RpcEnvironment}; - -use crate::formatter::*; -use crate::{normalize_uri_path, WorkerTask}; - -/// Hyper Service implementation to handle stateful H2 connections. -/// -/// We use this kind of service to handle backup protocol -/// connections. State is stored inside the generic ``rpcenv``. Logs -/// goes into the ``WorkerTask`` log. -pub struct H2Service { - router: &'static Router, - rpcenv: E, - worker: Arc, - debug: bool, -} - -impl H2Service { - pub fn new(rpcenv: E, worker: Arc, router: &'static Router, debug: bool) -> Self { - Self { - rpcenv, - worker, - router, - debug, - } - } - - pub fn debug>(&self, msg: S) { - if self.debug { - self.worker.log_message(msg); - } - } - - fn handle_request(&self, req: Request) -> ApiResponseFuture { - let (parts, body) = req.into_parts(); - - let method = parts.method.clone(); - - let (path, components) = match normalize_uri_path(parts.uri.path()) { - Ok((p, c)) => (p, c), - Err(err) => return future::err(http_err!(BAD_REQUEST, "{}", err)).boxed(), - }; - - self.debug(format!("{} {}", method, path)); - - let mut uri_param = HashMap::new(); - - let formatter = JSON_FORMATTER; - - match self.router.find_method(&components, method, &mut uri_param) { - None => { - let err = http_err!(NOT_FOUND, "Path '{}' not found.", path); - future::ok(formatter.format_error(err)).boxed() - } - Some(api_method) => crate::rest::handle_api_request( - self.rpcenv.clone(), - api_method, - formatter, - parts, - body, - uri_param, - ) - .boxed(), - } - } - - fn log_response( - worker: Arc, - method: hyper::Method, - path: &str, - resp: &Response, - ) { - let status = resp.status(); - - if !status.is_success() { - let reason = status.canonical_reason().unwrap_or("unknown reason"); - - let mut message = "request failed"; - if let Some(data) = resp.extensions().get::() { - message = &data.0; - } - - worker.log_message(format!( - "{} {}: {} {}: {}", - method.as_str(), - path, - status.as_str(), - reason, - message - )); - } - } -} - -impl tower_service::Service> for H2Service { - type Response = Response; - type Error = Error; - #[allow(clippy::type_complexity)] - type Future = Pin> + Send>>; - - fn poll_ready(&mut self, _cx: &mut Context) -> Poll> { - Poll::Ready(Ok(())) - } - - fn call(&mut self, req: Request) -> Self::Future { - let path = req.uri().path().to_owned(); - let method = req.method().clone(); - let worker = self.worker.clone(); - - self.handle_request(req) - .map(move |result| match result { - Ok(res) => { - Self::log_response(worker, method, &path, &res); - Ok::<_, Error>(res) - } - Err(err) => { - if let Some(apierr) = err.downcast_ref::() { - let mut resp = Response::new(Body::from(apierr.message.clone())); - resp.extensions_mut() - .insert(ErrorMessageExtension(apierr.message.clone())); - *resp.status_mut() = apierr.code; - Self::log_response(worker, method, &path, &resp); - Ok(resp) - } else { - let mut resp = Response::new(Body::from(err.to_string())); - resp.extensions_mut() - .insert(ErrorMessageExtension(err.to_string())); - *resp.status_mut() = StatusCode::BAD_REQUEST; - Self::log_response(worker, method, &path, &resp); - Ok(resp) - } - } - }) - .boxed() - } -} diff --git a/proxmox-rest-server/src/lib.rs b/proxmox-rest-server/src/lib.rs deleted file mode 100644 index b8a73e35..00000000 --- a/proxmox-rest-server/src/lib.rs +++ /dev/null @@ -1,244 +0,0 @@ -//! # Proxmox REST server -//! -//! This module provides convenient building blocks to implement a -//! REST server. -//! -//! ## Features -//! -//! * highly threaded code, uses Rust async -//! * static API definitions using schemas -//! * restartable systemd daemons using `systemd_notify` -//! * support for long running worker tasks (threads or async tokio tasks) -//! * supports separate access and authentication log files -//! * extra control socket to trigger management operations -//! - logfile rotation -//! - worker task management -//! * generic interface to authenticate user - -use std::future::Future; -use std::pin::Pin; -use std::sync::atomic::{AtomicBool, Ordering}; - -use anyhow::{bail, format_err, Error}; -use http::request::Parts; -use http::HeaderMap; -use hyper::{Body, Method, Response}; -use nix::unistd::Pid; - -use proxmox_router::UserInformation; -use proxmox_sys::fd::Fd; -use proxmox_sys::fs::CreateOptions; -use proxmox_sys::linux::procfs::PidStat; - -mod compression; -pub use compression::*; - -pub mod daemon; - -pub mod formatter; - -mod environment; -pub use environment::*; - -mod state; -pub use state::*; - -mod command_socket; -pub use command_socket::*; - -mod file_logger; -pub use file_logger::{FileLogOptions, FileLogger}; - -mod api_config; -pub use api_config::ApiConfig; - -mod rest; -pub use rest::RestServer; - -mod worker_task; -pub use worker_task::*; - -mod h2service; -pub use h2service::*; - -/// Authentication Error -pub enum AuthError { - Generic(Error), - NoData, -} - -impl From for AuthError { - fn from(err: Error) -> Self { - AuthError::Generic(err) - } -} - -/// Result of [`ServerAdapter::check_auth`]. -pub type ServerAdapterCheckAuth<'a> = Pin< - Box< - dyn Future), AuthError>> - + Send - + 'a, - >, ->; - -/// User Authentication and index/root page generation methods -pub trait ServerAdapter: Send + Sync { - /// Returns the index/root page - fn get_index( - &self, - rest_env: RestEnvironment, - parts: Parts, - ) -> Pin> + Send>>; - - /// Extract user credentials from headers and check them. - /// - /// If credenthials are valid, returns the username and a - /// [UserInformation] object to query additional user data. - fn check_auth<'a>( - &'a self, - headers: &'a HeaderMap, - method: &'a Method, - ) -> ServerAdapterCheckAuth<'a>; -} - -lazy_static::lazy_static! { - static ref PID: i32 = unsafe { libc::getpid() }; - static ref PSTART: u64 = PidStat::read_from_pid(Pid::from_raw(*PID)).unwrap().starttime; -} - -/// Returns the current process ID (see [libc::getpid]) -/// -/// The value is cached at startup (so it is invalid after a fork) -pub(crate) fn pid() -> i32 { - *PID -} - -/// Returns the starttime of the process (see [PidStat]) -/// -/// The value is cached at startup (so it is invalid after a fork) -pub(crate) fn pstart() -> u64 { - *PSTART -} - -/// Helper to write the PID into a file -pub fn write_pid(pid_fn: &str) -> Result<(), Error> { - let pid_str = format!("{}\n", *PID); - proxmox_sys::fs::replace_file(pid_fn, pid_str.as_bytes(), CreateOptions::new(), false) -} - -/// Helper to read the PID from a file -pub fn read_pid(pid_fn: &str) -> Result { - let pid = proxmox_sys::fs::file_get_contents(pid_fn)?; - let pid = std::str::from_utf8(&pid)?.trim(); - pid.parse() - .map_err(|err| format_err!("could not parse pid - {}", err)) -} - -/// Returns the control socket path for a specific process ID. -/// -/// Note: The control socket always uses @/run/proxmox-backup/ as -/// prefix for historic reason. This does not matter because the -/// generated path is unique for each ``pid`` anyways. -pub fn ctrl_sock_from_pid(pid: i32) -> String { - // Note: The control socket always uses @/run/proxmox-backup/ as prefix - // for historc reason. - format!("\0{}/control-{}.sock", "/run/proxmox-backup", pid) -} - -/// Returns the control socket path for this server. -pub fn our_ctrl_sock() -> String { - ctrl_sock_from_pid(*PID) -} - -static SHUTDOWN_REQUESTED: AtomicBool = AtomicBool::new(false); - -/// Request a server shutdown (usually called from [catch_shutdown_signal]) -pub fn request_shutdown() { - SHUTDOWN_REQUESTED.store(true, Ordering::SeqCst); - crate::server_shutdown(); -} - -/// Returns true if there was a shutdown request. -#[inline(always)] -pub fn shutdown_requested() -> bool { - SHUTDOWN_REQUESTED.load(Ordering::SeqCst) -} - -/// Raise an error if there was a shutdown request. -pub fn fail_on_shutdown() -> Result<(), Error> { - if shutdown_requested() { - bail!("Server shutdown requested - aborting task"); - } - Ok(()) -} - -/// safe wrapper for `nix::sys::socket::socketpair` defaulting to `O_CLOEXEC` and guarding the file -/// descriptors. -pub fn socketpair() -> Result<(Fd, Fd), Error> { - use nix::sys::socket; - let (pa, pb) = socket::socketpair( - socket::AddressFamily::Unix, - socket::SockType::Stream, - None, - socket::SockFlag::SOCK_CLOEXEC, - )?; - Ok((Fd(pa), Fd(pb))) -} - -/// Extract a specific cookie from cookie header. -/// We assume cookie_name is already url encoded. -pub fn extract_cookie(cookie: &str, cookie_name: &str) -> Option { - for pair in cookie.split(';') { - let (name, value) = match pair.find('=') { - Some(i) => (pair[..i].trim(), pair[(i + 1)..].trim()), - None => return None, // Cookie format error - }; - - if name == cookie_name { - use percent_encoding::percent_decode; - if let Ok(value) = percent_decode(value.as_bytes()).decode_utf8() { - return Some(value.into()); - } else { - return None; // Cookie format error - } - } - } - - None -} - -/// Extract a specific cookie from a HeaderMap's "COOKIE" entry. -/// We assume cookie_name is already url encoded. -pub fn cookie_from_header(headers: &http::HeaderMap, cookie_name: &str) -> Option { - if let Some(Ok(cookie)) = headers.get("COOKIE").map(|v| v.to_str()) { - extract_cookie(cookie, cookie_name) - } else { - None - } -} - -/// normalize uri path -/// -/// Do not allow ".", "..", or hidden files ".XXXX" -/// Also remove empty path components -pub fn normalize_uri_path(path: &str) -> Result<(String, Vec<&str>), Error> { - let items = path.split('/'); - - let mut path = String::new(); - let mut components = vec![]; - - for name in items { - if name.is_empty() { - continue; - } - if name.starts_with('.') { - bail!("Path contains illegal components."); - } - path.push('/'); - path.push_str(name); - components.push(name); - } - - Ok((path, components)) -} diff --git a/proxmox-rest-server/src/rest.rs b/proxmox-rest-server/src/rest.rs deleted file mode 100644 index 96c35f09..00000000 --- a/proxmox-rest-server/src/rest.rs +++ /dev/null @@ -1,789 +0,0 @@ -use std::collections::HashMap; -use std::future::Future; -use std::hash::BuildHasher; -use std::path::{Path, PathBuf}; -use std::pin::Pin; -use std::sync::{Arc, Mutex}; -use std::task::{Context, Poll}; - -use anyhow::{bail, format_err, Error}; -use futures::future::{self, FutureExt, TryFutureExt}; -use futures::stream::TryStreamExt; -use hyper::body::HttpBody; -use hyper::header::{self, HeaderMap}; -use hyper::http::request::Parts; -use hyper::{Body, Request, Response, StatusCode}; -use lazy_static::lazy_static; -use regex::Regex; -use serde_json::Value; -use tokio::fs::File; -use tokio::time::Instant; -use tower_service::Service; -use url::form_urlencoded; - -use proxmox_router::http_err; -use proxmox_router::{ - check_api_permission, ApiHandler, ApiMethod, HttpError, Permission, RpcEnvironment, - RpcEnvironmentType, UserInformation, -}; -use proxmox_schema::{ObjectSchemaType, ParameterSchema}; - -use proxmox_http::client::RateLimitedStream; - -use proxmox_async::stream::AsyncReaderStream; -use proxmox_compression::{DeflateEncoder, Level}; - -use crate::{ - formatter::*, normalize_uri_path, ApiConfig, AuthError, CompressionMethod, FileLogger, - RestEnvironment, -}; - -extern "C" { - fn tzset(); -} - -struct AuthStringExtension(String); - -struct EmptyUserInformation {} - -impl UserInformation for EmptyUserInformation { - fn is_superuser(&self, _userid: &str) -> bool { - false - } - fn is_group_member(&self, _userid: &str, _group: &str) -> bool { - false - } - fn lookup_privs(&self, _userid: &str, _path: &[&str]) -> u64 { - 0 - } -} - -/// REST server implementation (configured with [ApiConfig]) -/// -/// This struct implements the [Service] trait in order to use it with -/// [hyper::server::Builder::serve]. -pub struct RestServer { - api_config: Arc, -} - -const MAX_URI_QUERY_LENGTH: usize = 3072; -const CHUNK_SIZE_LIMIT: u64 = 32 * 1024; - -impl RestServer { - /// Creates a new instance. - pub fn new(api_config: ApiConfig) -> Self { - Self { - api_config: Arc::new(api_config), - } - } -} - -impl Service<&Pin>>>> - for RestServer -{ - type Response = ApiService; - type Error = Error; - type Future = Pin> + Send>>; - - fn poll_ready(&mut self, _cx: &mut Context) -> Poll> { - Poll::Ready(Ok(())) - } - - fn call( - &mut self, - ctx: &Pin>>>, - ) -> Self::Future { - match ctx.get_ref().peer_addr() { - Err(err) => future::err(format_err!("unable to get peer address - {}", err)).boxed(), - Ok(peer) => future::ok(ApiService { - peer, - api_config: self.api_config.clone(), - }) - .boxed(), - } - } -} - -impl Service<&Pin>>> for RestServer { - type Response = ApiService; - type Error = Error; - type Future = Pin> + Send>>; - - fn poll_ready(&mut self, _cx: &mut Context) -> Poll> { - Poll::Ready(Ok(())) - } - - fn call( - &mut self, - ctx: &Pin>>, - ) -> Self::Future { - match ctx.get_ref().peer_addr() { - Err(err) => future::err(format_err!("unable to get peer address - {}", err)).boxed(), - Ok(peer) => future::ok(ApiService { - peer, - api_config: self.api_config.clone(), - }) - .boxed(), - } - } -} - -impl Service<&hyper::server::conn::AddrStream> for RestServer { - type Response = ApiService; - type Error = Error; - type Future = Pin> + Send>>; - - fn poll_ready(&mut self, _cx: &mut Context) -> Poll> { - Poll::Ready(Ok(())) - } - - fn call(&mut self, ctx: &hyper::server::conn::AddrStream) -> Self::Future { - let peer = ctx.remote_addr(); - future::ok(ApiService { - peer, - api_config: self.api_config.clone(), - }) - .boxed() - } -} - -impl Service<&tokio::net::UnixStream> for RestServer { - type Response = ApiService; - type Error = Error; - type Future = Pin> + Send>>; - - fn poll_ready(&mut self, _cx: &mut Context) -> Poll> { - Poll::Ready(Ok(())) - } - - fn call(&mut self, _ctx: &tokio::net::UnixStream) -> Self::Future { - // TODO: Find a way to actually represent the vsock peer in the ApiService struct - for now - // it doesn't really matter, so just use a fake IP address - let fake_peer = "0.0.0.0:807".parse().unwrap(); - future::ok(ApiService { - peer: fake_peer, - api_config: self.api_config.clone(), - }) - .boxed() - } -} - -// Helper [Service] containing the peer Address -// -// The lower level connection [Service] implementation on -// [RestServer] extracts the peer address and return an [ApiService]. -// -// Rust wants this type 'pub' here (else we get 'private type `ApiService` -// in public interface'). The type is still private because the crate does -// not export it. -pub struct ApiService { - pub peer: std::net::SocketAddr, - pub api_config: Arc, -} - -fn log_response( - logfile: Option<&Arc>>, - peer: &std::net::SocketAddr, - method: hyper::Method, - path_query: &str, - resp: &Response, - user_agent: Option, -) { - if resp.extensions().get::().is_some() { - return; - }; - - // we also log URL-to-long requests, so avoid message bigger than PIPE_BUF (4k on Linux) - // to profit from atomicty guarantees for O_APPEND opened logfiles - let path = &path_query[..MAX_URI_QUERY_LENGTH.min(path_query.len())]; - - let status = resp.status(); - if !(status.is_success() || status.is_informational()) { - let reason = status.canonical_reason().unwrap_or("unknown reason"); - - let message = match resp.extensions().get::() { - Some(data) => &data.0, - None => "request failed", - }; - - log::error!( - "{} {}: {} {}: [client {}] {}", - method.as_str(), - path, - status.as_str(), - reason, - peer, - message - ); - } - if let Some(logfile) = logfile { - let auth_id = match resp.extensions().get::() { - Some(AuthStringExtension(auth_id)) => auth_id.clone(), - None => "-".to_string(), - }; - let now = proxmox_time::epoch_i64(); - // time format which apache/nginx use (by default), copied from pve-http-server - let datetime = proxmox_time::strftime_local("%d/%m/%Y:%H:%M:%S %z", now) - .unwrap_or_else(|_| "-".to_string()); - - logfile.lock().unwrap().log(format!( - "{} - {} [{}] \"{} {}\" {} {} {}", - peer.ip(), - auth_id, - datetime, - method.as_str(), - path, - status.as_str(), - resp.body().size_hint().lower(), - user_agent.unwrap_or_else(|| "-".to_string()), - )); - } -} - -fn get_proxied_peer(headers: &HeaderMap) -> Option { - lazy_static! { - static ref RE: Regex = Regex::new(r#"for="([^"]+)""#).unwrap(); - } - let forwarded = headers.get(header::FORWARDED)?.to_str().ok()?; - let capture = RE.captures(forwarded)?; - let rhost = capture.get(1)?.as_str(); - - rhost.parse().ok() -} - -fn get_user_agent(headers: &HeaderMap) -> Option { - let agent = headers.get(header::USER_AGENT)?.to_str(); - agent - .map(|s| { - let mut s = s.to_owned(); - s.truncate(128); - s - }) - .ok() -} - -impl Service> for ApiService { - type Response = Response; - type Error = Error; - #[allow(clippy::type_complexity)] - type Future = Pin> + Send>>; - - fn poll_ready(&mut self, _cx: &mut Context) -> Poll> { - Poll::Ready(Ok(())) - } - - fn call(&mut self, req: Request) -> Self::Future { - let path = req.uri().path_and_query().unwrap().as_str().to_owned(); - let method = req.method().clone(); - let user_agent = get_user_agent(req.headers()); - - let config = Arc::clone(&self.api_config); - let peer = match get_proxied_peer(req.headers()) { - Some(proxied_peer) => proxied_peer, - None => self.peer, - }; - async move { - let response = match handle_request(Arc::clone(&config), req, &peer).await { - Ok(response) => response, - Err(err) => { - let (err, code) = match err.downcast_ref::() { - Some(apierr) => (apierr.message.clone(), apierr.code), - _ => (err.to_string(), StatusCode::BAD_REQUEST), - }; - Response::builder() - .status(code) - .extension(ErrorMessageExtension(err.to_string())) - .body(err.into())? - } - }; - let logger = config.get_access_log(); - log_response(logger, &peer, method, &path, &response, user_agent); - Ok(response) - } - .boxed() - } -} - -fn parse_query_parameters( - param_schema: ParameterSchema, - form: &str, // x-www-form-urlencoded body data - parts: &Parts, - uri_param: &HashMap, -) -> Result { - let mut param_list: Vec<(String, String)> = vec![]; - - if !form.is_empty() { - for (k, v) in form_urlencoded::parse(form.as_bytes()).into_owned() { - param_list.push((k, v)); - } - } - - if let Some(query_str) = parts.uri.query() { - for (k, v) in form_urlencoded::parse(query_str.as_bytes()).into_owned() { - if k == "_dc" { - continue; - } // skip extjs "disable cache" parameter - param_list.push((k, v)); - } - } - - for (k, v) in uri_param { - param_list.push((k.clone(), v.clone())); - } - - let params = param_schema.parse_parameter_strings(¶m_list, true)?; - - Ok(params) -} - -async fn get_request_parameters( - param_schema: ParameterSchema, - parts: Parts, - req_body: Body, - uri_param: HashMap, -) -> Result { - let mut is_json = false; - - if let Some(value) = parts.headers.get(header::CONTENT_TYPE) { - match value.to_str().map(|v| v.split(';').next()) { - Ok(Some("application/x-www-form-urlencoded")) => { - is_json = false; - } - Ok(Some("application/json")) => { - is_json = true; - } - _ => bail!("unsupported content type {:?}", value.to_str()), - } - } - - let body = TryStreamExt::map_err(req_body, |err| { - http_err!(BAD_REQUEST, "Problems reading request body: {}", err) - }) - .try_fold(Vec::new(), |mut acc, chunk| async move { - // FIXME: max request body size? - if acc.len() + chunk.len() < 64 * 1024 { - acc.extend_from_slice(&chunk); - Ok(acc) - } else { - Err(http_err!(BAD_REQUEST, "Request body too large")) - } - }) - .await?; - - let utf8_data = - std::str::from_utf8(&body).map_err(|err| format_err!("Request body not uft8: {}", err))?; - - if is_json { - let mut params: Value = serde_json::from_str(utf8_data)?; - for (k, v) in uri_param { - if let Some((_optional, prop_schema)) = param_schema.lookup(&k) { - params[&k] = prop_schema.parse_simple_value(&v)?; - } - } - param_schema.verify_json(¶ms)?; - Ok(params) - } else { - parse_query_parameters(param_schema, utf8_data, &parts, &uri_param) - } -} - -struct NoLogExtension(); - -async fn proxy_protected_request( - info: &'static ApiMethod, - mut parts: Parts, - req_body: Body, - peer: &std::net::SocketAddr, -) -> Result, Error> { - let mut uri_parts = parts.uri.clone().into_parts(); - - uri_parts.scheme = Some(http::uri::Scheme::HTTP); - uri_parts.authority = Some(http::uri::Authority::from_static("127.0.0.1:82")); - let new_uri = http::Uri::from_parts(uri_parts).unwrap(); - - parts.uri = new_uri; - - let mut request = Request::from_parts(parts, req_body); - request.headers_mut().insert( - header::FORWARDED, - format!("for=\"{}\";", peer).parse().unwrap(), - ); - - let reload_timezone = info.reload_timezone; - - let resp = hyper::client::Client::new() - .request(request) - .map_err(Error::from) - .map_ok(|mut resp| { - resp.extensions_mut().insert(NoLogExtension()); - resp - }) - .await?; - - if reload_timezone { - unsafe { - tzset(); - } - } - - Ok(resp) -} - -pub(crate) async fn handle_api_request( - mut rpcenv: Env, - info: &'static ApiMethod, - formatter: &'static dyn OutputFormatter, - parts: Parts, - req_body: Body, - uri_param: HashMap, -) -> Result, Error> { - let delay_unauth_time = std::time::Instant::now() + std::time::Duration::from_millis(3000); - let compression = extract_compression_method(&parts.headers); - - let result = match info.handler { - ApiHandler::AsyncHttp(handler) => { - let params = parse_query_parameters(info.parameters, "", &parts, &uri_param)?; - (handler)(parts, req_body, params, info, Box::new(rpcenv)).await - } - ApiHandler::StreamingSync(handler) => { - let params = - get_request_parameters(info.parameters, parts, req_body, uri_param).await?; - (handler)(params, info, &mut rpcenv) - .and_then(|data| formatter.format_data_streaming(data, &rpcenv)) - } - ApiHandler::StreamingAsync(handler) => { - let params = - get_request_parameters(info.parameters, parts, req_body, uri_param).await?; - (handler)(params, info, &mut rpcenv) - .await - .and_then(|data| formatter.format_data_streaming(data, &rpcenv)) - } - ApiHandler::Sync(handler) => { - let params = - get_request_parameters(info.parameters, parts, req_body, uri_param).await?; - (handler)(params, info, &mut rpcenv).map(|data| formatter.format_data(data, &rpcenv)) - } - ApiHandler::Async(handler) => { - let params = - get_request_parameters(info.parameters, parts, req_body, uri_param).await?; - (handler)(params, info, &mut rpcenv) - .await - .map(|data| formatter.format_data(data, &rpcenv)) - } - _ => { - bail!("Unknown API handler type"); - } - }; - - let mut resp = match result { - Ok(resp) => resp, - Err(err) => { - if let Some(httperr) = err.downcast_ref::() { - if httperr.code == StatusCode::UNAUTHORIZED { - tokio::time::sleep_until(Instant::from_std(delay_unauth_time)).await; - } - } - formatter.format_error(err) - } - }; - - let resp = match compression { - Some(CompressionMethod::Deflate) => { - resp.headers_mut().insert( - header::CONTENT_ENCODING, - CompressionMethod::Deflate.content_encoding(), - ); - resp.map(|body| { - Body::wrap_stream(DeflateEncoder::with_quality( - TryStreamExt::map_err(body, |err| { - proxmox_lang::io_format_err!("error during compression: {}", err) - }), - Level::Default, - )) - }) - } - None => resp, - }; - - if info.reload_timezone { - unsafe { - tzset(); - } - } - - Ok(resp) -} - -fn extension_to_content_type(filename: &Path) -> (&'static str, bool) { - if let Some(ext) = filename.extension().and_then(|osstr| osstr.to_str()) { - return match ext { - "css" => ("text/css", false), - "html" => ("text/html", false), - "js" => ("application/javascript", false), - "json" => ("application/json", false), - "map" => ("application/json", false), - "png" => ("image/png", true), - "ico" => ("image/x-icon", true), - "gif" => ("image/gif", true), - "svg" => ("image/svg+xml", false), - "jar" => ("application/java-archive", true), - "woff" => ("application/font-woff", true), - "woff2" => ("application/font-woff2", true), - "ttf" => ("application/font-snft", true), - "pdf" => ("application/pdf", true), - "epub" => ("application/epub+zip", true), - "mp3" => ("audio/mpeg", true), - "oga" => ("audio/ogg", true), - "tgz" => ("application/x-compressed-tar", true), - _ => ("application/octet-stream", false), - }; - } - - ("application/octet-stream", false) -} - -async fn simple_static_file_download( - filename: PathBuf, - content_type: &'static str, - compression: Option, -) -> Result, Error> { - use tokio::io::AsyncReadExt; - - let mut file = File::open(filename) - .await - .map_err(|err| http_err!(BAD_REQUEST, "File open failed: {}", err))?; - - let mut data: Vec = Vec::new(); - - let mut response = match compression { - Some(CompressionMethod::Deflate) => { - let mut enc = DeflateEncoder::with_quality(data, Level::Default); - enc.compress_vec(&mut file, CHUNK_SIZE_LIMIT as usize) - .await?; - let mut response = Response::new(enc.into_inner().into()); - response.headers_mut().insert( - header::CONTENT_ENCODING, - CompressionMethod::Deflate.content_encoding(), - ); - response - } - None => { - file.read_to_end(&mut data) - .await - .map_err(|err| http_err!(BAD_REQUEST, "File read failed: {}", err))?; - Response::new(data.into()) - } - }; - - response.headers_mut().insert( - header::CONTENT_TYPE, - header::HeaderValue::from_static(content_type), - ); - - Ok(response) -} - -async fn chuncked_static_file_download( - filename: PathBuf, - content_type: &'static str, - compression: Option, -) -> Result, Error> { - let mut resp = Response::builder() - .status(StatusCode::OK) - .header(header::CONTENT_TYPE, content_type); - - let file = File::open(filename) - .await - .map_err(|err| http_err!(BAD_REQUEST, "File open failed: {}", err))?; - - let body = match compression { - Some(CompressionMethod::Deflate) => { - resp = resp.header( - header::CONTENT_ENCODING, - CompressionMethod::Deflate.content_encoding(), - ); - Body::wrap_stream(DeflateEncoder::with_quality( - AsyncReaderStream::new(file), - Level::Default, - )) - } - None => Body::wrap_stream(AsyncReaderStream::new(file)), - }; - - Ok(resp.body(body).unwrap()) -} - -async fn handle_static_file_download( - filename: PathBuf, - compression: Option, -) -> Result, Error> { - let metadata = tokio::fs::metadata(filename.clone()) - .map_err(|err| http_err!(BAD_REQUEST, "File access problems: {}", err)) - .await?; - - let (content_type, nocomp) = extension_to_content_type(&filename); - let compression = if nocomp { None } else { compression }; - - if metadata.len() < CHUNK_SIZE_LIMIT { - simple_static_file_download(filename, content_type, compression).await - } else { - chuncked_static_file_download(filename, content_type, compression).await - } -} - -// FIXME: support handling multiple compression methods -fn extract_compression_method(headers: &http::HeaderMap) -> Option { - if let Some(Ok(encodings)) = headers.get(header::ACCEPT_ENCODING).map(|v| v.to_str()) { - for encoding in encodings.split(&[',', ' '][..]) { - if let Ok(method) = encoding.parse() { - return Some(method); - } - } - } - None -} - -async fn handle_request( - api: Arc, - req: Request, - peer: &std::net::SocketAddr, -) -> Result, Error> { - let (parts, body) = req.into_parts(); - let method = parts.method.clone(); - let (path, components) = normalize_uri_path(parts.uri.path())?; - - let comp_len = components.len(); - - let query = parts.uri.query().unwrap_or_default(); - if path.len() + query.len() > MAX_URI_QUERY_LENGTH { - return Ok(Response::builder() - .status(StatusCode::URI_TOO_LONG) - .body("".into()) - .unwrap()); - } - - let env_type = api.env_type(); - let mut rpcenv = RestEnvironment::new(env_type, Arc::clone(&api)); - - rpcenv.set_client_ip(Some(*peer)); - - let delay_unauth_time = std::time::Instant::now() + std::time::Duration::from_millis(3000); - let access_forbidden_time = std::time::Instant::now() + std::time::Duration::from_millis(500); - - if comp_len >= 1 && components[0] == "api2" { - if comp_len >= 2 { - let format = components[1]; - - let formatter: &dyn OutputFormatter = match format { - "json" => JSON_FORMATTER, - "extjs" => EXTJS_FORMATTER, - _ => bail!("Unsupported output format '{}'.", format), - }; - - let mut uri_param = HashMap::new(); - let api_method = api.find_method(&components[2..], method.clone(), &mut uri_param); - - let mut auth_required = true; - if let Some(api_method) = api_method { - if let Permission::World = *api_method.access.permission { - auth_required = false; // no auth for endpoints with World permission - } - } - - let mut user_info: Box = - Box::new(EmptyUserInformation {}); - - if auth_required { - match api.check_auth(&parts.headers, &method).await { - Ok((authid, info)) => { - rpcenv.set_auth_id(Some(authid)); - user_info = info; - } - Err(auth_err) => { - let err = match auth_err { - AuthError::Generic(err) => err, - AuthError::NoData => { - format_err!("no authentication credentials provided.") - } - }; - // fixme: log Username?? - rpcenv.log_failed_auth(None, &err.to_string()); - - // always delay unauthorized calls by 3 seconds (from start of request) - let err = http_err!(UNAUTHORIZED, "authentication failed - {}", err); - tokio::time::sleep_until(Instant::from_std(delay_unauth_time)).await; - return Ok(formatter.format_error(err)); - } - } - } - - match api_method { - None => { - let err = http_err!(NOT_FOUND, "Path '{}' not found.", path); - return Ok(formatter.format_error(err)); - } - Some(api_method) => { - let auth_id = rpcenv.get_auth_id(); - let user_info = user_info; - - if !check_api_permission( - api_method.access.permission, - auth_id.as_deref(), - &uri_param, - user_info.as_ref(), - ) { - let err = http_err!(FORBIDDEN, "permission check failed"); - tokio::time::sleep_until(Instant::from_std(access_forbidden_time)).await; - return Ok(formatter.format_error(err)); - } - - let result = if api_method.protected && env_type == RpcEnvironmentType::PUBLIC { - proxy_protected_request(api_method, parts, body, peer).await - } else { - handle_api_request(rpcenv, api_method, formatter, parts, body, uri_param) - .await - }; - - let mut response = match result { - Ok(resp) => resp, - Err(err) => formatter.format_error(err), - }; - - if let Some(auth_id) = auth_id { - response - .extensions_mut() - .insert(AuthStringExtension(auth_id)); - } - - return Ok(response); - } - } - } - } else { - // not Auth required for accessing files! - - if method != hyper::Method::GET { - bail!("Unsupported HTTP method {}", method); - } - - if comp_len == 0 { - match api.check_auth(&parts.headers, &method).await { - Ok((auth_id, _user_info)) => { - rpcenv.set_auth_id(Some(auth_id)); - return Ok(api.get_index(rpcenv, parts).await); - } - Err(AuthError::Generic(_)) => { - tokio::time::sleep_until(Instant::from_std(delay_unauth_time)).await; - } - Err(AuthError::NoData) => {} - } - return Ok(api.get_index(rpcenv, parts).await); - } else { - let filename = api.find_alias(&components); - let compression = extract_compression_method(&parts.headers); - return handle_static_file_download(filename, compression).await; - } - } - - Err(http_err!(NOT_FOUND, "Path '{}' not found.", path)) -} diff --git a/proxmox-rest-server/src/state.rs b/proxmox-rest-server/src/state.rs deleted file mode 100644 index c3b81627..00000000 --- a/proxmox-rest-server/src/state.rs +++ /dev/null @@ -1,161 +0,0 @@ -use anyhow::Error; -use lazy_static::lazy_static; -use std::sync::Mutex; - -use futures::*; - -use tokio::signal::unix::{signal, SignalKind}; - -use proxmox_async::broadcast_future::BroadcastData; - -use crate::request_shutdown; - -#[derive(PartialEq, Copy, Clone, Debug)] -enum ServerMode { - Normal, - Shutdown, -} - -struct ServerState { - mode: ServerMode, - shutdown_listeners: BroadcastData<()>, - last_worker_listeners: BroadcastData<()>, - worker_count: usize, - internal_task_count: usize, - reload_request: bool, -} - -lazy_static! { - static ref SERVER_STATE: Mutex = Mutex::new(ServerState { - mode: ServerMode::Normal, - shutdown_listeners: BroadcastData::new(), - last_worker_listeners: BroadcastData::new(), - worker_count: 0, - internal_task_count: 0, - reload_request: false, - }); -} - -/// Listen to ``SIGINT`` for server shutdown -/// -/// This calls [request_shutdown] when receiving the signal. -pub fn catch_shutdown_signal() -> Result<(), Error> { - let mut stream = signal(SignalKind::interrupt())?; - - let future = async move { - while stream.recv().await.is_some() { - log::info!("got shutdown request (SIGINT)"); - SERVER_STATE.lock().unwrap().reload_request = false; - request_shutdown(); - } - } - .boxed(); - - let abort_future = last_worker_future().map_err(|_| {}); - let task = futures::future::select(future, abort_future); - - tokio::spawn(task.map(|_| ())); - - Ok(()) -} - -/// Listen to ``SIGHUP`` for server reload -/// -/// This calls [request_shutdown] when receiving the signal, and tries -/// to restart the server. -pub fn catch_reload_signal() -> Result<(), Error> { - let mut stream = signal(SignalKind::hangup())?; - - let future = async move { - while stream.recv().await.is_some() { - log::info!("got reload request (SIGHUP)"); - SERVER_STATE.lock().unwrap().reload_request = true; - crate::request_shutdown(); - } - } - .boxed(); - - let abort_future = last_worker_future().map_err(|_| {}); - let task = futures::future::select(future, abort_future); - - tokio::spawn(task.map(|_| ())); - - Ok(()) -} - -pub(crate) fn is_reload_request() -> bool { - let data = SERVER_STATE.lock().unwrap(); - - data.mode == ServerMode::Shutdown && data.reload_request -} - -pub(crate) fn server_shutdown() { - let mut data = SERVER_STATE.lock().unwrap(); - - log::info!("request_shutdown"); - - data.mode = ServerMode::Shutdown; - - data.shutdown_listeners.notify_listeners(Ok(())); - - drop(data); // unlock - - check_last_worker(); -} - -/// Future to signal server shutdown -pub fn shutdown_future() -> impl Future { - let mut data = SERVER_STATE.lock().unwrap(); - data.shutdown_listeners.listen().map(|_| ()) -} - -/// Future to signal when last worker task finished -pub fn last_worker_future() -> impl Future> { - let mut data = SERVER_STATE.lock().unwrap(); - data.last_worker_listeners.listen() -} - -pub(crate) fn set_worker_count(count: usize) { - SERVER_STATE.lock().unwrap().worker_count = count; - - check_last_worker(); -} - -pub(crate) fn check_last_worker() { - let mut data = SERVER_STATE.lock().unwrap(); - - if !(data.mode == ServerMode::Shutdown - && data.worker_count == 0 - && data.internal_task_count == 0) - { - return; - } - - data.last_worker_listeners.notify_listeners(Ok(())); -} - -/// Spawns a tokio task that will be tracked for reload -/// and if it is finished, notify the [last_worker_future] if we -/// are in shutdown mode. -pub fn spawn_internal_task(task: T) -where - T: Future + Send + 'static, - T::Output: Send + 'static, -{ - let mut data = SERVER_STATE.lock().unwrap(); - data.internal_task_count += 1; - - tokio::spawn(async move { - let _ = tokio::spawn(task).await; // ignore errors - - { - // drop mutex - let mut data = SERVER_STATE.lock().unwrap(); - if data.internal_task_count > 0 { - data.internal_task_count -= 1; - } - } - - check_last_worker(); - }); -} diff --git a/proxmox-rest-server/src/worker_task.rs b/proxmox-rest-server/src/worker_task.rs deleted file mode 100644 index 44d81119..00000000 --- a/proxmox-rest-server/src/worker_task.rs +++ /dev/null @@ -1,1058 +0,0 @@ -use std::collections::{HashMap, VecDeque}; -use std::fs::File; -use std::io::{BufRead, BufReader, Read, Write}; -use std::panic::UnwindSafe; -use std::path::PathBuf; -use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::{Arc, Mutex}; -use std::time::{Duration, SystemTime}; - -use anyhow::{bail, format_err, Error}; -use futures::*; -use lazy_static::lazy_static; -use nix::fcntl::OFlag; -use once_cell::sync::OnceCell; -use serde::{Deserialize, Serialize}; -use serde_json::{json, Value}; -use tokio::sync::oneshot; - -use proxmox_lang::try_block; -use proxmox_schema::upid::UPID; -use proxmox_sys::fs::{atomic_open_or_create_file, create_path, replace_file, CreateOptions}; -use proxmox_sys::linux::procfs; -use proxmox_sys::task_warn; - -use proxmox_sys::logrotate::{LogRotate, LogRotateFiles}; -use proxmox_sys::WorkerTaskContext; - -use crate::{CommandSocket, FileLogOptions, FileLogger}; - -struct TaskListLockGuard(File); - -struct WorkerTaskSetup { - file_opts: CreateOptions, - taskdir: PathBuf, - task_lock_fn: PathBuf, - active_tasks_fn: PathBuf, - task_index_fn: PathBuf, - task_archive_fn: PathBuf, -} - -static WORKER_TASK_SETUP: OnceCell = OnceCell::new(); - -fn worker_task_setup() -> Result<&'static WorkerTaskSetup, Error> { - WORKER_TASK_SETUP - .get() - .ok_or_else(|| format_err!("WorkerTask library is not initialized")) -} - -impl WorkerTaskSetup { - fn new(basedir: PathBuf, file_opts: CreateOptions) -> Self { - let mut taskdir = basedir; - taskdir.push("tasks"); - - let mut task_lock_fn = taskdir.clone(); - task_lock_fn.push(".active.lock"); - - let mut active_tasks_fn = taskdir.clone(); - active_tasks_fn.push("active"); - - let mut task_index_fn = taskdir.clone(); - task_index_fn.push("index"); - - let mut task_archive_fn = taskdir.clone(); - task_archive_fn.push("archive"); - - Self { - file_opts, - taskdir, - task_lock_fn, - active_tasks_fn, - task_index_fn, - task_archive_fn, - } - } - - fn lock_task_list_files(&self, exclusive: bool) -> Result { - let options = self - .file_opts - .clone() - .perm(nix::sys::stat::Mode::from_bits_truncate(0o660)); - - let timeout = std::time::Duration::new(10, 0); - - let file = - proxmox_sys::fs::open_file_locked(&self.task_lock_fn, timeout, exclusive, options)?; - - Ok(TaskListLockGuard(file)) - } - - fn log_path(&self, upid: &UPID) -> std::path::PathBuf { - let mut path = self.taskdir.clone(); - path.push(format!("{:02X}", upid.pstart % 256)); - path.push(upid.to_string()); - path - } - - // atomically read/update the task list, update status of finished tasks - // new_upid is added to the list when specified. - fn update_active_workers(&self, new_upid: Option<&UPID>) -> Result<(), Error> { - let lock = self.lock_task_list_files(true)?; - - // TODO remove with 1.x - let mut finish_list: Vec = read_task_file_from_path(&self.task_index_fn)?; - let had_index_file = !finish_list.is_empty(); - - // We use filter_map because one negative case wants to *move* the data into `finish_list`, - // clippy doesn't quite catch this! - #[allow(clippy::unnecessary_filter_map)] - let mut active_list: Vec = read_task_file_from_path(&self.active_tasks_fn)? - .into_iter() - .filter_map(|info| { - if info.state.is_some() { - // this can happen when the active file still includes finished tasks - finish_list.push(info); - return None; - } - - if !worker_is_active_local(&info.upid) { - // println!("Detected stopped task '{}'", &info.upid_str); - let now = proxmox_time::epoch_i64(); - let status = - upid_read_status(&info.upid).unwrap_or(TaskState::Unknown { endtime: now }); - finish_list.push(TaskListInfo { - upid: info.upid, - upid_str: info.upid_str, - state: Some(status), - }); - return None; - } - - Some(info) - }) - .collect(); - - if let Some(upid) = new_upid { - active_list.push(TaskListInfo { - upid: upid.clone(), - upid_str: upid.to_string(), - state: None, - }); - } - - let active_raw = render_task_list(&active_list); - - let options = self - .file_opts - .clone() - .perm(nix::sys::stat::Mode::from_bits_truncate(0o660)); - - replace_file(&self.active_tasks_fn, active_raw.as_bytes(), options, false)?; - - finish_list.sort_unstable_by(|a, b| match (&a.state, &b.state) { - (Some(s1), Some(s2)) => s1.cmp(s2), - (Some(_), None) => std::cmp::Ordering::Less, - (None, Some(_)) => std::cmp::Ordering::Greater, - _ => a.upid.starttime.cmp(&b.upid.starttime), - }); - - if !finish_list.is_empty() { - let options = self - .file_opts - .clone() - .perm(nix::sys::stat::Mode::from_bits_truncate(0o660)); - - let mut writer = atomic_open_or_create_file( - &self.task_archive_fn, - OFlag::O_APPEND | OFlag::O_RDWR, - &[], - options, - false, - )?; - for info in &finish_list { - writer.write_all(render_task_line(info).as_bytes())?; - } - } - - // TODO Remove with 1.x - // for compatibility, if we had an INDEX file, we do not need it anymore - if had_index_file { - let _ = nix::unistd::unlink(&self.task_index_fn); - } - - drop(lock); - - Ok(()) - } - - // Create task log directory with correct permissions - fn create_task_log_dirs(&self) -> Result<(), Error> { - try_block!({ - let dir_opts = self - .file_opts - .clone() - .perm(nix::sys::stat::Mode::from_bits_truncate(0o755)); - - create_path(&self.taskdir, Some(dir_opts.clone()), Some(dir_opts))?; - // fixme:??? create_path(pbs_buildcfg::PROXMOX_BACKUP_RUN_DIR, None, Some(opts))?; - Ok(()) - }) - .map_err(|err: Error| format_err!("unable to create task log dir - {}", err)) - } -} - -/// Initialize the WorkerTask library -pub fn init_worker_tasks(basedir: PathBuf, file_opts: CreateOptions) -> Result<(), Error> { - let setup = WorkerTaskSetup::new(basedir, file_opts); - setup.create_task_log_dirs()?; - WORKER_TASK_SETUP - .set(setup) - .map_err(|_| format_err!("init_worker_tasks failed - already initialized")) -} - -/// Optionally rotates and/or cleans up the task archive depending on its size and age. -/// -/// Check if the Task Archive is bigger than 'size_threshold' bytes, and rotate in that case. -/// Keeps either only up to 'max_files' if 'max_days' is not given. Else, 'max_files' will be -/// ignored, and all archive files older than the first with only tasks from before 'now-max_days' -/// will be deleted -pub fn rotate_task_log_archive( - size_threshold: u64, - compress: bool, - max_files: Option, - max_days: Option, - options: Option, -) -> Result { - let setup = worker_task_setup()?; - - let _lock = setup.lock_task_list_files(true)?; - - let mut logrotate = LogRotate::new( - &setup.task_archive_fn, - compress, - if max_days.is_none() { max_files } else { None }, - options, - )?; - - let mut rotated = logrotate.rotate(size_threshold)?; - - if let Some(max_days) = max_days { - // NOTE: not on exact day-boundary but close enough for what's done here - let cutoff_time = proxmox_time::epoch_i64() - (max_days * 24 * 60 * 60) as i64; - let mut cutoff = false; - let mut files = logrotate.files(); - // task archives have task-logs sorted by endtime, with the oldest at the start of the - // file. So, peak into every archive and see if the first listed tasks' endtime would be - // cut off. If that's the case we know that the next (older) task archive rotation surely - // falls behind the cut-off line. We cannot say the same for the current archive without - // checking its last (newest) line, but that's more complex, expensive and rather unlikely. - for file_name in logrotate.file_names() { - if !cutoff { - let reader = match files.next() { - Some(file) => BufReader::new(file), - None => bail!("unexpected error: files do not match file_names"), - }; - if let Some(Ok(line)) = reader.lines().next() { - if let Ok((_, _, Some(state))) = parse_worker_status_line(&line) { - if state.endtime() < cutoff_time { - // found first file with the oldest entry being cut-off, so next older - // ones are all up for deletion. - cutoff = true; - rotated = true; - } - } - } - } else if let Err(err) = std::fs::remove_file(&file_name) { - log::error!("could not remove {:?}: {}", file_name, err); - } - } - } - - Ok(rotated) -} - -/// removes all task logs that are older than the oldest task entry in the -/// task archive -pub fn cleanup_old_tasks(worker: &dyn WorkerTaskContext, compressed: bool) -> Result<(), Error> { - let setup = worker_task_setup()?; - - let _lock = setup.lock_task_list_files(true)?; - - let logrotate = LogRotate::new(&setup.task_archive_fn, compressed, None, None)?; - - let mut timestamp = None; - if let Some(last_file) = logrotate.files().last() { - let reader = BufReader::new(last_file); - for line in reader.lines() { - let line = line?; - if let Ok((_, _, Some(state))) = parse_worker_status_line(&line) { - timestamp = Some(state.endtime()); - break; - } - } - } - - fn get_modified(entry: std::fs::DirEntry) -> Result { - entry.metadata()?.modified() - } - - if let Some(timestamp) = timestamp { - let cutoff_time = if timestamp > 0 { - SystemTime::UNIX_EPOCH.checked_add(Duration::from_secs(timestamp as u64)) - } else { - SystemTime::UNIX_EPOCH.checked_sub(Duration::from_secs(-timestamp as u64)) - } - .ok_or_else(|| format_err!("could not calculate cutoff time"))?; - - for i in 0..256 { - let mut path = setup.taskdir.clone(); - path.push(format!("{:02X}", i)); - let files = match std::fs::read_dir(path) { - Ok(files) => files, - Err(err) if err.kind() == std::io::ErrorKind::NotFound => continue, - Err(err) => { - task_warn!(worker, "could not check task logs in '{:02X}': {}", i, err); - continue; - } - }; - for file in files { - let file = match file { - Ok(file) => file, - Err(err) => { - task_warn!( - worker, - "could not check some task log in '{:02X}': {}", - i, - err - ); - continue; - } - }; - let path = file.path(); - - let modified = match get_modified(file) { - Ok(modified) => modified, - Err(err) => { - task_warn!(worker, "error getting mtime for '{:?}': {}", path, err); - continue; - } - }; - - if modified < cutoff_time { - match std::fs::remove_file(&path) { - Ok(()) => {} - Err(err) if err.kind() == std::io::ErrorKind::NotFound => {} - Err(err) => { - task_warn!(worker, "could not remove file '{:?}': {}", path, err) - } - } - } - } - } - } - - Ok(()) -} - -/// Path to the worker log file -pub fn upid_log_path(upid: &UPID) -> Result { - let setup = worker_task_setup()?; - Ok(setup.log_path(upid)) -} - -/// Read endtime (time of last log line) and exitstatus from task log file -/// If there is not a single line with at valid datetime, we assume the -/// starttime to be the endtime -pub fn upid_read_status(upid: &UPID) -> Result { - let setup = worker_task_setup()?; - - let mut status = TaskState::Unknown { - endtime: upid.starttime, - }; - - let path = setup.log_path(upid); - - let mut file = File::open(path)?; - - /// speedup - only read tail - use std::io::Seek; - use std::io::SeekFrom; - let _ = file.seek(SeekFrom::End(-8192)); // ignore errors - - let mut data = Vec::with_capacity(8192); - file.read_to_end(&mut data)?; - - // strip newlines at the end of the task logs - while data.last() == Some(&b'\n') { - data.pop(); - } - - let last_line = match data.iter().rposition(|c| *c == b'\n') { - Some(start) if data.len() > (start + 1) => &data[start + 1..], - Some(_) => &data, // should not happen, since we removed all trailing newlines - None => &data, - }; - - let last_line = std::str::from_utf8(last_line) - .map_err(|err| format_err!("upid_read_status: utf8 parse failed: {}", err))?; - - let mut iter = last_line.splitn(2, ": "); - if let Some(time_str) = iter.next() { - if let Ok(endtime) = proxmox_time::parse_rfc3339(time_str) { - // set the endtime even if we cannot parse the state - status = TaskState::Unknown { endtime }; - if let Some(rest) = iter.next().and_then(|rest| rest.strip_prefix("TASK ")) { - if let Ok(state) = TaskState::from_endtime_and_message(endtime, rest) { - status = state; - } - } - } - } - - Ok(status) -} - -lazy_static! { - static ref WORKER_TASK_LIST: Mutex>> = - Mutex::new(HashMap::new()); -} - -/// checks if the task UPID refers to a worker from this process -fn is_local_worker(upid: &UPID) -> bool { - upid.pid == crate::pid() && upid.pstart == crate::pstart() -} - -/// Test if the task is still running -pub async fn worker_is_active(upid: &UPID) -> Result { - if is_local_worker(upid) { - return Ok(WORKER_TASK_LIST.lock().unwrap().contains_key(&upid.task_id)); - } - - if procfs::check_process_running_pstart(upid.pid, upid.pstart).is_none() { - return Ok(false); - } - - let sock = crate::ctrl_sock_from_pid(upid.pid); - let cmd = json!({ - "command": "worker-task-status", - "args": { - "upid": upid.to_string(), - }, - }); - let status = crate::send_command(sock, &cmd).await?; - - if let Some(active) = status.as_bool() { - Ok(active) - } else { - bail!("got unexpected result {:?} (expected bool)", status); - } -} - -/// Test if the task is still running (fast but inaccurate implementation) -/// -/// If the task is spawned from a different process, we simply return if -/// that process is still running. This information is good enough to detect -/// stale tasks... -pub fn worker_is_active_local(upid: &UPID) -> bool { - if is_local_worker(upid) { - WORKER_TASK_LIST.lock().unwrap().contains_key(&upid.task_id) - } else { - procfs::check_process_running_pstart(upid.pid, upid.pstart).is_some() - } -} - -/// Register task control command on a [CommandSocket]. -/// -/// This create two commands: -/// -/// * ``worker-task-abort ``: calls [abort_local_worker] -/// -/// * ``worker-task-status ``: return true of false, depending on -/// whether the worker is running or stopped. -pub fn register_task_control_commands(commando_sock: &mut CommandSocket) -> Result<(), Error> { - fn get_upid(args: Option<&Value>) -> Result { - let args = if let Some(args) = args { - args - } else { - bail!("missing args") - }; - let upid = match args.get("upid") { - Some(Value::String(upid)) => upid.parse::()?, - None => bail!("no upid in args"), - _ => bail!("unable to parse upid"), - }; - if !is_local_worker(&upid) { - bail!("upid does not belong to this process"); - } - Ok(upid) - } - - commando_sock.register_command("worker-task-abort".into(), move |args| { - let upid = get_upid(args)?; - - abort_local_worker(upid); - - Ok(Value::Null) - })?; - commando_sock.register_command("worker-task-status".into(), move |args| { - let upid = get_upid(args)?; - - let active = WORKER_TASK_LIST.lock().unwrap().contains_key(&upid.task_id); - - Ok(active.into()) - })?; - - Ok(()) -} - -/// Try to abort a worker task, but do no wait -/// -/// Errors (if any) are simply logged. -pub fn abort_worker_nowait(upid: UPID) { - tokio::spawn(async move { - if let Err(err) = abort_worker(upid).await { - log::error!("abort worker task failed - {}", err); - } - }); -} - -/// Abort a worker task -/// -/// By sending ``worker-task-abort`` to the control socket. -pub async fn abort_worker(upid: UPID) -> Result<(), Error> { - let sock = crate::ctrl_sock_from_pid(upid.pid); - let cmd = json!({ - "command": "worker-task-abort", - "args": { - "upid": upid.to_string(), - }, - }); - crate::send_command(sock, &cmd).map_ok(|_| ()).await -} - -fn parse_worker_status_line(line: &str) -> Result<(String, UPID, Option), Error> { - let data = line.splitn(3, ' ').collect::>(); - - let len = data.len(); - - match len { - 1 => Ok((data[0].to_owned(), data[0].parse::()?, None)), - 3 => { - let endtime = i64::from_str_radix(data[1], 16)?; - let state = TaskState::from_endtime_and_message(endtime, data[2])?; - Ok((data[0].to_owned(), data[0].parse::()?, Some(state))) - } - _ => bail!("wrong number of components"), - } -} - -/// Task State -#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] -pub enum TaskState { - /// The Task ended with an undefined state - Unknown { endtime: i64 }, - /// The Task ended and there were no errors or warnings - OK { endtime: i64 }, - /// The Task had 'count' amount of warnings and no errors - Warning { count: u64, endtime: i64 }, - /// The Task ended with the error described in 'message' - Error { message: String, endtime: i64 }, -} - -impl TaskState { - pub fn endtime(&self) -> i64 { - match *self { - TaskState::Unknown { endtime } => endtime, - TaskState::OK { endtime } => endtime, - TaskState::Warning { endtime, .. } => endtime, - TaskState::Error { endtime, .. } => endtime, - } - } - - fn result_text(&self) -> String { - match self { - TaskState::Error { message, .. } => format!("TASK ERROR: {}", message), - other => format!("TASK {}", other), - } - } - - fn from_endtime_and_message(endtime: i64, s: &str) -> Result { - if s == "unknown" { - Ok(TaskState::Unknown { endtime }) - } else if s == "OK" { - Ok(TaskState::OK { endtime }) - } else if let Some(warnings) = s.strip_prefix("WARNINGS: ") { - let count: u64 = warnings.parse()?; - Ok(TaskState::Warning { count, endtime }) - } else if !s.is_empty() { - let message = if let Some(err) = s.strip_prefix("ERROR: ") { - err - } else { - s - } - .to_string(); - Ok(TaskState::Error { message, endtime }) - } else { - bail!("unable to parse Task Status '{}'", s); - } - } -} - -impl std::cmp::PartialOrd for TaskState { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.endtime().cmp(&other.endtime())) - } -} - -impl std::cmp::Ord for TaskState { - fn cmp(&self, other: &Self) -> std::cmp::Ordering { - self.endtime().cmp(&other.endtime()) - } -} - -impl std::fmt::Display for TaskState { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - TaskState::Unknown { .. } => write!(f, "unknown"), - TaskState::OK { .. } => write!(f, "OK"), - TaskState::Warning { count, .. } => write!(f, "WARNINGS: {}", count), - TaskState::Error { message, .. } => write!(f, "{}", message), - } - } -} - -/// Task details including parsed UPID -/// -/// If there is no `state`, the task is still running. -#[derive(Debug)] -pub struct TaskListInfo { - /// The parsed UPID - pub upid: UPID, - /// UPID string representation - pub upid_str: String, - /// Task `(endtime, status)` if already finished - pub state: Option, // endtime, status -} - -fn render_task_line(info: &TaskListInfo) -> String { - let mut raw = String::new(); - if let Some(status) = &info.state { - use std::fmt::Write as _; - - let _ = writeln!(raw, "{} {:08X} {}", info.upid_str, status.endtime(), status); - } else { - raw.push_str(&info.upid_str); - raw.push('\n'); - } - - raw -} - -fn render_task_list(list: &[TaskListInfo]) -> String { - let mut raw = String::new(); - for info in list { - raw.push_str(&render_task_line(info)); - } - raw -} - -// note this is not locked, caller has to make sure it is -// this will skip (and log) lines that are not valid status lines -fn read_task_file(reader: R) -> Result, Error> { - let reader = BufReader::new(reader); - let mut list = Vec::new(); - for line in reader.lines() { - let line = line?; - match parse_worker_status_line(&line) { - Ok((upid_str, upid, state)) => list.push(TaskListInfo { - upid_str, - upid, - state, - }), - Err(err) => { - log::warn!("unable to parse worker status '{}' - {}", line, err); - continue; - } - }; - } - - Ok(list) -} - -// note this is not locked, caller has to make sure it is -fn read_task_file_from_path

(path: P) -> Result, Error> -where - P: AsRef + std::fmt::Debug, -{ - let file = match File::open(&path) { - Ok(f) => f, - Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()), - Err(err) => bail!("unable to open task list {:?} - {}", path, err), - }; - - read_task_file(file) -} - -/// Iterate over existing/active worker tasks -pub struct TaskListInfoIterator { - list: VecDeque, - end: bool, - archive: Option, - lock: Option, -} - -impl TaskListInfoIterator { - /// Creates a new iterator instance. - pub fn new(active_only: bool) -> Result { - let setup = worker_task_setup()?; - - let (read_lock, active_list) = { - let lock = setup.lock_task_list_files(false)?; - let active_list = read_task_file_from_path(&setup.active_tasks_fn)?; - - let needs_update = active_list - .iter() - .any(|info| info.state.is_some() || !worker_is_active_local(&info.upid)); - - // TODO remove with 1.x - let index_exists = setup.task_index_fn.is_file(); - - if needs_update || index_exists { - drop(lock); - setup.update_active_workers(None)?; - let lock = setup.lock_task_list_files(false)?; - let active_list = read_task_file_from_path(&setup.active_tasks_fn)?; - (lock, active_list) - } else { - (lock, active_list) - } - }; - - let archive = if active_only { - None - } else { - let logrotate = LogRotate::new(&setup.task_archive_fn, true, None, None)?; - Some(logrotate.files()) - }; - - let lock = if active_only { None } else { Some(read_lock) }; - - Ok(Self { - list: active_list.into(), - end: active_only, - archive, - lock, - }) - } -} - -impl Iterator for TaskListInfoIterator { - type Item = Result; - - fn next(&mut self) -> Option { - loop { - if let Some(element) = self.list.pop_back() { - return Some(Ok(element)); - } else if self.end { - return None; - } else { - if let Some(mut archive) = self.archive.take() { - if let Some(file) = archive.next() { - let list = match read_task_file(file) { - Ok(list) => list, - Err(err) => return Some(Err(err)), - }; - self.list.append(&mut list.into()); - self.archive = Some(archive); - continue; - } - } - - self.end = true; - self.lock.take(); - } - } - } -} - -/// Launch long running worker tasks. -/// -/// A worker task can either be a whole thread, or a simply tokio -/// task/future. Each task can `log()` messages, which are stored -/// persistently to files. Task should poll the `abort_requested` -/// flag, and stop execution when requested. -pub struct WorkerTask { - setup: &'static WorkerTaskSetup, - upid: UPID, - data: Mutex, - abort_requested: AtomicBool, -} - -impl std::fmt::Display for WorkerTask { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - self.upid.fmt(f) - } -} - -struct WorkerTaskData { - logger: FileLogger, - progress: f64, // 0..1 - warn_count: u64, - pub abort_listeners: Vec>, -} - -impl WorkerTask { - pub fn new( - worker_type: &str, - worker_id: Option, - auth_id: String, - to_stdout: bool, - ) -> Result, Error> { - let setup = worker_task_setup()?; - - let upid = UPID::new(worker_type, worker_id, auth_id)?; - let task_id = upid.task_id; - - let mut path = setup.taskdir.clone(); - - path.push(format!("{:02X}", upid.pstart & 255)); - - let dir_opts = setup - .file_opts - .clone() - .perm(nix::sys::stat::Mode::from_bits_truncate(0o755)); - - create_path(&path, None, Some(dir_opts))?; - - path.push(upid.to_string()); - - let logger_options = FileLogOptions { - to_stdout, - exclusive: true, - prefix_time: true, - read: true, - file_opts: setup.file_opts.clone(), - ..Default::default() - }; - let logger = FileLogger::new(&path, logger_options)?; - - let worker = Arc::new(Self { - setup, - upid: upid.clone(), - abort_requested: AtomicBool::new(false), - data: Mutex::new(WorkerTaskData { - logger, - progress: 0.0, - warn_count: 0, - abort_listeners: vec![], - }), - }); - - // scope to drop the lock again after inserting - { - let mut hash = WORKER_TASK_LIST.lock().unwrap(); - hash.insert(task_id, worker.clone()); - crate::set_worker_count(hash.len()); - } - - setup.update_active_workers(Some(&upid))?; - - Ok(worker) - } - - /// Spawn a new tokio task/future. - pub fn spawn( - worker_type: &str, - worker_id: Option, - auth_id: String, - to_stdout: bool, - f: F, - ) -> Result - where - F: Send + 'static + FnOnce(Arc) -> T, - T: Send + 'static + Future>, - { - let worker = WorkerTask::new(worker_type, worker_id, auth_id, to_stdout)?; - let upid_str = worker.upid.to_string(); - let f = f(worker.clone()); - tokio::spawn(async move { - let result = f.await; - worker.log_result(&result); - }); - - Ok(upid_str) - } - - /// Create a new worker thread. - pub fn new_thread( - worker_type: &str, - worker_id: Option, - auth_id: String, - to_stdout: bool, - f: F, - ) -> Result - where - F: Send + UnwindSafe + 'static + FnOnce(Arc) -> Result<(), Error>, - { - let worker = WorkerTask::new(worker_type, worker_id, auth_id, to_stdout)?; - let upid_str = worker.upid.to_string(); - - let _child = std::thread::Builder::new() - .name(upid_str.clone()) - .spawn(move || { - let worker1 = worker.clone(); - let result = match std::panic::catch_unwind(move || f(worker1)) { - Ok(r) => r, - Err(panic) => match panic.downcast::<&str>() { - Ok(panic_msg) => Err(format_err!("worker panicked: {}", panic_msg)), - Err(_) => Err(format_err!("worker panicked: unknown type.")), - }, - }; - - worker.log_result(&result); - }); - - Ok(upid_str) - } - - /// create state from self and a result - pub fn create_state(&self, result: &Result<(), Error>) -> TaskState { - let warn_count = self.data.lock().unwrap().warn_count; - - let endtime = proxmox_time::epoch_i64(); - - if let Err(err) = result { - TaskState::Error { - message: err.to_string(), - endtime, - } - } else if warn_count > 0 { - TaskState::Warning { - count: warn_count, - endtime, - } - } else { - TaskState::OK { endtime } - } - } - - /// Log task result, remove task from running list - pub fn log_result(&self, result: &Result<(), Error>) { - let state = self.create_state(result); - self.log_message(state.result_text()); - - WORKER_TASK_LIST.lock().unwrap().remove(&self.upid.task_id); - let _ = self.setup.update_active_workers(None); - crate::set_worker_count(WORKER_TASK_LIST.lock().unwrap().len()); - } - - /// Log a message. - pub fn log_message>(&self, msg: S) { - let mut data = self.data.lock().unwrap(); - data.logger.log(msg); - } - - /// Log a message as warning. - pub fn log_warning>(&self, msg: S) { - let mut data = self.data.lock().unwrap(); - data.logger.log(format!("WARN: {}", msg.as_ref())); - data.warn_count += 1; - } - - /// Set progress indicator - pub fn progress(&self, progress: f64) { - if (0.0..=1.0).contains(&progress) { - let mut data = self.data.lock().unwrap(); - data.progress = progress; - } else { - // fixme: log!("task '{}': ignoring strange value for progress '{}'", self.upid, progress); - } - } - - /// Request abort - pub fn request_abort(&self) { - let prev_abort = self.abort_requested.swap(true, Ordering::SeqCst); - if !prev_abort { - self.log_message("received abort request ..."); // log abort only once - } - // noitify listeners - let mut data = self.data.lock().unwrap(); - loop { - match data.abort_listeners.pop() { - None => { - break; - } - Some(ch) => { - let _ = ch.send(()); // ignore errors here - } - } - } - } - - /// Get a future which resolves on task abort - pub fn abort_future(&self) -> oneshot::Receiver<()> { - let (tx, rx) = oneshot::channel::<()>(); - - let mut data = self.data.lock().unwrap(); - if self.abort_requested() { - let _ = tx.send(()); - } else { - data.abort_listeners.push(tx); - } - rx - } - - pub fn upid(&self) -> &UPID { - &self.upid - } -} - -impl WorkerTaskContext for WorkerTask { - fn abort_requested(&self) -> bool { - self.abort_requested.load(Ordering::SeqCst) - } - - fn shutdown_requested(&self) -> bool { - crate::shutdown_requested() - } - - fn fail_on_shutdown(&self) -> Result<(), Error> { - crate::fail_on_shutdown() - } - - fn log(&self, level: log::Level, message: &std::fmt::Arguments) { - match level { - log::Level::Error => self.log_warning(&message.to_string()), - log::Level::Warn => self.log_warning(&message.to_string()), - log::Level::Info => self.log_message(&message.to_string()), - log::Level::Debug => self.log_message(&format!("DEBUG: {}", message)), - log::Level::Trace => self.log_message(&format!("TRACE: {}", message)), - } - } -} - -/// Wait for a locally spanned worker task -/// -/// Note: local workers should print logs to stdout, so there is no -/// need to fetch/display logs. We just wait for the worker to finish. -pub async fn wait_for_local_worker(upid_str: &str) -> Result<(), Error> { - let upid: UPID = upid_str.parse()?; - - let sleep_duration = core::time::Duration::new(0, 100_000_000); - - loop { - if worker_is_active_local(&upid) { - tokio::time::sleep(sleep_duration).await; - } else { - break; - } - } - Ok(()) -} - -/// Request abort of a local worker (if existing and running) -pub fn abort_local_worker(upid: UPID) { - if let Some(worker) = WORKER_TASK_LIST.lock().unwrap().get(&upid.task_id) { - worker.request_abort(); - } -} diff --git a/proxmox-restore-daemon/Cargo.toml b/proxmox-restore-daemon/Cargo.toml index b127fe69..dff65b5d 100644 --- a/proxmox-restore-daemon/Cargo.toml +++ b/proxmox-restore-daemon/Cargo.toml @@ -28,6 +28,7 @@ pxar = { version = "0.10.1", features = [ "tokio-io" ] } proxmox-async = "0.4" proxmox-compression = "0.1.1" +proxmox-rest-server = "0.2" proxmox-router = { version = "1.3.0", features = [ "cli", "server" ] } proxmox-schema = { version = "1.3.1", features = [ "api-macro" ] } proxmox-time = "1" @@ -36,5 +37,4 @@ proxmox-sys = { version = "0.4", features = [ "sortable-macro" ] } pbs-api-types = { path = "../pbs-api-types" } pbs-tools = { path = "../pbs-tools" } pbs-datastore = { path = "../pbs-datastore" } -proxmox-rest-server = { path = "../proxmox-rest-server" } pbs-client = { path = "../pbs-client" }