mirror of
https://git.proxmox.com/git/proxmox
synced 2025-08-12 01:52:00 +00:00
proxmox-router: add new ApiHandler variants for streaming serialization
they should behave like their normal variants, but return a `Box<dyn SerializableReturn + Send>` instead of a value. This is useful since we do not have to generate the `Value` in-memory, but can stream the serialization to the client. We cannot simply use a `Box<dyn serde::Serialize>`, because that trait is not object-safe and thus cannot be used as a trait-object. Signed-off-by: Dominik Csapak <d.csapak@proxmox.com> Signed-off-by: Wolfgang Bumiller <w.bumiller@proxmox.com>
This commit is contained in:
parent
2c9272945e
commit
f585722aad
@ -72,6 +72,18 @@ async fn handle_simple_command_future(
|
|||||||
return Err(err);
|
return Err(err);
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
ApiHandler::StreamingSync(handler) => match (handler)(params, cli_cmd.info, &mut rpcenv) {
|
||||||
|
Ok(value) => {
|
||||||
|
let value = value.to_value()?;
|
||||||
|
if value != Value::Null {
|
||||||
|
println!("Result: {}", serde_json::to_string_pretty(&value).unwrap());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(err) => {
|
||||||
|
eprintln!("Error: {}", err);
|
||||||
|
return Err(err);
|
||||||
|
}
|
||||||
|
},
|
||||||
ApiHandler::Async(handler) => {
|
ApiHandler::Async(handler) => {
|
||||||
let future = (handler)(params, cli_cmd.info, &mut rpcenv);
|
let future = (handler)(params, cli_cmd.info, &mut rpcenv);
|
||||||
|
|
||||||
@ -87,6 +99,22 @@ async fn handle_simple_command_future(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
ApiHandler::StreamingAsync(handler) => {
|
||||||
|
let future = (handler)(params, cli_cmd.info, &mut rpcenv);
|
||||||
|
|
||||||
|
match future.await {
|
||||||
|
Ok(value) => {
|
||||||
|
let value = value.to_value()?;
|
||||||
|
if value != Value::Null {
|
||||||
|
println!("Result: {}", serde_json::to_string_pretty(&value).unwrap());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(err) => {
|
||||||
|
eprintln!("Error: {}", err);
|
||||||
|
return Err(err);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
ApiHandler::AsyncHttp(_) => {
|
ApiHandler::AsyncHttp(_) => {
|
||||||
let err_msg = "CliHandler does not support ApiHandler::AsyncHttp - internal error";
|
let err_msg = "CliHandler does not support ApiHandler::AsyncHttp - internal error";
|
||||||
print_simple_usage_error(prefix, cli_cmd, err_msg);
|
print_simple_usage_error(prefix, cli_cmd, err_msg);
|
||||||
@ -118,6 +146,18 @@ fn handle_simple_command(
|
|||||||
return Err(err);
|
return Err(err);
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
ApiHandler::StreamingSync(handler) => match (handler)(params, cli_cmd.info, &mut rpcenv) {
|
||||||
|
Ok(value) => {
|
||||||
|
let value = value.to_value()?;
|
||||||
|
if value != Value::Null {
|
||||||
|
println!("Result: {}", serde_json::to_string_pretty(&value).unwrap());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(err) => {
|
||||||
|
eprintln!("Error: {}", err);
|
||||||
|
return Err(err);
|
||||||
|
}
|
||||||
|
},
|
||||||
ApiHandler::Async(handler) => {
|
ApiHandler::Async(handler) => {
|
||||||
let future = (handler)(params, cli_cmd.info, &mut rpcenv);
|
let future = (handler)(params, cli_cmd.info, &mut rpcenv);
|
||||||
if let Some(run) = run {
|
if let Some(run) = run {
|
||||||
@ -138,6 +178,11 @@ fn handle_simple_command(
|
|||||||
return Err(format_err!("{}", err_msg));
|
return Err(format_err!("{}", err_msg));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
ApiHandler::StreamingAsync(_handler) => {
|
||||||
|
let err_msg = "CliHandler does not support ApiHandler::StreamingAsync - internal error";
|
||||||
|
print_simple_usage_error(prefix, cli_cmd, err_msg);
|
||||||
|
return Err(format_err!("{}", err_msg));
|
||||||
|
}
|
||||||
ApiHandler::AsyncHttp(_) => {
|
ApiHandler::AsyncHttp(_) => {
|
||||||
let err_msg = "CliHandler does not support ApiHandler::AsyncHttp - internal error";
|
let err_msg = "CliHandler does not support ApiHandler::AsyncHttp - internal error";
|
||||||
print_simple_usage_error(prefix, cli_cmd, err_msg);
|
print_simple_usage_error(prefix, cli_cmd, err_msg);
|
||||||
|
@ -14,6 +14,7 @@ use proxmox_schema::{ObjectSchema, ParameterSchema, ReturnType, Schema};
|
|||||||
|
|
||||||
use super::Permission;
|
use super::Permission;
|
||||||
use crate::RpcEnvironment;
|
use crate::RpcEnvironment;
|
||||||
|
use crate::SerializableReturn;
|
||||||
|
|
||||||
/// A synchronous API handler gets a json Value as input and returns a json Value as output.
|
/// A synchronous API handler gets a json Value as input and returns a json Value as output.
|
||||||
///
|
///
|
||||||
@ -42,6 +43,37 @@ pub type ApiHandlerFn = &'static (dyn Fn(Value, &ApiMethod, &mut dyn RpcEnvironm
|
|||||||
+ Sync
|
+ Sync
|
||||||
+ 'static);
|
+ 'static);
|
||||||
|
|
||||||
|
/// A synchronous API handler gets a json Value as input and returns a serializable return value as output.
|
||||||
|
///
|
||||||
|
/// ```
|
||||||
|
/// # use anyhow::Error;
|
||||||
|
/// # use serde_json::{json, Value};
|
||||||
|
/// use proxmox_router::{ApiHandler, ApiMethod, RpcEnvironment, SerializableReturn};
|
||||||
|
/// use proxmox_schema::ObjectSchema;
|
||||||
|
///
|
||||||
|
/// fn hello(
|
||||||
|
/// param: Value,
|
||||||
|
/// info: &ApiMethod,
|
||||||
|
/// rpcenv: &mut dyn RpcEnvironment,
|
||||||
|
/// ) -> Result<Box<dyn SerializableReturn + Send>, Error> {
|
||||||
|
/// let res: Box<dyn SerializableReturn + Send> = Box::new(format!("Hello World!"));
|
||||||
|
/// Ok(res)
|
||||||
|
/// }
|
||||||
|
///
|
||||||
|
/// const API_METHOD_HELLO: ApiMethod = ApiMethod::new(
|
||||||
|
/// &ApiHandler::StreamingSync(&hello),
|
||||||
|
/// &ObjectSchema::new("Hello World Example", &[])
|
||||||
|
/// );
|
||||||
|
/// ```
|
||||||
|
pub type StreamingApiHandlerFn = &'static (dyn Fn(
|
||||||
|
Value,
|
||||||
|
&ApiMethod,
|
||||||
|
&mut dyn RpcEnvironment,
|
||||||
|
) -> Result<Box<dyn SerializableReturn + Send>, Error>
|
||||||
|
+ Send
|
||||||
|
+ Sync
|
||||||
|
+ 'static);
|
||||||
|
|
||||||
/// Asynchronous API handlers
|
/// Asynchronous API handlers
|
||||||
///
|
///
|
||||||
/// Returns a future Value.
|
/// Returns a future Value.
|
||||||
@ -74,6 +106,44 @@ pub type ApiAsyncHandlerFn = &'static (dyn for<'a> Fn(Value, &'static ApiMethod,
|
|||||||
|
|
||||||
pub type ApiFuture<'a> = Pin<Box<dyn Future<Output = Result<Value, anyhow::Error>> + Send + 'a>>;
|
pub type ApiFuture<'a> = Pin<Box<dyn Future<Output = Result<Value, anyhow::Error>> + Send + 'a>>;
|
||||||
|
|
||||||
|
/// Streaming asynchronous API handlers
|
||||||
|
///
|
||||||
|
/// Returns a future Value.
|
||||||
|
/// ```
|
||||||
|
/// # use serde_json::{json, Value};
|
||||||
|
/// #
|
||||||
|
/// use proxmox_router::{ApiFuture, ApiHandler, ApiMethod, RpcEnvironment, StreamingApiFuture, SerializableReturn};
|
||||||
|
/// use proxmox_schema::ObjectSchema;
|
||||||
|
///
|
||||||
|
///
|
||||||
|
/// fn hello_future<'a>(
|
||||||
|
/// param: Value,
|
||||||
|
/// info: &ApiMethod,
|
||||||
|
/// rpcenv: &'a mut dyn RpcEnvironment,
|
||||||
|
/// ) -> StreamingApiFuture<'a> {
|
||||||
|
/// Box::pin(async move {
|
||||||
|
/// let res: Box<dyn SerializableReturn + Send> = Box::new(format!("Hello World!"));
|
||||||
|
/// Ok(res)
|
||||||
|
/// })
|
||||||
|
/// }
|
||||||
|
///
|
||||||
|
/// const API_METHOD_HELLO_FUTURE: ApiMethod = ApiMethod::new(
|
||||||
|
/// &ApiHandler::StreamingAsync(&hello_future),
|
||||||
|
/// &ObjectSchema::new("Hello World Example (async)", &[])
|
||||||
|
/// );
|
||||||
|
/// ```
|
||||||
|
pub type StreamingApiAsyncHandlerFn = &'static (dyn for<'a> Fn(
|
||||||
|
Value,
|
||||||
|
&'static ApiMethod,
|
||||||
|
&'a mut dyn RpcEnvironment,
|
||||||
|
) -> StreamingApiFuture<'a>
|
||||||
|
+ Send
|
||||||
|
+ Sync);
|
||||||
|
|
||||||
|
pub type StreamingApiFuture<'a> = Pin<
|
||||||
|
Box<dyn Future<Output = Result<Box<dyn SerializableReturn + Send>, anyhow::Error>> + Send + 'a>,
|
||||||
|
>;
|
||||||
|
|
||||||
/// Asynchronous HTTP API handlers
|
/// Asynchronous HTTP API handlers
|
||||||
///
|
///
|
||||||
/// They get low level access to request and response data. Use this
|
/// They get low level access to request and response data. Use this
|
||||||
@ -124,7 +194,9 @@ pub type ApiResponseFuture =
|
|||||||
/// Enum for different types of API handler functions.
|
/// Enum for different types of API handler functions.
|
||||||
pub enum ApiHandler {
|
pub enum ApiHandler {
|
||||||
Sync(ApiHandlerFn),
|
Sync(ApiHandlerFn),
|
||||||
|
StreamingSync(StreamingApiHandlerFn),
|
||||||
Async(ApiAsyncHandlerFn),
|
Async(ApiAsyncHandlerFn),
|
||||||
|
StreamingAsync(StreamingApiAsyncHandlerFn),
|
||||||
AsyncHttp(ApiAsyncHttpHandlerFn),
|
AsyncHttp(ApiAsyncHttpHandlerFn),
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -139,9 +211,15 @@ impl PartialEq for ApiHandler {
|
|||||||
(ApiHandler::Sync(l), ApiHandler::Sync(r)) => {
|
(ApiHandler::Sync(l), ApiHandler::Sync(r)) => {
|
||||||
core::mem::transmute::<_, usize>(l) == core::mem::transmute::<_, usize>(r)
|
core::mem::transmute::<_, usize>(l) == core::mem::transmute::<_, usize>(r)
|
||||||
}
|
}
|
||||||
|
(ApiHandler::StreamingSync(l), ApiHandler::StreamingSync(r)) => {
|
||||||
|
core::mem::transmute::<_, usize>(l) == core::mem::transmute::<_, usize>(r)
|
||||||
|
}
|
||||||
(ApiHandler::Async(l), ApiHandler::Async(r)) => {
|
(ApiHandler::Async(l), ApiHandler::Async(r)) => {
|
||||||
core::mem::transmute::<_, usize>(l) == core::mem::transmute::<_, usize>(r)
|
core::mem::transmute::<_, usize>(l) == core::mem::transmute::<_, usize>(r)
|
||||||
}
|
}
|
||||||
|
(ApiHandler::StreamingAsync(l), ApiHandler::StreamingAsync(r)) => {
|
||||||
|
core::mem::transmute::<_, usize>(l) == core::mem::transmute::<_, usize>(r)
|
||||||
|
}
|
||||||
(ApiHandler::AsyncHttp(l), ApiHandler::AsyncHttp(r)) => {
|
(ApiHandler::AsyncHttp(l), ApiHandler::AsyncHttp(r)) => {
|
||||||
core::mem::transmute::<_, usize>(l) == core::mem::transmute::<_, usize>(r)
|
core::mem::transmute::<_, usize>(l) == core::mem::transmute::<_, usize>(r)
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user