move proxmox-rest-server to proxmox-rs as separate package

Lives now in the common proxmox rs repo[0] for better reuse with
other projects.

[0]: https://git.proxmox.com/?p=proxmox.git;a=tree;f=proxmox-rest-server;h=8035b65a00271604c229590d0109aba3f75ee784

Signed-off-by: Thomas Lamprecht <t.lamprecht@proxmox.com>
This commit is contained in:
Thomas Lamprecht 2022-10-11 15:27:47 +02:00
parent 064a9a6bb1
commit 938dd9278e
18 changed files with 4 additions and 4104 deletions

View File

@ -25,7 +25,6 @@ members = [
"pbs-config",
"pbs-datastore",
"pbs-fuse-loop",
"proxmox-rest-server",
"proxmox-rrd",
"pbs-tape",
"pbs-tools",
@ -108,6 +107,7 @@ proxmox-shared-memory = "0.2"
proxmox-subscription = { version = "0.3", features = [ "api-types" ] }
proxmox-sys = { version = "0.4", features = [ "sortable-macro", "timer" ] }
proxmox-compression = "0.1"
proxmox-rest-server = "0.2"
proxmox-acme-rs = "0.4"
@ -120,7 +120,6 @@ pbs-buildcfg = { path = "pbs-buildcfg" }
pbs-client = { path = "pbs-client" }
pbs-config = { path = "pbs-config" }
pbs-datastore = { path = "pbs-datastore" }
proxmox-rest-server = { path = "proxmox-rest-server" }
proxmox-rrd = { path = "proxmox-rrd" }
pbs-tools = { path = "pbs-tools" }
pbs-tape = { path = "pbs-tape" }
@ -138,6 +137,7 @@ pbs-tape = { path = "pbs-tape" }
#proxmox-io = { path = "../proxmox/proxmox-io" }
#proxmox-lang = { path = "../proxmox/proxmox-lang" }
#proxmox-openid = { path = "../proxmox-openid-rs" }
#proxmox-router = { path = "../proxmox/proxmox-rest-server" }
#proxmox-router = { path = "../proxmox/proxmox-router" }
#proxmox-schema = { path = "../proxmox/proxmox-schema" }
#proxmox-section-config = { path = "../proxmox/proxmox-section-config" }

View File

@ -38,7 +38,6 @@ SUBCRATES := \
pbs-config \
pbs-datastore \
pbs-fuse-loop \
proxmox-rest-server \
proxmox-rrd \
pbs-tape \
pbs-tools \

1
debian/control vendored
View File

@ -58,6 +58,7 @@ Build-Depends: debhelper (>= 12),
librust-proxmox-lang-1+default-dev (>= 1.1-~~),
librust-proxmox-metrics-0.2+default-dev,
librust-proxmox-openid-0.9+default-dev,
librust-proxmox-rest-server-dev,
librust-proxmox-router-1+cli-dev (>= 1.3.0-~~),
librust-proxmox-router-1+default-dev (>= 1.3.0-~~),
librust-proxmox-router-1+server-dev (>= 1.3.0-~~),

View File

@ -1,43 +0,0 @@
[package]
name = "proxmox-rest-server"
version = "0.1.0"
authors = ["Proxmox Support Team <support@proxmox.com>"]
edition = "2018"
description = "REST server implementation"
# for example
[dev-dependencies]
proxmox-schema = { version = "1.3.1", features = [ "api-macro" ] }
tokio = { version = "1.6", features = [ "rt-multi-thread", "signal", "process" ] }
[dependencies]
anyhow = "1.0"
futures = "0.3"
handlebars = "3.0"
http = "0.2"
hyper = { version = "0.14.5", features = [ "full" ] }
lazy_static = "1.4"
libc = "0.2"
log = "0.4.17"
nix = "0.24"
once_cell = "1.3.1"
percent-encoding = "2.1"
regex = "1.5"
serde = { version = "1.0", features = [ "derive" ] }
serde_json = "1.0"
tokio = { version = "1.6", features = ["signal", "process"] }
tokio-openssl = "0.6.1"
tokio-stream = "0.1.0"
tower-service = "0.3.0"
url = "2.1"
#proxmox = "0.15.3"
proxmox-async = "0.4"
proxmox-compression = "0.1.1"
proxmox-io = "1"
proxmox-lang = "1.1"
proxmox-http = { version = "0.7", features = [ "client" ] }
proxmox-router = "1.3.0"
proxmox-schema = { version = "1.3.1", features = [ "api-macro", "upid-api-impl" ] }
proxmox-time = "1"
proxmox-sys = { version = "0.4", features = [ "logrotate", "timer" ] }

View File

@ -1,232 +0,0 @@
use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use std::sync::Mutex;
use anyhow::{bail, format_err, Error};
use http::request::Parts;
use http::HeaderMap;
use hyper::{Body, Method, Response};
use lazy_static::lazy_static;
use proxmox_router::{
list_subdirs_api_method, Router, RpcEnvironmentType, SubdirMap, UserInformation,
};
use proxmox_schema::api;
use proxmox_rest_server::{ApiConfig, AuthError, RestEnvironment, RestServer, ServerAdapter};
// Create a Dummy User information system
struct DummyUserInfo;
impl UserInformation for DummyUserInfo {
fn is_superuser(&self, _userid: &str) -> bool {
// Always return true here, so we have access to everything
true
}
fn is_group_member(&self, _userid: &str, group: &str) -> bool {
group == "Group"
}
fn lookup_privs(&self, _userid: &str, _path: &[&str]) -> u64 {
u64::MAX
}
}
struct MinimalServer;
// implement the server adapter
impl ServerAdapter for MinimalServer {
// normally this would check and authenticate the user
fn check_auth(
&self,
_headers: &HeaderMap,
_method: &Method,
) -> Pin<
Box<
dyn Future<Output = Result<(String, Box<dyn UserInformation + Sync + Send>), AuthError>>
+ Send,
>,
> {
Box::pin(async move {
// get some global/cached userinfo
let userinfo: Box<dyn UserInformation + Sync + Send> = Box::new(DummyUserInfo);
// Do some user checks, e.g. cookie/csrf
Ok(("User".to_string(), userinfo))
})
}
// this should return the index page of the webserver, iow. what the user browses to
fn get_index(
&self,
_env: RestEnvironment,
_parts: Parts,
) -> Pin<Box<dyn Future<Output = Response<Body>> + Send>> {
Box::pin(async move {
// build an index page
http::Response::builder()
.body("hello world".into())
.unwrap()
})
}
}
// a few examples on how to do api calls with the Router
#[api]
/// A simple ping method. returns "pong"
fn ping() -> Result<String, Error> {
Ok("pong".to_string())
}
lazy_static! {
static ref ITEM_MAP: Mutex<HashMap<String, String>> = Mutex::new(HashMap::new());
}
#[api]
/// Lists all current items
fn list_items() -> Result<Vec<String>, Error> {
Ok(ITEM_MAP.lock().unwrap().keys().cloned().collect())
}
#[api(
input: {
properties: {
name: {
type: String,
description: "The name",
},
value: {
type: String,
description: "The value",
},
},
},
)]
/// creates a new item
fn create_item(name: String, value: String) -> Result<(), Error> {
let mut map = ITEM_MAP.lock().unwrap();
if map.contains_key(&name) {
bail!("{} already exists", name);
}
map.insert(name, value);
Ok(())
}
#[api(
input: {
properties: {
name: {
type: String,
description: "The name",
},
},
},
)]
/// returns the value of an item
fn get_item(name: String) -> Result<String, Error> {
ITEM_MAP
.lock()
.unwrap()
.get(&name)
.map(|s| s.to_string())
.ok_or_else(|| format_err!("no such item '{}'", name))
}
#[api(
input: {
properties: {
name: {
type: String,
description: "The name",
},
value: {
type: String,
description: "The value",
},
},
},
)]
/// updates an item
fn update_item(name: String, value: String) -> Result<(), Error> {
if let Some(val) = ITEM_MAP.lock().unwrap().get_mut(&name) {
*val = value;
} else {
bail!("no such item '{}'", name);
}
Ok(())
}
#[api(
input: {
properties: {
name: {
type: String,
description: "The name",
},
},
},
)]
/// deletes an item
fn delete_item(name: String) -> Result<(), Error> {
if ITEM_MAP.lock().unwrap().remove(&name).is_none() {
bail!("no such item '{}'", name);
}
Ok(())
}
const ITEM_ROUTER: Router = Router::new()
.get(&API_METHOD_GET_ITEM)
.put(&API_METHOD_UPDATE_ITEM)
.delete(&API_METHOD_DELETE_ITEM);
const SUBDIRS: SubdirMap = &[
(
"items",
&Router::new()
.get(&API_METHOD_LIST_ITEMS)
.post(&API_METHOD_CREATE_ITEM)
.match_all("name", &ITEM_ROUTER),
),
("ping", &Router::new().get(&API_METHOD_PING)),
];
const ROUTER: Router = Router::new()
.get(&list_subdirs_api_method!(SUBDIRS))
.subdirs(SUBDIRS);
async fn run() -> Result<(), Error> {
// we first have to configure the api environment (basedir etc.)
let config = ApiConfig::new(
"/var/tmp/",
&ROUTER,
RpcEnvironmentType::PUBLIC,
MinimalServer,
)?;
let rest_server = RestServer::new(config);
// then we have to create a daemon that listens, accepts and serves the api to clients
proxmox_rest_server::daemon::create_daemon(
([127, 0, 0, 1], 65000).into(),
move |listener| {
let incoming = hyper::server::conn::AddrIncoming::from_listener(listener)?;
Ok(async move {
hyper::Server::builder(incoming).serve(rest_server).await?;
Ok(())
})
},
None,
)
.await?;
Ok(())
}
fn main() -> Result<(), Error> {
let rt = tokio::runtime::Runtime::new()?;
rt.block_on(async { run().await })
}

View File

