router, rest-server, api-macro: rename Streaming api to Serializing

This does not "stream", but rather skips the intermediate step to
serialize the entire output into a local json string.

We now reserve the "Stream*" prefix for actual *streaming*, that is,
producing an API response which gets streamed continuously as it is
asynchronously produced.

Signed-off-by: Wolfgang Bumiller <w.bumiller@proxmox.com>
This commit is contained in:
Wolfgang Bumiller 2024-08-23 11:09:04 +02:00
parent e8b8060b17
commit c31eaf0018
5 changed files with 47 additions and 40 deletions

View File

@ -169,8 +169,15 @@ pub fn handle_method(mut attribs: JSONObject, mut func: syn::ItemFn) -> Result<T
.transpose()? .transpose()?
.unwrap_or(false); .unwrap_or(false);
let streaming: bool = attribs if let Some(streaming) = attribs.remove("streaming") {
.remove("streaming") error!(
streaming.span(),
"streaming attribute was renamed to 'serializing', as it did not actually stream"
);
}
let serializing: bool = attribs
.remove("serializing")
.map(TryFrom::try_from) .map(TryFrom::try_from)
.transpose()? .transpose()?
.unwrap_or(false); .unwrap_or(false);
@ -201,7 +208,7 @@ pub fn handle_method(mut attribs: JSONObject, mut func: syn::ItemFn) -> Result<T
&mut func, &mut func,
&mut wrapper_ts, &mut wrapper_ts,
&mut default_consts, &mut default_consts,
streaming, serializing,
)?; )?;
// input schema is done, let's give the method body a chance to extract default parameters: // input schema is done, let's give the method body a chance to extract default parameters:
@ -224,9 +231,9 @@ pub fn handle_method(mut attribs: JSONObject, mut func: syn::ItemFn) -> Result<T
returns_schema_setter = quote! { .returns(#inner) }; returns_schema_setter = quote! { .returns(#inner) };
} }
let api_handler = match (streaming, is_async) { let api_handler = match (serializing, is_async) {
(true, true) => quote! { ::proxmox_router::ApiHandler::StreamingAsync(&#api_func_name) }, (true, true) => quote! { ::proxmox_router::ApiHandler::SerializingAsync(&#api_func_name) },
(true, false) => quote! { ::proxmox_router::ApiHandler::StreamingSync(&#api_func_name) }, (true, false) => quote! { ::proxmox_router::ApiHandler::SerializingSync(&#api_func_name) },
(false, true) => quote! { ::proxmox_router::ApiHandler::Async(&#api_func_name) }, (false, true) => quote! { ::proxmox_router::ApiHandler::Async(&#api_func_name) },
(false, false) => quote! { ::proxmox_router::ApiHandler::Sync(&#api_func_name) }, (false, false) => quote! { ::proxmox_router::ApiHandler::Sync(&#api_func_name) },
}; };
@ -287,7 +294,7 @@ fn handle_function_signature(
func: &mut syn::ItemFn, func: &mut syn::ItemFn,
wrapper_ts: &mut TokenStream, wrapper_ts: &mut TokenStream,
default_consts: &mut TokenStream, default_consts: &mut TokenStream,
streaming: bool, serializing: bool,
) -> Result<Ident, Error> { ) -> Result<Ident, Error> {
let sig = &func.sig; let sig = &func.sig;
let is_async = sig.asyncness.is_some(); let is_async = sig.asyncness.is_some();
@ -423,7 +430,7 @@ fn handle_function_signature(
wrapper_ts, wrapper_ts,
default_consts, default_consts,
is_async, is_async,
streaming, serializing,
) )
} }
@ -481,7 +488,7 @@ fn create_wrapper_function(
wrapper_ts: &mut TokenStream, wrapper_ts: &mut TokenStream,
default_consts: &mut TokenStream, default_consts: &mut TokenStream,
is_async: bool, is_async: bool,
streaming: bool, serializing: bool,
) -> Result<Ident, Error> { ) -> Result<Ident, Error> {
let api_func_name = Ident::new( let api_func_name = Ident::new(
&format!("api_function_{}", &func.sig.ident), &format!("api_function_{}", &func.sig.ident),
@ -523,7 +530,7 @@ fn create_wrapper_function(
_ => Some(quote!(?)), _ => Some(quote!(?)),
}; };
let body = if streaming { let body = if serializing {
quote! { quote! {
if let ::serde_json::Value::Object(ref mut input_map) = &mut input_params { if let ::serde_json::Value::Object(ref mut input_map) = &mut input_params {
#body #body
@ -545,14 +552,14 @@ fn create_wrapper_function(
} }
}; };
match (streaming, is_async) { match (serializing, is_async) {
(true, true) => { (true, true) => {
wrapper_ts.extend(quote! { wrapper_ts.extend(quote! {
fn #api_func_name<'a>( fn #api_func_name<'a>(
mut input_params: ::serde_json::Value, mut input_params: ::serde_json::Value,
api_method_param: &'static ::proxmox_router::ApiMethod, api_method_param: &'static ::proxmox_router::ApiMethod,
rpc_env_param: &'a mut dyn ::proxmox_router::RpcEnvironment, rpc_env_param: &'a mut dyn ::proxmox_router::RpcEnvironment,
) -> ::proxmox_router::StreamingApiFuture<'a> { ) -> ::proxmox_router::SerializingApiFuture<'a> {
::std::boxed::Box::pin(async move { #body }) ::std::boxed::Box::pin(async move { #body })
} }
}); });

View File

@ -236,18 +236,18 @@ pub fn basic_function() -> Result<(), Error> {
} }
#[api( #[api(
streaming: true, serializing: true,
)] )]
/// streaming async call /// serializing async call
pub async fn streaming_async_call() -> Result<(), Error> { pub async fn serializing_async_call() -> Result<(), Error> {
Ok(()) Ok(())
} }
#[api( #[api(
streaming: true, serializing: true,
)] )]
/// streaming sync call /// serializing sync call
pub fn streaming_sync_call() -> Result<(), Error> { pub fn serializing_sync_call() -> Result<(), Error> {
Ok(()) Ok(())
} }

View File

@ -505,13 +505,13 @@ pub(crate) async fn handle_api_request<Env: RpcEnvironment, S: 'static + BuildHa
let params = parse_query_parameters(info.parameters, "", &parts, &uri_param)?; let params = parse_query_parameters(info.parameters, "", &parts, &uri_param)?;
(handler)(parts, req_body, params, info, Box::new(rpcenv)).await (handler)(parts, req_body, params, info, Box::new(rpcenv)).await
} }
ApiHandler::StreamingSync(handler) => { ApiHandler::SerializingSync(handler) => {
let params = let params =
get_request_parameters(info.parameters, parts, req_body, uri_param).await?; get_request_parameters(info.parameters, parts, req_body, uri_param).await?;
(handler)(params, info, &mut rpcenv) (handler)(params, info, &mut rpcenv)
.and_then(|data| formatter.format_data_streaming(data, &rpcenv)) .and_then(|data| formatter.format_data_streaming(data, &rpcenv))
} }
ApiHandler::StreamingAsync(handler) => { ApiHandler::SerializingAsync(handler) => {
let params = let params =
get_request_parameters(info.parameters, parts, req_body, uri_param).await?; get_request_parameters(info.parameters, parts, req_body, uri_param).await?;
(handler)(params, info, &mut rpcenv) (handler)(params, info, &mut rpcenv)
@ -617,11 +617,11 @@ async fn handle_unformatted_api_request<Env: RpcEnvironment, S: 'static + BuildH
.await .await
.and_then(|v| to_json_response(v, &rpcenv)) .and_then(|v| to_json_response(v, &rpcenv))
} }
ApiHandler::StreamingSync(_) => http_bail!( ApiHandler::SerializingSync(_) => http_bail!(
INTERNAL_SERVER_ERROR, INTERNAL_SERVER_ERROR,
"old-style streaming calls not supported" "old-style streaming calls not supported"
), ),
ApiHandler::StreamingAsync(_) => http_bail!( ApiHandler::SerializingAsync(_) => http_bail!(
INTERNAL_SERVER_ERROR, INTERNAL_SERVER_ERROR,
"old-style streaming calls not supported" "old-style streaming calls not supported"
), ),

View File

@ -88,10 +88,10 @@ async fn handle_simple_command_future(
let result = match cli_cmd.info.handler { let result = match cli_cmd.info.handler {
ApiHandler::Sync(handler) => (handler)(params, cli_cmd.info, &mut rpcenv), ApiHandler::Sync(handler) => (handler)(params, cli_cmd.info, &mut rpcenv),
ApiHandler::StreamingSync(handler) => (handler)(params, cli_cmd.info, &mut rpcenv) ApiHandler::SerializingSync(handler) => (handler)(params, cli_cmd.info, &mut rpcenv)
.and_then(|r| r.to_value().map_err(Error::from)), .and_then(|r| r.to_value().map_err(Error::from)),
ApiHandler::Async(handler) => (handler)(params, cli_cmd.info, &mut rpcenv).await, ApiHandler::Async(handler) => (handler)(params, cli_cmd.info, &mut rpcenv).await,
ApiHandler::StreamingAsync(handler) => (handler)(params, cli_cmd.info, &mut rpcenv) ApiHandler::SerializingAsync(handler) => (handler)(params, cli_cmd.info, &mut rpcenv)
.await .await
.and_then(|r| r.to_value().map_err(Error::from)), .and_then(|r| r.to_value().map_err(Error::from)),
#[cfg(feature = "server")] #[cfg(feature = "server")]
@ -127,7 +127,7 @@ pub(crate) fn handle_simple_command<'cli>(
let result = match cli_cmd.info.handler { let result = match cli_cmd.info.handler {
ApiHandler::Sync(handler) => (handler)(params, cli_cmd.info, rpcenv), ApiHandler::Sync(handler) => (handler)(params, cli_cmd.info, rpcenv),
ApiHandler::StreamingSync(handler) => { ApiHandler::SerializingSync(handler) => {
(handler)(params, cli_cmd.info, rpcenv).and_then(|r| r.to_value().map_err(Error::from)) (handler)(params, cli_cmd.info, rpcenv).and_then(|r| r.to_value().map_err(Error::from))
} }
ApiHandler::Async(handler) => { ApiHandler::Async(handler) => {
@ -137,8 +137,8 @@ pub(crate) fn handle_simple_command<'cli>(
let future = (handler)(params, cli_cmd.info, rpcenv); let future = (handler)(params, cli_cmd.info, rpcenv);
(run)(future) (run)(future)
} }
ApiHandler::StreamingAsync(_handler) => { ApiHandler::SerializingAsync(_handler) => {
bail!("CliHandler does not support ApiHandler::StreamingAsync - internal error"); bail!("CliHandler does not support ApiHandler::SerializingAsync - internal error");
} }
#[cfg(feature = "server")] #[cfg(feature = "server")]
ApiHandler::AsyncHttp(_) => { ApiHandler::AsyncHttp(_) => {

View File

@ -64,11 +64,11 @@ pub type ApiHandlerFn = &'static (dyn Fn(Value, &ApiMethod, &mut dyn RpcEnvironm
/// } /// }
/// ///
/// const API_METHOD_HELLO: ApiMethod = ApiMethod::new( /// const API_METHOD_HELLO: ApiMethod = ApiMethod::new(
/// &ApiHandler::StreamingSync(&hello), /// &ApiHandler::SerializingSync(&hello),
/// &ObjectSchema::new("Hello World Example", &[]) /// &ObjectSchema::new("Hello World Example", &[])
/// ); /// );
/// ``` /// ```
pub type StreamingApiHandlerFn = &'static (dyn Fn( pub type SerializingApiHandlerFn = &'static (dyn Fn(
Value, Value,
&ApiMethod, &ApiMethod,
&mut dyn RpcEnvironment, &mut dyn RpcEnvironment,
@ -109,13 +109,13 @@ pub type ApiAsyncHandlerFn = &'static (dyn for<'a> Fn(Value, &'static ApiMethod,
pub type ApiFuture<'a> = Pin<Box<dyn Future<Output = Result<Value, anyhow::Error>> + Send + 'a>>; pub type ApiFuture<'a> = Pin<Box<dyn Future<Output = Result<Value, anyhow::Error>> + Send + 'a>>;
/// Streaming asynchronous API handlers /// Serializing asynchronous API handlers
/// ///
/// Returns a future Value. /// Returns a future Value.
/// ``` /// ```
/// # use serde_json::{json, Value}; /// # use serde_json::{json, Value};
/// # /// #
/// use proxmox_router::{ApiFuture, ApiHandler, ApiMethod, RpcEnvironment, StreamingApiFuture, SerializableReturn}; /// use proxmox_router::{ApiFuture, ApiHandler, ApiMethod, RpcEnvironment, SerializingApiFuture, SerializableReturn};
/// use proxmox_schema::ObjectSchema; /// use proxmox_schema::ObjectSchema;
/// ///
/// ///
@ -123,7 +123,7 @@ pub type ApiFuture<'a> = Pin<Box<dyn Future<Output = Result<Value, anyhow::Error
/// param: Value, /// param: Value,
/// info: &ApiMethod, /// info: &ApiMethod,
/// rpcenv: &'a mut dyn RpcEnvironment, /// rpcenv: &'a mut dyn RpcEnvironment,
/// ) -> StreamingApiFuture<'a> { /// ) -> SerializingApiFuture<'a> {
/// Box::pin(async move { /// Box::pin(async move {
/// let res: Box<dyn SerializableReturn + Send> = Box::new(format!("Hello World!")); /// let res: Box<dyn SerializableReturn + Send> = Box::new(format!("Hello World!"));
/// Ok(res) /// Ok(res)
@ -131,19 +131,19 @@ pub type ApiFuture<'a> = Pin<Box<dyn Future<Output = Result<Value, anyhow::Error
/// } /// }
/// ///
/// const API_METHOD_HELLO_FUTURE: ApiMethod = ApiMethod::new( /// const API_METHOD_HELLO_FUTURE: ApiMethod = ApiMethod::new(
/// &ApiHandler::StreamingAsync(&hello_future), /// &ApiHandler::SerializingAsync(&hello_future),
/// &ObjectSchema::new("Hello World Example (async)", &[]) /// &ObjectSchema::new("Hello World Example (async)", &[])
/// ); /// );
/// ``` /// ```
pub type StreamingApiAsyncHandlerFn = &'static (dyn for<'a> Fn( pub type SerializingApiAsyncHandlerFn = &'static (dyn for<'a> Fn(
Value, Value,
&'static ApiMethod, &'static ApiMethod,
&'a mut dyn RpcEnvironment, &'a mut dyn RpcEnvironment,
) -> StreamingApiFuture<'a> ) -> SerializingApiFuture<'a>
+ Send + Send
+ Sync); + Sync);
pub type StreamingApiFuture<'a> = Pin< pub type SerializingApiFuture<'a> = Pin<
Box<dyn Future<Output = Result<Box<dyn SerializableReturn + Send>, anyhow::Error>> + Send + 'a>, Box<dyn Future<Output = Result<Box<dyn SerializableReturn + Send>, anyhow::Error>> + Send + 'a>,
>; >;
@ -200,9 +200,9 @@ pub type ApiResponseFuture =
#[non_exhaustive] #[non_exhaustive]
pub enum ApiHandler { pub enum ApiHandler {
Sync(ApiHandlerFn), Sync(ApiHandlerFn),
StreamingSync(StreamingApiHandlerFn), SerializingSync(SerializingApiHandlerFn),
Async(ApiAsyncHandlerFn), Async(ApiAsyncHandlerFn),
StreamingAsync(StreamingApiAsyncHandlerFn), SerializingAsync(SerializingApiAsyncHandlerFn),
#[cfg(feature = "server")] #[cfg(feature = "server")]
AsyncHttp(ApiAsyncHttpHandlerFn), AsyncHttp(ApiAsyncHttpHandlerFn),
} }
@ -218,13 +218,13 @@ impl PartialEq for ApiHandler {
(ApiHandler::Sync(l), ApiHandler::Sync(r)) => { (ApiHandler::Sync(l), ApiHandler::Sync(r)) => {
core::mem::transmute::<_, usize>(l) == core::mem::transmute::<_, usize>(r) core::mem::transmute::<_, usize>(l) == core::mem::transmute::<_, usize>(r)
} }
(ApiHandler::StreamingSync(l), ApiHandler::StreamingSync(r)) => { (ApiHandler::SerializingSync(l), ApiHandler::SerializingSync(r)) => {
core::mem::transmute::<_, usize>(l) == core::mem::transmute::<_, usize>(r) core::mem::transmute::<_, usize>(l) == core::mem::transmute::<_, usize>(r)
} }
(ApiHandler::Async(l), ApiHandler::Async(r)) => { (ApiHandler::Async(l), ApiHandler::Async(r)) => {
core::mem::transmute::<_, usize>(l) == core::mem::transmute::<_, usize>(r) core::mem::transmute::<_, usize>(l) == core::mem::transmute::<_, usize>(r)
} }
(ApiHandler::StreamingAsync(l), ApiHandler::StreamingAsync(r)) => { (ApiHandler::SerializingAsync(l), ApiHandler::SerializingAsync(r)) => {
core::mem::transmute::<_, usize>(l) == core::mem::transmute::<_, usize>(r) core::mem::transmute::<_, usize>(l) == core::mem::transmute::<_, usize>(r)
} }
#[cfg(feature = "server")] #[cfg(feature = "server")]