diff --git a/proxmox-rest-server/Cargo.toml b/proxmox-rest-server/Cargo.toml index 56aa91e8..a957f8df 100644 --- a/proxmox-rest-server/Cargo.toml +++ b/proxmox-rest-server/Cargo.toml @@ -27,6 +27,7 @@ serde = { version = "1.0", features = [ "derive" ] } serde_json = "1.0" tokio = { version = "1.6", features = ["signal", "process"] } tokio-openssl = "0.6.1" +tokio-stream = "0.1.0" tower-service = "0.3.0" url = "2.1" diff --git a/proxmox-rest-server/src/formatter.rs b/proxmox-rest-server/src/formatter.rs index 709a6b1e..2e9a01fa 100644 --- a/proxmox-rest-server/src/formatter.rs +++ b/proxmox-rest-server/src/formatter.rs @@ -7,7 +7,7 @@ use serde_json::{json, Value}; use hyper::header; use hyper::{Body, Response, StatusCode}; -use proxmox_router::{HttpError, RpcEnvironment}; +use proxmox_router::{HttpError, RpcEnvironment, SerializableReturn}; use proxmox_schema::ParameterError; /// Extension to set error message for server side logging @@ -18,6 +18,13 @@ pub trait OutputFormatter: Send + Sync { /// Transform json data into a http response fn format_data(&self, data: Value, rpcenv: &dyn RpcEnvironment) -> Response; + /// Transform serializable data into a streaming http response + fn format_data_streaming( + &self, + data: Box, + rpcenv: &dyn RpcEnvironment, + ) -> Result, Error>; + /// Transform errors into a http response fn format_error(&self, err: Error) -> Response; @@ -50,6 +57,16 @@ fn json_data_response(data: Value) -> Response { response } +fn json_data_response_streaming(body: Body) -> Result, Error> { + let response = Response::builder() + .header( + header::CONTENT_TYPE, + header::HeaderValue::from_static(JSON_CONTENT_TYPE), + ) + .body(body)?; + Ok(response) +} + fn add_result_attributes(result: &mut Value, rpcenv: &dyn RpcEnvironment) { let attributes = match rpcenv.result_attrib().as_object() { Some(attr) => attr, @@ -61,6 +78,22 @@ fn add_result_attributes(result: &mut Value, rpcenv: &dyn RpcEnvironment) { } } +fn start_data_streaming( + value: Value, + data: Box, +) -> tokio::sync::mpsc::Receiver, Error>> { + let (writer, reader) = tokio::sync::mpsc::channel(1); + + tokio::task::spawn_blocking(move || { + let output = proxmox_async::blocking::SenderWriter::from_sender(writer); + let mut output = std::io::BufWriter::new(output); + let mut serializer = serde_json::Serializer::new(&mut output); + let _ = data.sender_serialize(&mut serializer, value); + }); + + reader +} + struct JsonFormatter(); /// Format data as ``application/json`` @@ -84,6 +117,21 @@ impl OutputFormatter for JsonFormatter { json_data_response(result) } + fn format_data_streaming( + &self, + data: Box, + rpcenv: &dyn RpcEnvironment, + ) -> Result, Error> { + let mut value = json!({}); + + add_result_attributes(&mut value, rpcenv); + + let reader = start_data_streaming(value, data); + let stream = tokio_stream::wrappers::ReceiverStream::new(reader); + + json_data_response_streaming(Body::wrap_stream(stream)) + } + fn format_error(&self, err: Error) -> Response { let mut response = if let Some(apierr) = err.downcast_ref::() { let mut resp = Response::new(Body::from(apierr.message.clone())); @@ -140,6 +188,23 @@ impl OutputFormatter for ExtJsFormatter { json_data_response(result) } + fn format_data_streaming( + &self, + data: Box, + rpcenv: &dyn RpcEnvironment, + ) -> Result, Error> { + let mut value = json!({ + "success": true, + }); + + add_result_attributes(&mut value, rpcenv); + + let reader = start_data_streaming(value, data); + let stream = tokio_stream::wrappers::ReceiverStream::new(reader); + + json_data_response_streaming(Body::wrap_stream(stream)) + } + fn format_error(&self, err: Error) -> Response { let mut errors = HashMap::new();