@ -1,287 +0,0 @@
use std::collections::HashMap;
use std::fs::metadata;
use std::path::PathBuf;
use std::pin::Pin;
use std::sync::{Arc, Mutex, RwLock};
use std::time::SystemTime;
use anyhow::{bail, format_err, Error};
use hyper::http::request::Parts;
use hyper::{Body, Method, Response};
use handlebars::Handlebars;
use serde::Serialize;
use proxmox_router::{ApiMethod, Router, RpcEnvironmentType, UserInformation};
use proxmox_sys::fs::{create_path, CreateOptions};
use crate::{AuthError, CommandSocket, FileLogOptions, FileLogger, RestEnvironment, ServerAdapter};
/// REST server configuration
pub struct ApiConfig {
basedir: PathBuf,
router: &'static Router,
aliases: HashMap<String, PathBuf>,
env_type: RpcEnvironmentType,
templates: RwLock<Handlebars<'static>>,
template_files: RwLock<HashMap<String, (SystemTime, PathBuf)>>,
request_log: Option<Arc<Mutex<FileLogger>>>,
auth_log: Option<Arc<Mutex<FileLogger>>>,
adapter: Pin<Box<dyn ServerAdapter + Send + Sync>>,
}
impl ApiConfig {
/// Creates a new instance
///
/// `basedir` - File lookups are relative to this directory.
///
/// `router` - The REST API definition.
///
/// `env_type` - The environment type.
///
/// `api_auth` - The Authentication handler
///
/// `get_index_fn` - callback to generate the root page
/// (index). Please note that this functions gets a reference to
/// the [ApiConfig], so it can use [Handlebars] templates
/// ([render_template](Self::render_template) to generate pages.
pub fn new<B: Into<PathBuf>>(
basedir: B,
router: &'static Router,
env_type: RpcEnvironmentType,
adapter: impl ServerAdapter + 'static,
) -> Result<Self, Error> {
Ok(Self {
basedir: basedir.into(),
router,
aliases: HashMap::new(),
env_type,
templates: RwLock::new(Handlebars::new()),
template_files: RwLock::new(HashMap::new()),
request_log: None,
auth_log: None,
adapter: Box::pin(adapter),
})
}
pub(crate) async fn get_index(
&self,
rest_env: RestEnvironment,
parts: Parts,
) -> Response<Body> {
self.adapter.get_index(rest_env, parts).await
}
pub(crate) async fn check_auth(
&self,
headers: &http::HeaderMap,
method: &hyper::Method,
) -> Result<(String, Box<dyn UserInformation + Sync + Send>), AuthError> {
self.adapter.check_auth(headers, method).await
}
pub(crate) fn find_method(
&self,
components: &[&str],
method: Method,
uri_param: &mut HashMap<String, String>,
) -> Option<&'static ApiMethod> {
self.router.find_method(components, method, uri_param)
}
pub(crate) fn find_alias(&self, components: &[&str]) -> PathBuf {
let mut prefix = String::new();
let mut filename = self.basedir.clone();
let comp_len = components.len();
if comp_len >= 1 {
prefix.push_str(components[0]);
if let Some(subdir) = self.aliases.get(&prefix) {
filename.push(subdir);
components
.iter()
.skip(1)
.for_each(|comp| filename.push(comp));
} else {
components.iter().for_each(|comp| filename.push(comp));
}
}
filename
}
/// Register a path alias
///
/// This can be used to redirect file lookups to a specific
/// directory, e.g.:
///
/// ```
/// use proxmox_rest_server::ApiConfig;
/// // let mut config = ApiConfig::new(...);
/// # fn fake(config: &mut ApiConfig) {
/// config.add_alias("extjs", "/usr/share/javascript/extjs");
/// # }
/// ```
pub fn add_alias<S, P>(&mut self, alias: S, path: P)
where
S: Into<String>,
P: Into<PathBuf>,
{
self.aliases.insert(alias.into(), path.into());
}
pub(crate) fn env_type(&self) -> RpcEnvironmentType {
self.env_type
}
/// Register a [Handlebars] template file
///
/// Those templates cane be use with [render_template](Self::render_template) to generate pages.
pub fn register_template<P>(&self, name: &str, path: P) -> Result<(), Error>
where
P: Into<PathBuf>,
{
if self.template_files.read().unwrap().contains_key(name) {
bail!("template already registered");
}
let path: PathBuf = path.into();
let metadata = metadata(&path)?;
let mtime = metadata.modified()?;
self.templates
.write()
.unwrap()
.register_template_file(name, &path)?;
self.template_files
.write()
.unwrap()
.insert(name.to_string(), (mtime, path));
Ok(())
}
/// Checks if the template was modified since the last rendering
/// if yes, it loads a the new version of the template
pub fn render_template<T>(&self, name: &str, data: &T) -> Result<String, Error>
where
T: Serialize,
{
let path;
let mtime;
{
let template_files = self.template_files.read().unwrap();
let (old_mtime, old_path) = template_files
.get(name)
.ok_or_else(|| format_err!("template not found"))?;
mtime = metadata(old_path)?.modified()?;
if mtime <= *old_mtime {
return self
.templates
.read()
.unwrap()
.render(name, data)
.map_err(|err| format_err!("{}", err));
}
path = old_path.to_path_buf();
}
{
let mut template_files = self.template_files.write().unwrap();
let mut templates = self.templates.write().unwrap();
templates.register_template_file(name, &path)?;
template_files.insert(name.to_string(), (mtime, path));
templates
.render(name, data)
.map_err(|err| format_err!("{}", err))
}
}
/// Enable the access log feature
///
/// When enabled, all requests are logged to the specified file.
/// This function also registers a `api-access-log-reopen`
/// command one the [CommandSocket].
pub fn enable_access_log<P>(
&mut self,
path: P,
dir_opts: Option<CreateOptions>,
file_opts: Option<CreateOptions>,
commando_sock: &mut CommandSocket,
) -> Result<(), Error>
where
P: Into<PathBuf>,
{
let path: PathBuf = path.into();
if let Some(base) = path.parent() {
if !base.exists() {
create_path(base, None, dir_opts).map_err(|err| format_err!("{}", err))?;
}
}
let logger_options = FileLogOptions {
append: true,
file_opts: file_opts.unwrap_or_default(),
..Default::default()
};
let request_log = Arc::new(Mutex::new(FileLogger::new(&path, logger_options)?));
self.request_log = Some(Arc::clone(&request_log));
commando_sock.register_command("api-access-log-reopen".into(), move |_args| {
log::info!("re-opening access-log file");
request_log.lock().unwrap().reopen()?;
Ok(serde_json::Value::Null)
})?;
Ok(())
}
/// Enable the authentication log feature
///
/// When enabled, all authentication requests are logged to the
/// specified file. This function also registers a
/// `api-auth-log-reopen` command one the [CommandSocket].
pub fn enable_auth_log<P>(
&mut self,
path: P,
dir_opts: Option<CreateOptions>,
file_opts: Option<CreateOptions>,
commando_sock: &mut CommandSocket,
) -> Result<(), Error>
where
P: Into<PathBuf>,
{
let path: PathBuf = path.into();
if let Some(base) = path.parent() {
if !base.exists() {
create_path(base, None, dir_opts).map_err(|err| format_err!("{}", err))?;
}
}
let logger_options = FileLogOptions {
append: true,
prefix_time: true,
file_opts: file_opts.unwrap_or_default(),
..Default::default()
};
let auth_log = Arc::new(Mutex::new(FileLogger::new(&path, logger_options)?));
self.auth_log = Some(Arc::clone(&auth_log));
commando_sock.register_command("api-auth-log-reopen".into(), move |_args| {
log::info!("re-opening auth-log file");
auth_log.lock().unwrap().reopen()?;
Ok(serde_json::Value::Null)
})?;
Ok(())
}
pub(crate) fn get_access_log(&self) -> Option<&Arc<Mutex<FileLogger>>> {
self.request_log.as_ref()
}
pub(crate) fn get_auth_log(&self) -> Option<&Arc<Mutex<FileLogger>>> {
self.auth_log.as_ref()
}
}

View File

@ -1,241 +0,0 @@
use anyhow::{bail, format_err, Error};
use std::collections::HashMap;
use std::os::unix::io::AsRawFd;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use futures::*;
use nix::sys::socket;
use nix::unistd::Gid;
use serde::Serialize;
use serde_json::Value;
use tokio::net::UnixListener;
// Listens on a Unix Socket to handle simple command asynchronously
fn create_control_socket<P, F>(
path: P,
gid: Gid,
func: F,
) -> Result<impl Future<Output = ()>, Error>
where
P: Into<PathBuf>,
F: Fn(Value) -> Result<Value, Error> + Send + Sync + 'static,
{
let path: PathBuf = path.into();
let gid = gid.as_raw();
let socket = UnixListener::bind(&path)?;
let func = Arc::new(func);
let control_future = async move {
loop {
let (conn, _addr) = match socket.accept().await {
Ok(data) => data,
Err(err) => {
log::error!("failed to accept on control socket {:?}: {}", path, err);
continue;
}
};
let opt = socket::sockopt::PeerCredentials {};
let cred = match socket::getsockopt(conn.as_raw_fd(), opt) {
Ok(cred) => cred,
Err(err) => {
log::error!("no permissions - unable to read peer credential - {}", err);
continue;
}
};
// check permissions (same gid, root user, or backup group)
let mygid = unsafe { libc::getgid() };
if !(cred.uid() == 0 || cred.gid() == mygid || cred.gid() == gid) {
log::error!("no permissions for {:?}", cred);
continue;
}
let (rx, mut tx) = tokio::io::split(conn);
let abort_future = super::last_worker_future().map(|_| ());
use tokio::io::{AsyncBufReadExt, AsyncWriteExt};
let func = Arc::clone(&func);
let path = path.clone();
tokio::spawn(
futures::future::select(
async move {
let mut rx = tokio::io::BufReader::new(rx);
let mut line = String::new();
loop {
line.clear();
match rx
.read_line({
line.clear();
&mut line
})
.await
{
Ok(0) => break,
Ok(_) => (),
Err(err) => {
log::error!("control socket {:?} read error: {}", path, err);
return;
}
}
let response = match line.parse::<Value>() {
Ok(param) => match func(param) {
Ok(res) => format!("OK: {}\n", res),
Err(err) => format!("ERROR: {}\n", err),
},
Err(err) => format!("ERROR: {}\n", err),
};
if let Err(err) = tx.write_all(response.as_bytes()).await {
log::error!(
"control socket {:?} write response error: {}",
path,
err
);
return;
}
}
}
.boxed(),
abort_future,
)
.map(|_| ()),
);
}
}
.boxed();
let abort_future = crate::last_worker_future().map_err(|_| {});
let task = futures::future::select(control_future, abort_future)
.map(|_: futures::future::Either<(Result<(), Error>, _), _>| ());
Ok(task)
}
/// Send a command to the specified socket
pub async fn send_command<P, T>(path: P, params: &T) -> Result<Value, Error>
where
P: AsRef<Path>,
T: ?Sized + Serialize,
{
let mut command_string = serde_json::to_string(params)?;
command_string.push('\n');
send_raw_command(path.as_ref(), &command_string).await
}
/// Send a raw command (string) to the specified socket
pub async fn send_raw_command<P>(path: P, command_string: &str) -> Result<Value, Error>
where
P: AsRef<Path>,
{
use tokio::io::{AsyncBufReadExt, AsyncWriteExt};
let mut conn = tokio::net::UnixStream::connect(path)
.map_err(move |err| format_err!("control socket connect failed - {}", err))
.await?;
conn.write_all(command_string.as_bytes()).await?;
if !command_string.as_bytes().ends_with(b"\n") {
conn.write_all(b"\n").await?;
}
AsyncWriteExt::shutdown(&mut conn).await?;
let mut rx = tokio::io::BufReader::new(conn);
let mut data = String::new();
if rx.read_line(&mut data).await? == 0 {
bail!("no response");
}
if let Some(res) = data.strip_prefix("OK: ") {
match res.parse::<Value>() {
Ok(v) => Ok(v),
Err(err) => bail!("unable to parse json response - {}", err),
}
} else if let Some(err) = data.strip_prefix("ERROR: ") {
bail!("{}", err);
} else {
bail!("unable to parse response: {}", data);
}
}
// A callback for a specific commando socket.
type CommandSocketFn =
Box<(dyn Fn(Option<&Value>) -> Result<Value, Error> + Send + Sync + 'static)>;
/// Tooling to get a single control command socket where one can
/// register multiple commands dynamically.
///
/// The socket is activated by calling [spawn](CommandSocket::spawn),
/// which spawns an async tokio task to process the commands.
pub struct CommandSocket {
socket: PathBuf,
gid: Gid,
commands: HashMap<String, CommandSocketFn>,
}
impl CommandSocket {
/// Creates a new instance.
pub fn new<P>(path: P, gid: Gid) -> Self
where
P: Into<PathBuf>,
{
CommandSocket {
socket: path.into(),
gid,
commands: HashMap::new(),
}
}
/// Spawn the socket and consume self, meaning you cannot register commands anymore after
/// calling this.
pub fn spawn(self) -> Result<(), Error> {
let control_future =
create_control_socket(self.socket.to_owned(), self.gid, move |param| {
let param = param.as_object().ok_or_else(|| {
format_err!("unable to parse parameters (expected json object)")
})?;
let command = match param.get("command") {
Some(Value::String(command)) => command.as_str(),
None => bail!("no command"),
_ => bail!("unable to parse command"),
};
if !self.commands.contains_key(command) {
bail!("got unknown command '{}'", command);
}
match self.commands.get(command) {
None => bail!("got unknown command '{}'", command),
Some(handler) => {
let args = param.get("args"); //.unwrap_or(&Value::Null);
(handler)(args)
}
}
})?;
tokio::spawn(control_future);
Ok(())
}
/// Register a new command with a callback.
pub fn register_command<F>(&mut self, command: String, handler: F) -> Result<(), Error>
where
F: Fn(Option<&Value>) -> Result<Value, Error> + Send + Sync + 'static,
{
if self.commands.contains_key(&command) {
bail!("command '{}' already exists!", command);
}
self.commands.insert(command, Box::new(handler));
Ok(())
}
}

View File

@ -1,39 +0,0 @@
use anyhow::{bail, Error};
use hyper::header;
/// Possible Compression Methods, order determines preference (later is preferred)
#[derive(Eq, Ord, PartialEq, PartialOrd, Debug)]
pub enum CompressionMethod {
Deflate,
// Gzip,
// Brotli,
}
impl CompressionMethod {
pub fn content_encoding(&self) -> header::HeaderValue {
header::HeaderValue::from_static(self.extension())
}
pub fn extension(&self) -> &'static str {
match *self {
// CompressionMethod::Brotli => "br",
// CompressionMethod::Gzip => "gzip",
CompressionMethod::Deflate => "deflate",
}
}
}
impl std::str::FromStr for CompressionMethod {
type Err = Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
// "br" => Ok(CompressionMethod::Brotli),
// "gzip" => Ok(CompressionMethod::Gzip),
"deflate" => Ok(CompressionMethod::Deflate),
// http accept-encoding allows to give weights with ';q='
other if other.starts_with("deflate;q=") => Ok(CompressionMethod::Deflate),
_ => bail!("unknown compression format"),
}
}
}

