From b0e1e693d9d3eb1a01372f2354eba315a9ebe0ed Mon Sep 17 00:00:00 2001 From: Dietmar Maurer Date: Fri, 22 Nov 2019 13:02:05 +0100 Subject: [PATCH] src/server/rest.rs: cleanup async code --- src/server/h2service.rs | 8 +- src/server/rest.rs | 246 +++++++++++++++++----------------------- 2 files changed, 107 insertions(+), 147 deletions(-) diff --git a/src/server/h2service.rs b/src/server/h2service.rs index c8a1f839..5f89f8f6 100644 --- a/src/server/h2service.rs +++ b/src/server/h2service.rs @@ -43,7 +43,7 @@ impl H2Service { let (path, components) = match tools::normalize_uri_path(parts.uri.path()) { Ok((p,c)) => (p, c), - Err(err) => return Box::new(future::err(http_err!(BAD_REQUEST, err.to_string()))), + Err(err) => return future::err(http_err!(BAD_REQUEST, err.to_string())).boxed(), }; self.debug(format!("{} {}", method, path)); @@ -55,17 +55,17 @@ impl H2Service { match self.router.find_method(&components, method, &mut uri_param) { None => { let err = http_err!(NOT_FOUND, "Path not found.".to_string()); - Box::new(future::ok((formatter.format_error)(err))) + future::ok((formatter.format_error)(err)).boxed() } Some(api_method) => { match api_method.handler { ApiHandler::Sync(_) => { crate::server::rest::handle_sync_api_request( - self.rpcenv.clone(), api_method, formatter, parts, body, uri_param) + self.rpcenv.clone(), api_method, formatter, parts, body, uri_param).boxed() } ApiHandler::Async(_) => { crate::server::rest::handle_async_api_request( - self.rpcenv.clone(), api_method, formatter, parts, body, uri_param) + self.rpcenv.clone(), api_method, formatter, parts, body, uri_param).boxed() } } } diff --git a/src/server/rest.rs b/src/server/rest.rs index 36a4117e..ca146556 100644 --- a/src/server/rest.rs +++ b/src/server/rest.rs @@ -6,7 +6,7 @@ use std::sync::Arc; use std::task::{Context, Poll}; use failure::*; -use futures::future::{self, Either, FutureExt, TryFutureExt}; +use futures::future::{self, FutureExt, TryFutureExt}; use futures::stream::TryStreamExt; use hyper::header; use hyper::http::request::Parts; @@ -17,7 +17,7 @@ use tokio::fs::File; use url::form_urlencoded; use proxmox::api::http_err; -use proxmox::api::{ApiFuture, ApiHandler, ApiMethod, HttpError}; +use proxmox::api::{ApiHandler, ApiMethod, HttpError}; use proxmox::api::{RpcEnvironment, RpcEnvironmentType}; use proxmox::api::schema::{parse_simple_value, verify_json_object, parse_parameter_strings}; @@ -125,7 +125,7 @@ impl tower_service::Service> for ApiService { let method = req.method().clone(); let peer = self.peer; - Pin::from(handle_request(self.api_config.clone(), req)) + handle_request(self.api_config.clone(), req) .map(move |result| match result { Ok(res) => { log_response(&peer, method, &path, &res); @@ -149,13 +149,13 @@ impl tower_service::Service> for ApiService { } } -fn get_request_parameters_async( +async fn get_request_parameters_async( info: &'static ApiMethod, parts: Parts, req_body: Body, uri_param: HashMap, -) -> Box> + Send> -{ +) -> Result { + let mut is_json = false; if let Some(value) = parts.headers.get(header::CONTENT_TYPE) { @@ -166,13 +166,11 @@ fn get_request_parameters_async( Ok(Some("application/json")) => { is_json = true; } - _ => { - return Box::new(future::err(http_err!(BAD_REQUEST, "unsupported content type".to_string()))); - } + _ => bail!("unsupported content type {:?}", value.to_str()), } } - let resp = req_body + let body = req_body .map_err(|err| http_err!(BAD_REQUEST, format!("Promlems reading request body: {}", err))) .try_fold(Vec::new(), |mut acc, chunk| async move { if acc.len() + chunk.len() < 64*1024 { //fimxe: max request body size? @@ -181,57 +179,55 @@ fn get_request_parameters_async( } else { Err(http_err!(BAD_REQUEST, "Request body too large".to_string())) } - }) - .and_then(move |body| async move { - let utf8 = std::str::from_utf8(&body)?; + }).await?; - let obj_schema = &info.parameters; + let utf8 = std::str::from_utf8(&body) + .map_err(|err| format_err!("Request body not uft8: {}", err))?; - if is_json { - let mut params: Value = serde_json::from_str(utf8)?; - for (k, v) in uri_param { - if let Some((_optional, prop_schema)) = obj_schema.lookup(&k) { - params[&k] = parse_simple_value(&v, prop_schema)?; - } - } - verify_json_object(¶ms, obj_schema)?; - return Ok(params); + let obj_schema = &info.parameters; + + if is_json { + let mut params: Value = serde_json::from_str(utf8)?; + for (k, v) in uri_param { + if let Some((_optional, prop_schema)) = obj_schema.lookup(&k) { + params[&k] = parse_simple_value(&v, prop_schema)?; } + } + verify_json_object(¶ms, obj_schema)?; + return Ok(params); + } - let mut param_list: Vec<(String, String)> = vec![]; + let mut param_list: Vec<(String, String)> = vec![]; - if !utf8.is_empty() { - for (k, v) in form_urlencoded::parse(utf8.as_bytes()).into_owned() { - param_list.push((k, v)); - } - } + if !utf8.is_empty() { + for (k, v) in form_urlencoded::parse(utf8.as_bytes()).into_owned() { + param_list.push((k, v)); + } + } - if let Some(query_str) = parts.uri.query() { - for (k, v) in form_urlencoded::parse(query_str.as_bytes()).into_owned() { - if k == "_dc" { continue; } // skip extjs "disable cache" parameter - param_list.push((k, v)); - } - } + if let Some(query_str) = parts.uri.query() { + for (k, v) in form_urlencoded::parse(query_str.as_bytes()).into_owned() { + if k == "_dc" { continue; } // skip extjs "disable cache" parameter + param_list.push((k, v)); + } + } - for (k, v) in uri_param { - param_list.push((k.clone(), v.clone())); - } + for (k, v) in uri_param { + param_list.push((k.clone(), v.clone())); + } - let params = parse_parameter_strings(¶m_list, obj_schema, true)?; + let params = parse_parameter_strings(¶m_list, obj_schema, true)?; - Ok(params) - }.boxed()); - - Box::new(resp) + Ok(params) } struct NoLogExtension(); -fn proxy_protected_request( +async fn proxy_protected_request( info: &'static ApiMethod, mut parts: Parts, req_body: Body, -) -> ApiFuture { +) -> Result, Error> { let mut uri_parts = parts.uri.clone().into_parts(); @@ -243,96 +239,77 @@ fn proxy_protected_request( let request = Request::from_parts(parts, req_body); + let reload_timezone = info.reload_timezone; + let resp = hyper::client::Client::new() .request(request) .map_err(Error::from) .map_ok(|mut resp| { resp.extensions_mut().insert(NoLogExtension()); resp - }); + }) + .await?; + if reload_timezone { unsafe { tzset(); } } - let reload_timezone = info.reload_timezone; - Box::new(async move { - let result = resp.await; - if reload_timezone { - unsafe { - tzset(); - } - } - result - }) + Ok(resp) } -pub fn handle_sync_api_request( +pub async fn handle_sync_api_request( mut rpcenv: Env, info: &'static ApiMethod, formatter: &'static OutputFormatter, parts: Parts, req_body: Body, uri_param: HashMap, -) -> ApiFuture -{ +) -> Result, Error> { + let handler = match info.handler { - ApiHandler::Async(_) => { - panic!("fixme"); - } + ApiHandler::Async(_) => bail!("handle_sync_api_request: internal error (called with Async handler)"), ApiHandler::Sync(handler) => handler, }; - - let params = get_request_parameters_async(info, parts, req_body, uri_param); + + let params = get_request_parameters_async(info, parts, req_body, uri_param).await?; let delay_unauth_time = std::time::Instant::now() + std::time::Duration::from_millis(3000); - let resp = Pin::from(params) - .and_then(move |params| { - let mut delay = false; - - let resp = match (handler)(params, info, &mut rpcenv) { - Ok(data) => (formatter.format_data)(data, &rpcenv), - Err(err) => { - if let Some(httperr) = err.downcast_ref::() { - if httperr.code == StatusCode::UNAUTHORIZED { - delay = true; - } - } - (formatter.format_error)(err) + let mut delay = false; + + let resp = match (handler)(params, info, &mut rpcenv) { + Ok(data) => (formatter.format_data)(data, &rpcenv), + Err(err) => { + if let Some(httperr) = err.downcast_ref::() { + if httperr.code == StatusCode::UNAUTHORIZED { + delay = true; } - }; - - if info.reload_timezone { - unsafe { tzset() }; } + (formatter.format_error)(err) + } + }; - if delay { - Either::Left(delayed_response(resp, delay_unauth_time)) - } else { - Either::Right(future::ok(resp)) - } - }) - .or_else(move |err| { - future::ok((formatter.format_error)(err)) - }); + if info.reload_timezone { unsafe { tzset(); } } - Box::new(resp) + if delay { + tokio::timer::delay(delay_unauth_time).await; + } + + Ok(resp) } -pub fn handle_async_api_request( +pub async fn handle_async_api_request( rpcenv: Env, info: &'static ApiMethod, formatter: &'static OutputFormatter, parts: Parts, req_body: Body, uri_param: HashMap, -) -> ApiFuture -{ +) -> Result, Error> { + let handler = match info.handler { - ApiHandler::Sync(_) => { - panic!("fixme"); - } + ApiHandler::Sync(_) => bail!("handle_async_api_request: internal error (called with Sync handler)"), ApiHandler::Async(handler) => handler, }; - + // fixme: convert parameters to Json let mut param_list: Vec<(String, String)> = vec![]; @@ -350,18 +327,14 @@ pub fn handle_async_api_request( let params = match parse_parameter_strings(¶m_list, &info.parameters, true) { Ok(v) => v, Err(err) => { - let resp = (formatter.format_error)(Error::from(err)); - return Box::new(future::ok(resp)); + return Ok((formatter.format_error)(Error::from(err))); } }; - match (handler)(parts, req_body, params, info, Box::new(rpcenv)) { - Ok(future) => future, - Err(err) => { - let resp = (formatter.format_error)(err); - Box::new(future::ok(resp)) - } - } + + let resp = (handler)(parts, req_body, params, info, Box::new(rpcenv)).await?; + + Ok(resp) } fn get_index(username: Option, token: Option) -> Response { @@ -491,9 +464,9 @@ async fn chuncked_static_file_download(filename: PathBuf) -> Result ApiFuture { +async fn handle_static_file_download(filename: PathBuf) -> Result, Error> { - let response = tokio::fs::metadata(filename.clone()) + tokio::fs::metadata(filename.clone()) .map_err(|err| http_err!(BAD_REQUEST, format!("File access problems: {}", err))) .and_then(|metadata| async move { if metadata.len() < 1024*32 { @@ -501,9 +474,8 @@ fn handle_static_file_download(filename: PathBuf) -> ApiFuture { } else { chuncked_static_file_download(filename).await } - }); - - Box::new(response) + }) + .await } fn extract_auth_data(headers: &http::HeaderMap) -> (Option, Option) { @@ -548,24 +520,12 @@ fn check_auth(method: &hyper::Method, ticket: &Option, token: &Option, - delay_unauth_time: std::time::Instant, -) -> Result, Error> { - tokio::timer::delay(delay_unauth_time).await; - Ok(resp) -} - -pub fn handle_request(api: Arc, req: Request) -> ApiFuture { +pub async fn handle_request(api: Arc, req: Request) -> Result, Error> { let (parts, body) = req.into_parts(); let method = parts.method.clone(); - - let (path, components) = match tools::normalize_uri_path(parts.uri.path()) { - Ok((p,c)) => (p, c), - Err(err) => return Box::new(future::err(http_err!(BAD_REQUEST, err.to_string()))), - }; + let (path, components) = tools::normalize_uri_path(parts.uri.path())?; let comp_len = components.len(); @@ -580,13 +540,13 @@ pub fn handle_request(api: Arc, req: Request) -> ApiFuture { if comp_len >= 1 && components[0] == "api2" { if comp_len >= 2 { + let format = components[1]; + let formatter = match format { "json" => &JSON_FORMATTER, "extjs" => &EXTJS_FORMATTER, - _ => { - return Box::new(future::err(http_err!(BAD_REQUEST, format!("Unsupported output format '{}'.", format)))); - } + _ => bail!("Unsupported output format '{}'.", format), }; let mut uri_param = HashMap::new(); @@ -605,9 +565,8 @@ pub fn handle_request(api: Arc, req: Request) -> ApiFuture { Err(err) => { // always delay unauthorized calls by 3 seconds (from start of request) let err = http_err!(UNAUTHORIZED, format!("permission check failed - {}", err)); - return Box::new( - delayed_response((formatter.format_error)(err), delay_unauth_time) - ); + tokio::timer::delay(delay_unauth_time).await; + return Ok((formatter.format_error)(err)); } } } @@ -615,29 +574,29 @@ pub fn handle_request(api: Arc, req: Request) -> ApiFuture { match api.find_method(&components[2..], method, &mut uri_param) { None => { let err = http_err!(NOT_FOUND, "Path not found.".to_string()); - return Box::new(future::ok((formatter.format_error)(err))); + return Ok((formatter.format_error)(err)); } Some(api_method) => { if api_method.protected && env_type == RpcEnvironmentType::PUBLIC { - return proxy_protected_request(api_method, parts, body); + return proxy_protected_request(api_method, parts, body).await; } else { match api_method.handler { ApiHandler::Sync(_) => { - return handle_sync_api_request(rpcenv, api_method, formatter, parts, body, uri_param); + return handle_sync_api_request(rpcenv, api_method, formatter, parts, body, uri_param).await; } ApiHandler::Async(_) => { - return handle_async_api_request(rpcenv, api_method, formatter, parts, body, uri_param); + return handle_async_api_request(rpcenv, api_method, formatter, parts, body, uri_param).await; } } } } } } - } else { + } else { // not Auth required for accessing files! if method != hyper::Method::GET { - return Box::new(future::err(http_err!(BAD_REQUEST, "Unsupported method".to_string()))); + bail!("Unsupported HTTP method {}", method); } if comp_len == 0 { @@ -646,20 +605,21 @@ pub fn handle_request(api: Arc, req: Request) -> ApiFuture { match check_auth(&method, &ticket, &token) { Ok(username) => { let new_token = assemble_csrf_prevention_token(csrf_secret(), &username); - return Box::new(future::ok(get_index(Some(username), Some(new_token)))); + return Ok(get_index(Some(username), Some(new_token))); } _ => { - return Box::new(delayed_response(get_index(None, None), delay_unauth_time)); + tokio::timer::delay(delay_unauth_time).await; + return Ok(get_index(None, None)); } } } else { - return Box::new(future::ok(get_index(None, None))); + return Ok(get_index(None, None)); } } else { let filename = api.find_alias(&components); - return handle_static_file_download(filename); + return handle_static_file_download(filename).await; } } - Box::new(future::err(http_err!(NOT_FOUND, "Path not found.".to_string()))) + Err(http_err!(NOT_FOUND, "Path not found.".to_string())) }