View File

@ -1,379 +0,0 @@
//! Helpers to implement restartable daemons/services.
use std::ffi::CString;
use std::future::Future;
use std::io::{Read, Write};
use std::os::raw::{c_char, c_int, c_uchar};
use std::os::unix::ffi::OsStrExt;
use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
use std::panic::UnwindSafe;
use std::path::PathBuf;
use anyhow::{bail, format_err, Error};
use futures::future::{self, Either};
use nix::unistd::{fork, ForkResult};
use proxmox_io::{ReadExt, WriteExt};
use proxmox_sys::fd::{fd_change_cloexec, Fd};
use proxmox_sys::fs::CreateOptions;
// Unfortunately FnBox is nightly-only and Box<FnOnce> is unusable, so just use Box<Fn>...
type BoxedStoreFunc = Box<dyn FnMut() -> Result<String, Error> + UnwindSafe + Send>;
// Helper trait to "store" something in the environment to be re-used after re-executing the
// service on a reload.
trait Reloadable: Sized {
fn restore(var: &str) -> Result<Self, Error>;
fn get_store_func(&self) -> Result<BoxedStoreFunc, Error>;
}
// Manages things to be stored and reloaded upon reexec.
// Anything which should be restorable should be instantiated via this struct's `restore` method,
#[derive(Default)]
struct Reloader {
pre_exec: Vec<PreExecEntry>,
self_exe: PathBuf,
}
// Currently we only need environment variables for storage, but in theory we could also add
// variants which need temporary files or pipes...
struct PreExecEntry {
name: &'static str, // Feel free to change to String if necessary...
store_fn: BoxedStoreFunc,
}
impl Reloader {
pub fn new() -> Result<Self, Error> {
Ok(Self {
pre_exec: Vec::new(),
// Get the path to our executable as PathBuf
self_exe: std::fs::read_link("/proc/self/exe")?,
})
}
/// Restore an object from an environment variable of the given name, or, if none exists, uses
/// the function provided in the `or_create` parameter to instantiate the new "first" instance.
///
/// Values created via this method will be remembered for later re-execution.
pub async fn restore<T, F, U>(&mut self, name: &'static str, or_create: F) -> Result<T, Error>
where
T: Reloadable,
F: FnOnce() -> U,
U: Future<Output = Result<T, Error>>,
{
let res = match std::env::var(name) {
Ok(varstr) => T::restore(&varstr)?,
Err(std::env::VarError::NotPresent) => or_create().await?,
Err(_) => bail!("variable {} has invalid value", name),
};
self.pre_exec.push(PreExecEntry {
name,
store_fn: res.get_store_func()?,
});
Ok(res)
}
fn pre_exec(self) -> Result<(), Error> {
for mut item in self.pre_exec {
std::env::set_var(item.name, (item.store_fn)()?);
}
Ok(())
}
pub fn fork_restart(self, pid_fn: Option<&str>) -> Result<(), Error> {
// Get our parameters as Vec<CString>
let args = std::env::args_os();
let mut new_args = Vec::with_capacity(args.len());
for arg in args {
new_args.push(CString::new(arg.as_bytes())?);
}
// Synchronisation pipe:
let (pold, pnew) = super::socketpair()?;
// Start ourselves in the background:
match unsafe { fork() } {
Ok(ForkResult::Child) => {
// Double fork so systemd can supervise us without nagging...
match unsafe { fork() } {
Ok(ForkResult::Child) => {
std::mem::drop(pold);
// At this point we call pre-exec helpers. We must be certain that if they fail for
// whatever reason we can still call `_exit()`, so use catch_unwind.
match std::panic::catch_unwind(move || {
let mut pnew =
unsafe { std::fs::File::from_raw_fd(pnew.into_raw_fd()) };
let pid = nix::unistd::Pid::this();
if let Err(e) = unsafe { pnew.write_host_value(pid.as_raw()) } {
log::error!("failed to send new server PID to parent: {}", e);
unsafe {
libc::_exit(-1);
}
}
let mut ok = [0u8];
if let Err(e) = pnew.read_exact(&mut ok) {
log::error!("parent vanished before notifying systemd: {}", e);
unsafe {
libc::_exit(-1);
}
}
assert_eq!(ok[0], 1, "reload handshake should have sent a 1 byte");
std::mem::drop(pnew);
// Try to reopen STDOUT/STDERR journald streams to get correct PID in logs
let ident = CString::new(self.self_exe.file_name().unwrap().as_bytes())
.unwrap();
let ident = ident.as_bytes();
let fd =
unsafe { sd_journal_stream_fd(ident.as_ptr(), libc::LOG_INFO, 1) };
if fd >= 0 && fd != 1 {
let fd = proxmox_sys::fd::Fd(fd); // add drop handler
nix::unistd::dup2(fd.as_raw_fd(), 1)?;
} else {
log::error!("failed to update STDOUT journal redirection ({})", fd);
}
let fd =
unsafe { sd_journal_stream_fd(ident.as_ptr(), libc::LOG_ERR, 1) };
if fd >= 0 && fd != 2 {
let fd = proxmox_sys::fd::Fd(fd); // add drop handler
nix::unistd::dup2(fd.as_raw_fd(), 2)?;
} else {
log::error!("failed to update STDERR journal redirection ({})", fd);
}
self.do_reexec(new_args)
}) {
Ok(Ok(())) => log::error!("do_reexec returned!"),
Ok(Err(err)) => log::error!("do_reexec failed: {}", err),
Err(_) => log::error!("panic in re-exec"),
}
}
Ok(ForkResult::Parent { child }) => {
std::mem::drop((pold, pnew));
log::debug!("forked off a new server (second pid: {})", child);
}
Err(e) => log::error!("fork() failed, restart delayed: {}", e),
}
// No matter how we managed to get here, this is the time where we bail out quickly:
unsafe { libc::_exit(-1) }
}
Ok(ForkResult::Parent { child }) => {
log::debug!(
"forked off a new server (first pid: {}), waiting for 2nd pid",
child
);
std::mem::drop(pnew);
let mut pold = unsafe { std::fs::File::from_raw_fd(pold.into_raw_fd()) };
let child = nix::unistd::Pid::from_raw(match unsafe { pold.read_le_value() } {
Ok(v) => v,
Err(e) => {
log::error!(
"failed to receive pid of double-forked child process: {}",
e
);
// systemd will complain but won't kill the service...
return Ok(());
}
});
if let Some(pid_fn) = pid_fn {
let pid_str = format!("{}\n", child);
proxmox_sys::fs::replace_file(
pid_fn,
pid_str.as_bytes(),
CreateOptions::new(),
false,
)?;
}
if let Err(e) = systemd_notify(SystemdNotify::MainPid(child)) {
log::error!("failed to notify systemd about the new main pid: {}", e);
}
// ensure systemd got the message about the new main PID before continuing, else it
// will get confused if the new main process sends its READY signal before that
if let Err(e) = systemd_notify_barrier(u64::MAX) {
log::error!("failed to wait on systemd-processing: {}", e);
}
// notify child that it is now the new main process:
if let Err(e) = pold.write_all(&[1u8]) {
log::error!("child vanished during reload: {}", e);
}
Ok(())
}
Err(e) => {
log::error!("fork() failed, restart delayed: {}", e);
Ok(())
}
}
}
fn do_reexec(self, args: Vec<CString>) -> Result<(), Error> {
let exe = CString::new(self.self_exe.as_os_str().as_bytes())?;
self.pre_exec()?;
nix::unistd::setsid()?;
let args: Vec<&std::ffi::CStr> = args.iter().map(|s| s.as_ref()).collect();
nix::unistd::execvp(&exe, &args)?;
panic!("exec misbehaved");
}
}
// For now all we need to do is store and reuse a tcp listening socket:
impl Reloadable for tokio::net::TcpListener {
// NOTE: The socket must not be closed when the store-function is called:
// FIXME: We could become "independent" of the TcpListener and its reference to the file
// descriptor by `dup()`ing it (and check if the listener still exists via kcmp()?)
fn get_store_func(&self) -> Result<BoxedStoreFunc, Error> {
let mut fd_opt = Some(Fd(nix::fcntl::fcntl(
self.as_raw_fd(),
nix::fcntl::FcntlArg::F_DUPFD_CLOEXEC(0),
)?));
Ok(Box::new(move || {
let fd = fd_opt.take().unwrap();
fd_change_cloexec(fd.as_raw_fd(), false)?;
Ok(fd.into_raw_fd().to_string())
}))
}
fn restore(var: &str) -> Result<Self, Error> {
let fd = var
.parse::<u32>()
.map_err(|e| format_err!("invalid file descriptor: {}", e))? as RawFd;
fd_change_cloexec(fd, true)?;
Ok(Self::from_std(unsafe {
std::net::TcpListener::from_raw_fd(fd)
})?)
}
}
/// This creates a future representing a daemon which reloads itself when receiving a SIGHUP.
/// If this is started regularly, a listening socket is created. In this case, the file descriptor
/// number will be remembered in `PROXMOX_BACKUP_LISTEN_FD`.
/// If the variable already exists, its contents will instead be used to restore the listening
/// socket. The finished listening socket is then passed to the `create_service` function which
/// can be used to setup the TLS and the HTTP daemon. The returned future has to call
/// [systemd_notify] with [SystemdNotify::Ready] when the service is ready.
pub async fn create_daemon<F, S>(
address: std::net::SocketAddr,
create_service: F,
pidfn: Option<&str>,
) -> Result<(), Error>
where
F: FnOnce(tokio::net::TcpListener) -> Result<S, Error>,
S: Future<Output = Result<(), Error>>,
{
let mut reloader = Reloader::new()?;
let listener: tokio::net::TcpListener = reloader
.restore("PROXMOX_BACKUP_LISTEN_FD", move || async move {
Ok(tokio::net::TcpListener::bind(&address).await?)
})
.await?;
let service = create_service(listener)?;
let service = async move {
if let Err(err) = service.await {
log::error!("server error: {}", err);
}
};
let server_future = Box::pin(service);
let shutdown_future = crate::shutdown_future();
let finish_future = match future::select(server_future, shutdown_future).await {
Either::Left((_, _)) => {
if !crate::shutdown_requested() {
crate::request_shutdown(); // make sure we are in shutdown mode
}
None
}
Either::Right((_, server_future)) => Some(server_future),
};
let mut reloader = Some(reloader);
if crate::is_reload_request() {
log::info!("daemon reload...");
if let Err(e) = systemd_notify(SystemdNotify::Reloading) {
log::error!("failed to notify systemd about the state change: {}", e);
}
if let Err(e) = systemd_notify_barrier(u64::MAX) {
log::error!("failed to wait on systemd-processing: {}", e);
}
if let Err(e) = reloader.take().unwrap().fork_restart(pidfn) {
log::error!("error during reload: {}", e);
let _ = systemd_notify(SystemdNotify::Status("error during reload".to_string()));
}
} else {
log::info!("daemon shutting down...");
}
if let Some(future) = finish_future {
future.await;
}
log::info!("daemon shut down.");
Ok(())
}
#[link(name = "systemd")]
extern "C" {
fn sd_journal_stream_fd(
identifier: *const c_uchar,
priority: c_int,
level_prefix: c_int,
) -> c_int;
fn sd_notify(unset_environment: c_int, state: *const c_char) -> c_int;
fn sd_notify_barrier(unset_environment: c_int, timeout: u64) -> c_int;
}
/// Systemd sercice startup states (see: ``man sd_notify``)
pub enum SystemdNotify {
Ready,
Reloading,
Stopping,
Status(String),
MainPid(nix::unistd::Pid),
}
/// Tells systemd the startup state of the service (see: ``man sd_notify``)
pub fn systemd_notify(state: SystemdNotify) -> Result<(), Error> {
let message = match state {
SystemdNotify::Ready => {
log::info!("service is ready");
CString::new("READY=1")
}
SystemdNotify::Reloading => CString::new("RELOADING=1"),
SystemdNotify::Stopping => CString::new("STOPPING=1"),
SystemdNotify::Status(msg) => CString::new(format!("STATUS={}", msg)),
SystemdNotify::MainPid(pid) => CString::new(format!("MAINPID={}", pid)),
}?;
let rc = unsafe { sd_notify(0, message.as_ptr()) };
if rc < 0 {
bail!(
"systemd_notify failed: {}",
std::io::Error::from_raw_os_error(-rc)
);
}
Ok(())
}
/// Waits until all previously sent messages with sd_notify are processed
pub fn systemd_notify_barrier(timeout: u64) -> Result<(), Error> {
let rc = unsafe { sd_notify_barrier(0, timeout) };
if rc < 0 {
bail!(
"systemd_notify_barrier failed: {}",
std::io::Error::from_raw_os_error(-rc)
);
}
Ok(())
}

View File

@ -1,98 +0,0 @@
use std::net::SocketAddr;
use std::sync::Arc;
use serde_json::{json, Value};
use proxmox_router::{RpcEnvironment, RpcEnvironmentType};
use crate::ApiConfig;
/// Encapsulates information about the runtime environment
pub struct RestEnvironment {
env_type: RpcEnvironmentType,
result_attributes: Value,
auth_id: Option<String>,
client_ip: Option<SocketAddr>,
api: Arc<ApiConfig>,
}
impl RestEnvironment {
pub fn new(env_type: RpcEnvironmentType, api: Arc<ApiConfig>) -> Self {
Self {
result_attributes: json!({}),
auth_id: None,
client_ip: None,
env_type,
api,
}
}
pub fn api_config(&self) -> &ApiConfig {
&self.api
}
pub fn log_auth(&self, auth_id: &str) {
let msg = format!("successful auth for user '{}'", auth_id);
log::debug!("{}", msg); // avoid noisy syslog, admins can already check the auth log
if let Some(auth_logger) = self.api.get_auth_log() {
auth_logger.lock().unwrap().log(&msg);
}
}
pub fn log_failed_auth(&self, failed_auth_id: Option<String>, msg: &str) {
let msg = match (self.client_ip, failed_auth_id) {
(Some(peer), Some(user)) => {
format!(
"authentication failure; rhost={} user={} msg={}",
peer, user, msg
)
}
(Some(peer), None) => {
format!("authentication failure; rhost={} msg={}", peer, msg)
}
(None, Some(user)) => {
format!(
"authentication failure; rhost=unknown user={} msg={}",
user, msg
)
}
(None, None) => {
format!("authentication failure; rhost=unknown msg={}", msg)
}
};
log::error!("{}", msg);
if let Some(auth_logger) = self.api.get_auth_log() {
auth_logger.lock().unwrap().log(&msg);
}
}
}
impl RpcEnvironment for RestEnvironment {
fn result_attrib_mut(&mut self) -> &mut Value {
&mut self.result_attributes
}
fn result_attrib(&self) -> &Value {
&self.result_attributes
}
fn env_type(&self) -> RpcEnvironmentType {
self.env_type
}
fn set_auth_id(&mut self, auth_id: Option<String>) {
self.auth_id = auth_id;
}
fn get_auth_id(&self) -> Option<String> {
self.auth_id.clone()
}
fn set_client_ip(&mut self, client_ip: Option<SocketAddr>) {
self.client_ip = client_ip;
}
fn get_client_ip(&self) -> Option<SocketAddr> {
self.client_ip
}
}

View File

@ -1,147 +0,0 @@
use std::io::Write;
use anyhow::Error;
use nix::fcntl::OFlag;
use proxmox_sys::fs::{atomic_open_or_create_file, CreateOptions};
/// Options to control the behavior of a [FileLogger] instance
#[derive(Default)]
pub struct FileLogOptions {
/// Open underlying log file in append mode, useful when multiple concurrent processes
/// want to log to the same file (e.g., HTTP access log). Note that it is only atomic
/// for writes smaller than the PIPE_BUF (4k on Linux).
/// Inside the same process you may need to still use an mutex, for shared access.
pub append: bool,
/// Open underlying log file as readable
pub read: bool,
/// If set, ensure that the file is newly created or error out if already existing.
pub exclusive: bool,
/// Duplicate logged messages to STDOUT, like tee
pub to_stdout: bool,
/// Prefix messages logged to the file with the current local time as RFC 3339
pub prefix_time: bool,
/// File owner/group and mode
pub file_opts: CreateOptions,
}
/// Log messages with optional automatically added timestamps into files
///
/// #### Example:
/// ```
/// # use anyhow::{bail, format_err, Error};
/// use proxmox_rest_server::{flog, FileLogger, FileLogOptions};
///
/// # std::fs::remove_file("test.log");
/// let options = FileLogOptions {
/// to_stdout: true,
/// exclusive: true,
/// ..Default::default()
/// };
/// let mut log = FileLogger::new("test.log", options).unwrap();
/// flog!(log, "A simple log: {}", "Hello!");
/// # std::fs::remove_file("test.log");
/// ```
pub struct FileLogger {
file: std::fs::File,
file_name: std::path::PathBuf,
options: FileLogOptions,
}
/// Log messages to [FileLogger] - ``println`` like macro
#[macro_export]
macro_rules! flog {
($log:expr, $($arg:tt)*) => ({
$log.log(format!($($arg)*));
})
}
impl FileLogger {
pub fn new<P: AsRef<std::path::Path>>(
file_name: P,
options: FileLogOptions,
) -> Result<Self, Error> {
let file = Self::open(&file_name, &options)?;
let file_name: std::path::PathBuf = file_name.as_ref().to_path_buf();
Ok(Self {
file,
file_name,
options,
})
}
pub fn reopen(&mut self) -> Result<&Self, Error> {
let file = Self::open(&self.file_name, &self.options)?;
self.file = file;
Ok(self)
}
fn open<P: AsRef<std::path::Path>>(
file_name: P,
options: &FileLogOptions,
) -> Result<std::fs::File, Error> {
let mut flags = OFlag::O_CLOEXEC;
if options.read {
flags |= OFlag::O_RDWR;
} else {
flags |= OFlag::O_WRONLY;
}
if options.append {
flags |= OFlag::O_APPEND;
}
if options.exclusive {
flags |= OFlag::O_EXCL;
}
let file =
atomic_open_or_create_file(&file_name, flags, &[], options.file_opts.clone(), false)?;
Ok(file)
}
pub fn log<S: AsRef<str>>(&mut self, msg: S) {
let msg = msg.as_ref();
if self.options.to_stdout {
let mut stdout = std::io::stdout();
stdout.write_all(msg.as_bytes()).unwrap();
stdout.write_all(b"\n").unwrap();
}
let line = if self.options.prefix_time {
let now = proxmox_time::epoch_i64();
let rfc3339 = match proxmox_time::epoch_to_rfc3339(now) {
Ok(rfc3339) => rfc3339,
Err(_) => "1970-01-01T00:00:00Z".into(), // for safety, should really not happen!
};
format!("{}: {}\n", rfc3339, msg)
} else {
format!("{}\n", msg)
};
if let Err(err) = self.file.write_all(line.as_bytes()) {
// avoid panicking, log methods should not do that
// FIXME: or, return result???
log::error!("error writing to log file - {}", err);
}
}
}
impl std::io::Write for FileLogger {
fn write(&mut self, buf: &[u8]) -> Result<usize, std::io::Error> {
if self.options.to_stdout {
let _ = std::io::stdout().write(buf);
}
self.file.write(buf)
}
fn flush(&mut self) -> Result<(), std::io::Error> {
if self.options.to_stdout {
let _ = std::io::stdout().flush();
}
self.file.flush()
}
}

View File

@ -1,235 +0,0 @@
//! Helpers to format response data
use std::collections::HashMap;
use anyhow::Error;
use serde_json::{json, Value};
use hyper::header;
use hyper::{Body, Response, StatusCode};
use proxmox_router::{HttpError, RpcEnvironment, SerializableReturn};
use proxmox_schema::ParameterError;
/// Extension to set error message for server side logging
pub(crate) struct ErrorMessageExtension(pub String);
/// Methods to format data and errors
pub trait OutputFormatter: Send + Sync {
/// Transform json data into a http response
fn format_data(&self, data: Value, rpcenv: &dyn RpcEnvironment) -> Response<Body>;
/// Transform serializable data into a streaming http response
fn format_data_streaming(
&self,
data: Box<dyn SerializableReturn + Send>,
rpcenv: &dyn RpcEnvironment,
) -> Result<Response<Body>, Error>;
/// Transform errors into a http response
fn format_error(&self, err: Error) -> Response<Body>;
/// Transform a [Result] into a http response
fn format_result(
&self,
result: Result<Value, Error>,
rpcenv: &dyn RpcEnvironment,
) -> Response<Body> {
match result {
Ok(data) => self.format_data(data, rpcenv),
Err(err) => self.format_error(err),
}
}
}
static JSON_CONTENT_TYPE: &str = "application/json;charset=UTF-8";
fn json_data_response(data: Value) -> Response<Body> {
let json_str = data.to_string();
let raw = json_str.into_bytes();
let mut response = Response::new(raw.into());
response.headers_mut().insert(
header::CONTENT_TYPE,
header::HeaderValue::from_static(JSON_CONTENT_TYPE),
);
response
}
fn json_data_response_streaming(body: Body) -> Result<Response<Body>, Error> {
let response = Response::builder()
.header(
header::CONTENT_TYPE,
header::HeaderValue::from_static(JSON_CONTENT_TYPE),
)
.body(body)?;
Ok(response)
}
fn add_result_attributes(result: &mut Value, rpcenv: &dyn RpcEnvironment) {
let attributes = match rpcenv.result_attrib().as_object() {
Some(attr) => attr,
None => return,
};
for (key, value) in attributes {
result[key] = value.clone();
}
}
fn start_data_streaming(
value: Value,
data: Box<dyn SerializableReturn + Send>,
) -> tokio::sync::mpsc::Receiver<Result<Vec<u8>, Error>> {
let (writer, reader) = tokio::sync::mpsc::channel(1);
tokio::task::spawn_blocking(move || {
let output = proxmox_async::blocking::SenderWriter::from_sender(writer);
let mut output = std::io::BufWriter::new(output);
let mut serializer = serde_json::Serializer::new(&mut output);
let _ = data.sender_serialize(&mut serializer, value);
});
reader
}
struct JsonFormatter();
/// Format data as ``application/json``
///
/// The returned json object contains the following properties:
///
/// * ``data``: The result data (on success)
///
/// Any result attributes set on ``rpcenv`` are also added to the object.
///
/// Errors generates a BAD_REQUEST containing the error
/// message as string.
pub static JSON_FORMATTER: &'static dyn OutputFormatter = &JsonFormatter();
impl OutputFormatter for JsonFormatter {
fn format_data(&self, data: Value, rpcenv: &dyn RpcEnvironment) -> Response<Body> {
let mut result = json!({ "data": data });
add_result_attributes(&mut result, rpcenv);
json_data_response(result)
}
fn format_data_streaming(
&self,
data: Box<dyn SerializableReturn + Send>,
rpcenv: &dyn RpcEnvironment,
) -> Result<Response<Body>, Error> {
let mut value = json!({});
add_result_attributes(&mut value, rpcenv);
let reader = start_data_streaming(value, data);
let stream = tokio_stream::wrappers::ReceiverStream::new(reader);
json_data_response_streaming(Body::wrap_stream(stream))
}
fn format_error(&self, err: Error) -> Response<Body> {
let mut response = if let Some(apierr) = err.downcast_ref::<HttpError>() {
let mut resp = Response::new(Body::from(apierr.message.clone()));
*resp.status_mut() = apierr.code;
resp
} else {
let mut resp = Response::new(Body::from(err.to_string()));
*resp.status_mut() = StatusCode::BAD_REQUEST;
resp
};
response.headers_mut().insert(
header::CONTENT_TYPE,
header::HeaderValue::from_static(JSON_CONTENT_TYPE),
);
response
.extensions_mut()
.insert(ErrorMessageExtension(err.to_string()));
response
}
}
/// Format data as ExtJS compatible ``application/json``
///
/// The returned json object contains the following properties:
///
/// * ``success``: boolean attribute indicating the success.
///
/// * ``data``: The result data (on success)
///
/// * ``message``: The error message (on failure)
///
/// * ``errors``: detailed list of errors (if available)
///
/// Any result attributes set on ``rpcenv`` are also added to the object.
///
/// Please note that errors return status code OK, but setting success
/// to false.
pub static EXTJS_FORMATTER: &'static dyn OutputFormatter = &ExtJsFormatter();
struct ExtJsFormatter();
impl OutputFormatter for ExtJsFormatter {
fn format_data(&self, data: Value, rpcenv: &dyn RpcEnvironment) -> Response<Body> {
let mut result = json!({
"data": data,
"success": true
});
add_result_attributes(&mut result, rpcenv);
json_data_response(result)
}
fn format_data_streaming(
&self,
data: Box<dyn SerializableReturn + Send>,
rpcenv: &dyn RpcEnvironment,
) -> Result<Response<Body>, Error> {
let mut value = json!({
"success": true,
});
add_result_attributes(&mut value, rpcenv);
let reader = start_data_streaming(value, data);
let stream = tokio_stream::wrappers::ReceiverStream::new(reader);
json_data_response_streaming(Body::wrap_stream(stream))
}
fn format_error(&self, err: Error) -> Response<Body> {
let mut errors = HashMap::new();
let message: String = match err.downcast::<ParameterError>() {
Ok(param_err) => {
for (name, err) in param_err {
errors.insert(name, err.to_string());
}
String::from("parameter verification errors")
}
Err(err) => err.to_string(),
};
let result = json!({
"message": message,
"errors": errors,
"success": false
});
let mut response = json_data_response(result);
response
.extensions_mut()
.insert(ErrorMessageExtension(message));
response
}
}

View File

@ -1,147 +0,0 @@
use anyhow::Error;
use std::collections::HashMap;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use futures::*;
use hyper::{Body, Request, Response, StatusCode};
use proxmox_router::http_err;
use proxmox_router::{ApiResponseFuture, HttpError, Router, RpcEnvironment};
use crate::formatter::*;
use crate::{normalize_uri_path, WorkerTask};
/// Hyper Service implementation to handle stateful H2 connections.
///
/// We use this kind of service to handle backup protocol
/// connections. State is stored inside the generic ``rpcenv``. Logs
/// goes into the ``WorkerTask`` log.
pub struct H2Service<E> {
router: &'static Router,
rpcenv: E,
worker: Arc<WorkerTask>,
debug: bool,
}
impl<E: RpcEnvironment + Clone> H2Service<E> {
pub fn new(rpcenv: E, worker: Arc<WorkerTask>, router: &'static Router, debug: bool) -> Self {
Self {
rpcenv,
worker,
router,
debug,
}
}
pub fn debug<S: AsRef<str>>(&self, msg: S) {
if self.debug {
self.worker.log_message(msg);
}
}
fn handle_request(&self, req: Request<Body>) -> ApiResponseFuture {
let (parts, body) = req.into_parts();
let method = parts.method.clone();
let (path, components) = match normalize_uri_path(parts.uri.path()) {
Ok((p, c)) => (p, c),
Err(err) => return future::err(http_err!(BAD_REQUEST, "{}", err)).boxed(),
};
self.debug(format!("{} {}", method, path));
let mut uri_param = HashMap::new();
let formatter = JSON_FORMATTER;
match self.router.find_method(&components, method, &mut uri_param) {
None => {
let err = http_err!(NOT_FOUND, "Path '{}' not found.", path);
future::ok(formatter.format_error(err)).boxed()
}
Some(api_method) => crate::rest::handle_api_request(
self.rpcenv.clone(),
api_method,
formatter,
parts,
body,
uri_param,
)
.boxed(),
}
}
fn log_response(
worker: Arc<WorkerTask>,
method: hyper::Method,
path: &str,
resp: &Response<Body>,
) {
let status = resp.status();
if !status.is_success() {
let reason = status.canonical_reason().unwrap_or("unknown reason");
let mut message = "request failed";
if let Some(data) = resp.extensions().get::<ErrorMessageExtension>() {
message = &data.0;
}
worker.log_message(format!(
"{} {}: {} {}: {}",
method.as_str(),
path,
status.as_str(),
reason,
message
));
}
}
}
impl<E: RpcEnvironment + Clone> tower_service::Service<Request<Body>> for H2Service<E> {
type Response = Response<Body>;
type Error = Error;
#[allow(clippy::type_complexity)]
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
fn poll_ready(&mut self, _cx: &mut Context) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, req: Request<Body>) -> Self::Future {
let path = req.uri().path().to_owned();
let method = req.method().clone();
let worker = self.worker.clone();
self.handle_request(req)
.map(move |result| match result {
Ok(res) => {
Self::log_response(worker, method, &path, &res);
Ok::<_, Error>(res)
}
Err(err) => {
if let Some(apierr) = err.downcast_ref::<HttpError>() {
let mut resp = Response::new(Body::from(apierr.message.clone()));
resp.extensions_mut()
.insert(ErrorMessageExtension(apierr.message.clone()));
*resp.status_mut() = apierr.code;
Self::log_response(worker, method, &path, &resp);
Ok(resp)
} else {
let mut resp = Response::new(Body::from(err.to_string()));
resp.extensions_mut()
.insert(ErrorMessageExtension(err.to_string()));
*resp.status_mut() = StatusCode::BAD_REQUEST;
Self::log_response(worker, method, &path, &resp);
Ok(resp)
}
}
})
.boxed()
}
}

View File

@ -1,244 +0,0 @@
//! # Proxmox REST server
//!
//! This module provides convenient building blocks to implement a
//! REST server.
//!
//! ## Features
//!
//! * highly threaded code, uses Rust async
//! * static API definitions using schemas
//! * restartable systemd daemons using `systemd_notify`
//! * support for long running worker tasks (threads or async tokio tasks)
//! * supports separate access and authentication log files
//! * extra control socket to trigger management operations
//! - logfile rotation
//! - worker task management
//! * generic interface to authenticate user
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use anyhow::{bail, format_err, Error};
use http::request::Parts;
use http::HeaderMap;
use hyper::{Body, Method, Response};
use nix::unistd::Pid;
use proxmox_router::UserInformation;
use proxmox_sys::fd::Fd;
use proxmox_sys::fs::CreateOptions;
use proxmox_sys::linux::procfs::PidStat;
mod compression;
pub use compression::*;
pub mod daemon;
pub mod formatter;
mod environment;
pub use environment::*;
mod state;
pub use state::*;
mod command_socket;
pub use command_socket::*;
mod file_logger;
pub use file_logger::{FileLogOptions, FileLogger};
mod api_config;
pub use api_config::ApiConfig;
mod rest;
pub use rest::RestServer;
mod worker_task;
pub use worker_task::*;
mod h2service;
pub use h2service::*;
/// Authentication Error
pub enum AuthError {
Generic(Error),
NoData,
}
impl From<Error> for AuthError {
fn from(err: Error) -> Self {
AuthError::Generic(err)
}
}
/// Result of [`ServerAdapter::check_auth`].
pub type ServerAdapterCheckAuth<'a> = Pin<
Box<
dyn Future<Output = Result<(String, Box<dyn UserInformation + Sync + Send>), AuthError>>
+ Send
+ 'a,
>,
>;
/// User Authentication and index/root page generation methods
pub trait ServerAdapter: Send + Sync {
/// Returns the index/root page
fn get_index(
&self,
rest_env: RestEnvironment,
parts: Parts,
) -> Pin<Box<dyn Future<Output = Response<Body>> + Send>>;
/// Extract user credentials from headers and check them.
///
/// If credenthials are valid, returns the username and a
/// [UserInformation] object to query additional user data.
fn check_auth<'a>(
&'a self,
headers: &'a HeaderMap,
method: &'a Method,
) -> ServerAdapterCheckAuth<'a>;
}
lazy_static::lazy_static! {
static ref PID: i32 = unsafe { libc::getpid() };
static ref PSTART: u64 = PidStat::read_from_pid(Pid::from_raw(*PID)).unwrap().starttime;
}
/// Returns the current process ID (see [libc::getpid])
///
/// The value is cached at startup (so it is invalid after a fork)
pub(crate) fn pid() -> i32 {
*PID
}
/// Returns the starttime of the process (see [PidStat])
///
/// The value is cached at startup (so it is invalid after a fork)
pub(crate) fn pstart() -> u64 {
*PSTART
}
/// Helper to write the PID into a file
pub fn write_pid(pid_fn: &str) -> Result<(), Error> {
let pid_str = format!("{}\n", *PID);
proxmox_sys::fs::replace_file(pid_fn, pid_str.as_bytes(), CreateOptions::new(), false)
}
/// Helper to read the PID from a file
pub fn read_pid(pid_fn: &str) -> Result<i32, Error> {
let pid = proxmox_sys::fs::file_get_contents(pid_fn)?;
let pid = std::str::from_utf8(&pid)?.trim();
pid.parse()
.map_err(|err| format_err!("could not parse pid - {}", err))
}
/// Returns the control socket path for a specific process ID.
///
/// Note: The control socket always uses @/run/proxmox-backup/ as
/// prefix for historic reason. This does not matter because the
/// generated path is unique for each ``pid`` anyways.
pub fn ctrl_sock_from_pid(pid: i32) -> String {
// Note: The control socket always uses @/run/proxmox-backup/ as prefix
// for historc reason.
format!("\0{}/control-{}.sock", "/run/proxmox-backup", pid)
}
/// Returns the control socket path for this server.
pub fn our_ctrl_sock() -> String {
ctrl_sock_from_pid(*PID)
}
static SHUTDOWN_REQUESTED: AtomicBool = AtomicBool::new(false);
/// Request a server shutdown (usually called from [catch_shutdown_signal])
pub fn request_shutdown() {
SHUTDOWN_REQUESTED.store(true, Ordering::SeqCst);
crate::server_shutdown();
}
/// Returns true if there was a shutdown request.
#[inline(always)]
pub fn shutdown_requested() -> bool {
SHUTDOWN_REQUESTED.load(Ordering::SeqCst)
}
/// Raise an error if there was a shutdown request.
pub fn fail_on_shutdown() -> Result<(), Error> {
if shutdown_requested() {
bail!("Server shutdown requested - aborting task");
}
Ok(())
}
/// safe wrapper for `nix::sys::socket::socketpair` defaulting to `O_CLOEXEC` and guarding the file
/// descriptors.
pub fn socketpair() -> Result<(Fd, Fd), Error> {
use nix::sys::socket;
let (pa, pb) = socket::socketpair(
socket::AddressFamily::Unix,
socket::SockType::Stream,
None,
socket::SockFlag::SOCK_CLOEXEC,
)?;
Ok((Fd(pa), Fd(pb)))
}
/// Extract a specific cookie from cookie header.
/// We assume cookie_name is already url encoded.
pub fn extract_cookie(cookie: &str, cookie_name: &str) -> Option<String> {
for pair in cookie.split(';') {
let (name, value) = match pair.find('=') {
Some(i) => (pair[..i].trim(), pair[(i + 1)..].trim()),
None => return None, // Cookie format error
};
if name == cookie_name {
use percent_encoding::percent_decode;
if let Ok(value) = percent_decode(value.as_bytes()).decode_utf8() {
return Some(value.into());
} else {
return None; // Cookie format error
}
}
}
None
}
/// Extract a specific cookie from a HeaderMap's "COOKIE" entry.
/// We assume cookie_name is already url encoded.
pub fn cookie_from_header(headers: &http::HeaderMap, cookie_name: &str) -> Option<String> {
if let Some(Ok(cookie)) = headers.get("COOKIE").map(|v| v.to_str()) {
extract_cookie(cookie, cookie_name)
} else {
None
}
}
/// normalize uri path
///
/// Do not allow ".", "..", or hidden files ".XXXX"
/// Also remove empty path components
pub fn normalize_uri_path(path: &str) -> Result<(String, Vec<&str>), Error> {
let items = path.split('/');
let mut path = String::new();
let mut components = vec![];
for name in items {
if name.is_empty() {
continue;
}
if name.starts_with('.') {
bail!("Path contains illegal components.");
}
path.push('/');
path.push_str(name);
components.push(name);
}
Ok((path, components))
}

View File

@ -1,789 +0,0 @@
use std::collections::HashMap;
use std::future::Future;
use std::hash::BuildHasher;
use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};
use anyhow::{bail, format_err, Error};
use futures::future::{self, FutureExt, TryFutureExt};
use futures::stream::TryStreamExt;
use hyper::body::HttpBody;
use hyper::header::{self, HeaderMap};
use hyper::http::request::Parts;
use hyper::{Body, Request, Response, StatusCode};
use lazy_static::lazy_static;
use regex::Regex;
use serde_json::Value;
use tokio::fs::File;
use tokio::time::Instant;
use tower_service::Service;
use url::form_urlencoded;
use proxmox_router::http_err;
use proxmox_router::{
check_api_permission, ApiHandler, ApiMethod, HttpError, Permission, RpcEnvironment,
RpcEnvironmentType, UserInformation,
};
use proxmox_schema::{ObjectSchemaType, ParameterSchema};
use proxmox_http::client::RateLimitedStream;
use proxmox_async::stream::AsyncReaderStream;
use proxmox_compression::{DeflateEncoder, Level};
use crate::{
formatter::*, normalize_uri_path, ApiConfig, AuthError, CompressionMethod, FileLogger,
RestEnvironment,
};
extern "C" {
fn tzset();
}
struct AuthStringExtension(String);
struct EmptyUserInformation {}
impl UserInformation for EmptyUserInformation {
fn is_superuser(&self, _userid: &str) -> bool {
false
}
fn is_group_member(&self, _userid: &str, _group: &str) -> bool {
false
}
fn lookup_privs(&self, _userid: &str, _path: &[&str]) -> u64 {
0
}
}
/// REST server implementation (configured with [ApiConfig])
///
/// This struct implements the [Service] trait in order to use it with
/// [hyper::server::Builder::serve].
pub struct RestServer {
api_config: Arc<ApiConfig>,
}
const MAX_URI_QUERY_LENGTH: usize = 3072;
const CHUNK_SIZE_LIMIT: u64 = 32 * 1024;
impl RestServer {
/// Creates a new instance.
pub fn new(api_config: ApiConfig) -> Self {
Self {
api_config: Arc::new(api_config),
}
}
}
impl Service<&Pin<Box<tokio_openssl::SslStream<RateLimitedStream<tokio::net::TcpStream>>>>>
for RestServer
{
type Response = ApiService;
type Error = Error;
type Future = Pin<Box<dyn Future<Output = Result<ApiService, Error>> + Send>>;
fn poll_ready(&mut self, _cx: &mut Context) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(
&mut self,
ctx: &Pin<Box<tokio_openssl::SslStream<RateLimitedStream<tokio::net::TcpStream>>>>,
) -> Self::Future {
match ctx.get_ref().peer_addr() {
Err(err) => future::err(format_err!("unable to get peer address - {}", err)).boxed(),
Ok(peer) => future::ok(ApiService {
peer,
api_config: self.api_config.clone(),
})
.boxed(),
}
}
}
impl Service<&Pin<Box<tokio_openssl::SslStream<tokio::net::TcpStream>>>> for RestServer {
type Response = ApiService;
type Error = Error;
type Future = Pin<Box<dyn Future<Output = Result<ApiService, Error>> + Send>>;
fn poll_ready(&mut self, _cx: &mut Context) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(
&mut self,
ctx: &Pin<Box<tokio_openssl::SslStream<tokio::net::TcpStream>>>,
) -> Self::Future {
match ctx.get_ref().peer_addr() {
Err(err) => future::err(format_err!("unable to get peer address - {}", err)).boxed(),
Ok(peer) => future::ok(ApiService {
peer,
api_config: self.api_config.clone(),
})
.boxed(),
}
}
}
impl Service<&hyper::server::conn::AddrStream> for RestServer {
type Response = ApiService;
type Error = Error;
type Future = Pin<Box<dyn Future<Output = Result<ApiService, Error>> + Send>>;
fn poll_ready(&mut self, _cx: &mut Context) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, ctx: &hyper::server::conn::AddrStream) -> Self::Future {
let peer = ctx.remote_addr();
future::ok(ApiService {
peer,
api_config: self.api_config.clone(),
})
.boxed()
}
}
impl Service<&tokio::net::UnixStream> for RestServer {
type Response = ApiService;
type Error = Error;
type Future = Pin<Box<dyn Future<Output = Result<ApiService, Error>> + Send>>;
fn poll_ready(&mut self, _cx: &mut Context) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, _ctx: &tokio::net::UnixStream) -> Self::Future {
// TODO: Find a way to actually represent the vsock peer in the ApiService struct - for now
// it doesn't really matter, so just use a fake IP address
let fake_peer = "0.0.0.0:807".parse().unwrap();
future::ok(ApiService {
peer: fake_peer,
api_config: self.api_config.clone(),
})
.boxed()
}
}
// Helper [Service] containing the peer Address
//
// The lower level connection [Service] implementation on
// [RestServer] extracts the peer address and return an [ApiService].
//
// Rust wants this type 'pub' here (else we get 'private type `ApiService`
// in public interface'). The type is still private because the crate does
// not export it.
pub struct ApiService {
pub peer: std::net::SocketAddr,
pub api_config: Arc<ApiConfig>,
}
fn log_response(
logfile: Option<&Arc<Mutex<FileLogger>>>,
peer: &std::net::SocketAddr,
method: hyper::Method,
path_query: &str,
resp: &Response<Body>,
user_agent: Option<String>,
) {
if resp.extensions().get::<NoLogExtension>().is_some() {
return;
};
// we also log URL-to-long requests, so avoid message bigger than PIPE_BUF (4k on Linux)
// to profit from atomicty guarantees for O_APPEND opened logfiles
let path = &path_query[..MAX_URI_QUERY_LENGTH.min(path_query.len())];
let status = resp.status();
if !(status.is_success() || status.is_informational()) {
let reason = status.canonical_reason().unwrap_or("unknown reason");
let message = match resp.extensions().get::<ErrorMessageExtension>() {
Some(data) => &data.0,
None => "request failed",
};
log::error!(
"{} {}: {} {}: [client {}] {}",
method.as_str(),
path,
status.as_str(),
reason,
peer,
message
);
}
if let Some(logfile) = logfile {
let auth_id = match resp.extensions().get::<AuthStringExtension>() {
Some(AuthStringExtension(auth_id)) => auth_id.clone(),
None => "-".to_string(),
};
let now = proxmox_time::epoch_i64();
// time format which apache/nginx use (by default), copied from pve-http-server
let datetime = proxmox_time::strftime_local("%d/%m/%Y:%H:%M:%S %z", now)
.unwrap_or_else(|_| "-".to_string());
logfile.lock().unwrap().log(format!(
"{} - {} [{}] \"{} {}\" {} {} {}",
peer.ip(),
auth_id,
datetime,
method.as_str(),
path,
status.as_str(),
resp.body().size_hint().lower(),
user_agent.unwrap_or_else(|| "-".to_string()),
));
}
}
fn get_proxied_peer(headers: &HeaderMap) -> Option<std::net::SocketAddr> {
lazy_static! {
static ref RE: Regex = Regex::new(r#"for="([^"]+)""#).unwrap();
}
let forwarded = headers.get(header::FORWARDED)?.to_str().ok()?;
let capture = RE.captures(forwarded)?;
let rhost = capture.get(1)?.as_str();
rhost.parse().ok()
}
fn get_user_agent(headers: &HeaderMap) -> Option<String> {
let agent = headers.get(header::USER_AGENT)?.to_str();
agent
.map(|s| {
let mut s = s.to_owned();
s.truncate(128);
s
})
.ok()
}
impl Service<Request<Body>> for ApiService {
type Response = Response<Body>;
type Error = Error;
#[allow(clippy::type_complexity)]
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
fn poll_ready(&mut self, _cx: &mut Context) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, req: Request<Body>) -> Self::Future {
let path = req.uri().path_and_query().unwrap().as_str().to_owned();
let method = req.method().clone();
let user_agent = get_user_agent(req.headers());
let config = Arc::clone(&self.api_config);
let peer = match get_proxied_peer(req.headers()) {
Some(proxied_peer) => proxied_peer,
None => self.peer,
};
async move {
let response = match handle_request(Arc::clone(&config), req, &peer).await {
Ok(response) => response,
Err(err) => {
let (err, code) = match err.downcast_ref::<HttpError>() {
Some(apierr) => (apierr.message.clone(), apierr.code),
_ => (err.to_string(), StatusCode::BAD_REQUEST),
};
Response::builder()
.status(code)
.extension(ErrorMessageExtension(err.to_string()))
.body(err.into())?
}
};
let logger = config.get_access_log();
log_response(logger, &peer, method, &path, &response, user_agent);
Ok(response)
}
.boxed()
}
}
fn parse_query_parameters<S: 'static + BuildHasher + Send>(
param_schema: ParameterSchema,
form: &str, // x-www-form-urlencoded body data
parts: &Parts,
uri_param: &HashMap<String, String, S>,
) -> Result<Value, Error> {
let mut param_list: Vec<(String, String)> = vec![];
if !form.is_empty() {
for (k, v) in form_urlencoded::parse(form.as_bytes()).into_owned() {
param_list.push((k, v));
}
}
if let Some(query_str) = parts.uri.query() {
for (k, v) in form_urlencoded::parse(query_str.as_bytes()).into_owned() {
if k == "_dc" {
continue;
} // skip extjs "disable cache" parameter
param_list.push((k, v));
}
}
for (k, v) in uri_param {
param_list.push((k.clone(), v.clone()));
}
let params = param_schema.parse_parameter_strings(&param_list, true)?;
Ok(params)
}
async fn get_request_parameters<S: 'static + BuildHasher + Send>(
param_schema: ParameterSchema,
parts: Parts,
req_body: Body,
uri_param: HashMap<String, String, S>,
) -> Result<Value, Error> {
let mut is_json = false;
if let Some(value) = parts.headers.get(header::CONTENT_TYPE) {
match value.to_str().map(|v| v.split(';').next()) {
Ok(Some("application/x-www-form-urlencoded")) => {
is_json = false;
}
Ok(Some("application/json")) => {
is_json = true;
}
_ => bail!("unsupported content type {:?}", value.to_str()),
}
}
let body = TryStreamExt::map_err(req_body, |err| {
http_err!(BAD_REQUEST, "Problems reading request body: {}", err)
})
.try_fold(Vec::new(), |mut acc, chunk| async move {
// FIXME: max request body size?
if acc.len() + chunk.len() < 64 * 1024 {
acc.extend_from_slice(&chunk);
Ok(acc)
} else {
Err(http_err!(BAD_REQUEST, "Request body too large"))
}
})
.await?;
let utf8_data =
std::str::from_utf8(&body).map_err(|err| format_err!("Request body not uft8: {}", err))?;
if is_json {
let mut params: Value = serde_json::from_str(utf8_data)?;
for (k, v) in uri_param {
if let Some((_optional, prop_schema)) = param_schema.lookup(&k) {
params[&k] = prop_schema.parse_simple_value(&v)?;
}
}
param_schema.verify_json(&params)?;
Ok(params)
} else {
parse_query_parameters(param_schema, utf8_data, &parts, &uri_param)
}
}
struct NoLogExtension();
async fn proxy_protected_request(
info: &'static ApiMethod,
mut parts: Parts,
req_body: Body,
peer: &std::net::SocketAddr,
) -> Result<Response<Body>, Error> {
let mut uri_parts = parts.uri.clone().into_parts();
uri_parts.scheme = Some(http::uri::Scheme::HTTP);
uri_parts.authority = Some(http::uri::Authority::from_static("127.0.0.1:82"));
let new_uri = http::Uri::from_parts(uri_parts).unwrap();
parts.uri = new_uri;
let mut request = Request::from_parts(parts, req_body);
request.headers_mut().insert(
header::FORWARDED,
format!("for=\"{}\";", peer).parse().unwrap(),
);
let reload_timezone = info.reload_timezone;
let resp = hyper::client::Client::new()
.request(request)
.map_err(Error::from)
.map_ok(|mut resp| {
resp.extensions_mut().insert(NoLogExtension());
resp
})
.await?;
if reload_timezone {
unsafe {
tzset();
}
}
Ok(resp)
}
pub(crate) async fn handle_api_request<Env: RpcEnvironment, S: 'static + BuildHasher + Send>(
mut rpcenv: Env,
info: &'static ApiMethod,
formatter: &'static dyn OutputFormatter,
parts: Parts,
req_body: Body,
uri_param: HashMap<String, String, S>,
) -> Result<Response<Body>, Error> {
let delay_unauth_time = std::time::Instant::now() + std::time::Duration::from_millis(3000);
let compression = extract_compression_method(&parts.headers);
let result = match info.handler {
ApiHandler::AsyncHttp(handler) => {
let params = parse_query_parameters(info.parameters, "", &parts, &uri_param)?;
(handler)(parts, req_body, params, info, Box::new(rpcenv)).await
}
ApiHandler::StreamingSync(handler) => {
let params =
get_request_parameters(info.parameters, parts, req_body, uri_param).await?;
(handler)(params, info, &mut rpcenv)
.and_then(|data| formatter.format_data_streaming(data, &rpcenv))
}
ApiHandler::StreamingAsync(handler) => {
let params =
get_request_parameters(info.parameters, parts, req_body, uri_param).await?;
(handler)(params, info, &mut rpcenv)
.await
.and_then(|data| formatter.format_data_streaming(data, &rpcenv))
}
ApiHandler::Sync(handler) => {
let params =
get_request_parameters(info.parameters, parts, req_body, uri_param).await?;
(handler)(params, info, &mut rpcenv).map(|data| formatter.format_data(data, &rpcenv))
}
ApiHandler::Async(handler) => {
let params =
get_request_parameters(info.parameters, parts, req_body, uri_param).await?;
(handler)(params, info, &mut rpcenv)
.await
.map(|data| formatter.format_data(data, &rpcenv))
}
_ => {
bail!("Unknown API handler type");
}
};
let mut resp = match result {
Ok(resp) => resp,
Err(err) => {
if let Some(httperr) = err.downcast_ref::<HttpError>() {
if httperr.code == StatusCode::UNAUTHORIZED {
tokio::time::sleep_until(Instant::from_std(delay_unauth_time)).await;
}
}
formatter.format_error(err)
}
};
let resp = match compression {
Some(CompressionMethod::Deflate) => {
resp.headers_mut().insert(
header::CONTENT_ENCODING,
CompressionMethod::Deflate.content_encoding(),
);
resp.map(|body| {
Body::wrap_stream(DeflateEncoder::with_quality(
TryStreamExt::map_err(body, |err| {
proxmox_lang::io_format_err!("error during compression: {}", err)
}),
Level::Default,
))
})
}
None => resp,
};
if info.reload_timezone {
unsafe {
tzset();
}
}
Ok(resp)
}
fn extension_to_content_type(filename: &Path) -> (&'static str, bool) {
if let Some(ext) = filename.extension().and_then(|osstr| osstr.to_str()) {
return match ext {
"css" => ("text/css", false),
"html" => ("text/html", false),
"js" => ("application/javascript", false),
"json" => ("application/json", false),
"map" => ("application/json", false),
"png" => ("image/png", true),
"ico" => ("image/x-icon", true),
"gif" => ("image/gif", true),
"svg" => ("image/svg+xml", false),
"jar" => ("application/java-archive", true),
"woff" => ("application/font-woff", true),
"woff2" => ("application/font-woff2", true),
"ttf" => ("application/font-snft", true),
"pdf" => ("application/pdf", true),
"epub" => ("application/epub+zip", true),
"mp3" => ("audio/mpeg", true),
"oga" => ("audio/ogg", true),
"tgz" => ("application/x-compressed-tar", true),
_ => ("application/octet-stream", false),
};
}
("application/octet-stream", false)
}
async fn simple_static_file_download(
filename: PathBuf,
content_type: &'static str,
compression: Option<CompressionMethod>,
) -> Result<Response<Body>, Error> {
use tokio::io::AsyncReadExt;
let mut file = File::open(filename)
.await
.map_err(|err| http_err!(BAD_REQUEST, "File open failed: {}", err))?;
let mut data: Vec<u8> = Vec::new();
let mut response = match compression {
Some(CompressionMethod::Deflate) => {
let mut enc = DeflateEncoder::with_quality(data, Level::Default);
enc.compress_vec(&mut file, CHUNK_SIZE_LIMIT as usize)
.await?;
let mut response = Response::new(enc.into_inner().into());
response.headers_mut().insert(
header::CONTENT_ENCODING,
CompressionMethod::Deflate.content_encoding(),
);
response
}
None => {
file.read_to_end(&mut data)
.await
.map_err(|err| http_err!(BAD_REQUEST, "File read failed: {}", err))?;
Response::new(data.into())
}
};
response.headers_mut().insert(
header::CONTENT_TYPE,
header::HeaderValue::from_static(content_type),
);
Ok(response)
}
async fn chuncked_static_file_download(
filename: PathBuf,
content_type: &'static str,
compression: Option<CompressionMethod>,
) -> Result<Response<Body>, Error> {
let mut resp = Response::builder()
.status(StatusCode::OK)
.header(header::CONTENT_TYPE, content_type);
let file = File::open(filename)
.await
.map_err(|err| http_err!(BAD_REQUEST, "File open failed: {}", err))?;
let body = match compression {
Some(CompressionMethod::Deflate) => {
resp = resp.header(
header::CONTENT_ENCODING,
CompressionMethod::Deflate.content_encoding(),
);
Body::wrap_stream(DeflateEncoder::with_quality(
AsyncReaderStream::new(file),
Level::Default,
))
}
None => Body::wrap_stream(AsyncReaderStream::new(file)),
};
Ok(resp.body(body).unwrap())
}
async fn handle_static_file_download(
filename: PathBuf,
compression: Option<CompressionMethod>,
) -> Result<Response<Body>, Error> {
let metadata = tokio::fs::metadata(filename.clone())
.map_err(|err| http_err!(BAD_REQUEST, "File access problems: {}", err))
.await?;
let (content_type, nocomp) = extension_to_content_type(&filename);
let compression = if nocomp { None } else { compression };
if metadata.len() < CHUNK_SIZE_LIMIT {
simple_static_file_download(filename, content_type, compression).await
} else {
chuncked_static_file_download(filename, content_type, compression).await
}
}
// FIXME: support handling multiple compression methods
fn extract_compression_method(headers: &http::HeaderMap) -> Option<CompressionMethod> {
if let Some(Ok(encodings)) = headers.get(header::ACCEPT_ENCODING).map(|v| v.to_str()) {
for encoding in encodings.split(&[',', ' '][..]) {
if let Ok(method) = encoding.parse() {
return Some(method);
}
}
}
None
}
async fn handle_request(
api: Arc<ApiConfig>,
req: Request<Body>,
peer: &std::net::SocketAddr,
) -> Result<Response<Body>, Error> {
let (parts, body) = req.into_parts();
let method = parts.method.clone();
let (path, components) = normalize_uri_path(parts.uri.path())?;
let comp_len = components.len();
let query = parts.uri.query().unwrap_or_default();
if path.len() + query.len() > MAX_URI_QUERY_LENGTH {
return Ok(Response::builder()
.status(StatusCode::URI_TOO_LONG)
.body("".into())
.unwrap());
}
let env_type = api.env_type();
let mut rpcenv = RestEnvironment::new(env_type, Arc::clone(&api));
rpcenv.set_client_ip(Some(*peer));
let delay_unauth_time = std::time::Instant::now() + std::time::Duration::from_millis(3000);
let access_forbidden_time = std::time::Instant::now() + std::time::Duration::from_millis(500);
if comp_len >= 1 && components[0] == "api2" {
if comp_len >= 2 {
let format = components[1];
let formatter: &dyn OutputFormatter = match format {
"json" => JSON_FORMATTER,
"extjs" => EXTJS_FORMATTER,
_ => bail!("Unsupported output format '{}'.", format),
};
let mut uri_param = HashMap::new();
let api_method = api.find_method(&components[2..], method.clone(), &mut uri_param);
let mut auth_required = true;
if let Some(api_method) = api_method {
if let Permission::World = *api_method.access.permission {
auth_required = false; // no auth for endpoints with World permission
}
}
let mut user_info: Box<dyn UserInformation + Send + Sync> =
Box::new(EmptyUserInformation {});
if auth_required {
match api.check_auth(&parts.headers, &method).await {
Ok((authid, info)) => {
rpcenv.set_auth_id(Some(authid));
user_info = info;
}
Err(auth_err) => {
let err = match auth_err {
AuthError::Generic(err) => err,
AuthError::NoData => {
format_err!("no authentication credentials provided.")
}
};
// fixme: log Username??
rpcenv.log_failed_auth(None, &err.to_string());
// always delay unauthorized calls by 3 seconds (from start of request)
let err = http_err!(UNAUTHORIZED, "authentication failed - {}", err);
tokio::time::sleep_until(Instant::from_std(delay_unauth_time)).await;
return Ok(formatter.format_error(err));
}
}
}
match api_method {
None => {
let err = http_err!(NOT_FOUND, "Path '{}' not found.", path);
return Ok(formatter.format_error(err));
}
Some(api_method) => {
let auth_id = rpcenv.get_auth_id();
let user_info = user_info;
if !check_api_permission(
api_method.access.permission,
auth_id.as_deref(),
&uri_param,
user_info.as_ref(),
) {
let err = http_err!(FORBIDDEN, "permission check failed");
tokio::time::sleep_until(Instant::from_std(access_forbidden_time)).await;
return Ok(formatter.format_error(err));
}
let result = if api_method.protected && env_type == RpcEnvironmentType::PUBLIC {
proxy_protected_request(api_method, parts, body, peer).await
} else {
handle_api_request(rpcenv, api_method, formatter, parts, body, uri_param)
.await
};
let mut response = match result {
Ok(resp) => resp,
Err(err) => formatter.format_error(err),
};
if let Some(auth_id) = auth_id {
response
.extensions_mut()
.insert(AuthStringExtension(auth_id));
}
return Ok(response);
}
}
}
} else {
// not Auth required for accessing files!
if method != hyper::Method::GET {
bail!("Unsupported HTTP method {}", method);
}
if comp_len == 0 {
match api.check_auth(&parts.headers, &method).await {
Ok((auth_id, _user_info)) => {
rpcenv.set_auth_id(Some(auth_id));
return Ok(api.get_index(rpcenv, parts).await);
}
Err(AuthError::Generic(_)) => {
tokio::time::sleep_until(Instant::from_std(delay_unauth_time)).await;
}
Err(AuthError::NoData) => {}
}
return Ok(api.get_index(rpcenv, parts).await);
} else {
let filename = api.find_alias(&components);
let compression = extract_compression_method(&parts.headers);
return handle_static_file_download(filename, compression).await;
}
}
Err(http_err!(NOT_FOUND, "Path '{}' not found.", path))
}

View File

@ -1,161 +0,0 @@
use anyhow::Error;
use lazy_static::lazy_static;
use std::sync::Mutex;
use futures::*;
use tokio::signal::unix::{signal, SignalKind};
use proxmox_async::broadcast_future::BroadcastData;
use crate::request_shutdown;
#[derive(PartialEq, Copy, Clone, Debug)]
enum ServerMode {
Normal,
Shutdown,
}
struct ServerState {
mode: ServerMode,
shutdown_listeners: BroadcastData<()>,
last_worker_listeners: BroadcastData<()>,
worker_count: usize,
internal_task_count: usize,
reload_request: bool,
}
lazy_static! {
static ref SERVER_STATE: Mutex<ServerState> = Mutex::new(ServerState {
mode: ServerMode::Normal,
shutdown_listeners: BroadcastData::new(),
last_worker_listeners: BroadcastData::new(),
worker_count: 0,
internal_task_count: 0,
reload_request: false,
});
}
/// Listen to ``SIGINT`` for server shutdown
///
/// This calls [request_shutdown] when receiving the signal.
pub fn catch_shutdown_signal() -> Result<(), Error> {
let mut stream = signal(SignalKind::interrupt())?;
let future = async move {
while stream.recv().await.is_some() {
log::info!("got shutdown request (SIGINT)");
SERVER_STATE.lock().unwrap().reload_request = false;
request_shutdown();
}
}
.boxed();
let abort_future = last_worker_future().map_err(|_| {});
let task = futures::future::select(future, abort_future);
tokio::spawn(task.map(|_| ()));
Ok(())
}
/// Listen to ``SIGHUP`` for server reload
///
/// This calls [request_shutdown] when receiving the signal, and tries
/// to restart the server.
pub fn catch_reload_signal() -> Result<(), Error> {
let mut stream = signal(SignalKind::hangup())?;
let future = async move {
while stream.recv().await.is_some() {
log::info!("got reload request (SIGHUP)");
SERVER_STATE.lock().unwrap().reload_request = true;
crate::request_shutdown();
}
}
.boxed();
let abort_future = last_worker_future().map_err(|_| {});
let task = futures::future::select(future, abort_future);
tokio::spawn(task.map(|_| ()));
Ok(())
}
pub(crate) fn is_reload_request() -> bool {
let data = SERVER_STATE.lock().unwrap();
data.mode == ServerMode::Shutdown && data.reload_request
}
pub(crate) fn server_shutdown() {
let mut data = SERVER_STATE.lock().unwrap();
log::info!("request_shutdown");
data.mode = ServerMode::Shutdown;
data.shutdown_listeners.notify_listeners(Ok(()));
drop(data); // unlock
check_last_worker();
}
/// Future to signal server shutdown
pub fn shutdown_future() -> impl Future<Output = ()> {
let mut data = SERVER_STATE.lock().unwrap();
data.shutdown_listeners.listen().map(|_| ())
}
/// Future to signal when last worker task finished
pub fn last_worker_future() -> impl Future<Output = Result<(), Error>> {
let mut data = SERVER_STATE.lock().unwrap();
data.last_worker_listeners.listen()
}
pub(crate) fn set_worker_count(count: usize) {
SERVER_STATE.lock().unwrap().worker_count = count;
check_last_worker();
}
pub(crate) fn check_last_worker() {
let mut data = SERVER_STATE.lock().unwrap();
if !(data.mode == ServerMode::Shutdown
&& data.worker_count == 0
&& data.internal_task_count == 0)
{
return;
}
data.last_worker_listeners.notify_listeners(Ok(()));
}
/// Spawns a tokio task that will be tracked for reload
/// and if it is finished, notify the [last_worker_future] if we
/// are in shutdown mode.
pub fn spawn_internal_task<T>(task: T)
where
T: Future + Send + 'static,
T::Output: Send + 'static,
{
let mut data = SERVER_STATE.lock().unwrap();
data.internal_task_count += 1;
tokio::spawn(async move {
let _ = tokio::spawn(task).await; // ignore errors
{
// drop mutex
let mut data = SERVER_STATE.lock().unwrap();
if data.internal_task_count > 0 {
data.internal_task_count -= 1;
}
}
check_last_worker();
});
}

File diff suppressed because it is too large Load Diff

View File

@ -28,6 +28,7 @@ pxar = { version = "0.10.1", features = [ "tokio-io" ] }
proxmox-async = "0.4"
proxmox-compression = "0.1.1"
proxmox-rest-server = "0.2"
proxmox-router = { version = "1.3.0", features = [ "cli", "server" ] }
proxmox-schema = { version = "1.3.1", features = [ "api-macro" ] }
proxmox-time = "1"
@ -36,5 +37,4 @@ proxmox-sys = { version = "0.4", features = [ "sortable-macro" ] }
pbs-api-types = { path = "../pbs-api-types" }
pbs-tools = { path = "../pbs-tools" }
pbs-datastore = { path = "../pbs-datastore" }
proxmox-rest-server = { path = "../proxmox-rest-server" }
pbs-client = { path = "../pbs-client" }