diff --git a/Cargo.toml b/Cargo.toml index b718fe83..cdc352f2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,6 +25,7 @@ tokio-tls = "0.2" tokio-signal = "0.2" native-tls = "0.2" http = "0.1" +h2 = "0.1" hyper = "0.12" hyper-tls = "0.3" lazy_static = "1.1" diff --git a/src/api2/admin/datastore.rs b/src/api2/admin/datastore.rs index d7f067aa..71e6df1b 100644 --- a/src/api2/admin/datastore.rs +++ b/src/api2/admin/datastore.rs @@ -20,6 +20,7 @@ use crate::server::WorkerTask; mod pxar; mod upload; +mod h2upload; fn group_backups(backup_list: Vec) -> HashMap> { @@ -403,6 +404,10 @@ pub fn router() -> Router { "test-upload", Router::new() .upgrade(upload::api_method_upgrade_upload())) + .subdir( + "h2upload", + Router::new() + .upgrade(h2upload::api_method_upgrade_h2upload())) .subdir( "gc", Router::new() diff --git a/src/api2/admin/datastore/h2upload.rs b/src/api2/admin/datastore/h2upload.rs new file mode 100644 index 00000000..dc7d9511 --- /dev/null +++ b/src/api2/admin/datastore/h2upload.rs @@ -0,0 +1,80 @@ +use failure::*; + +use futures::{Future, Stream}; +use h2::server; +use hyper::header::{HeaderValue, UPGRADE}; +use hyper::{Body, Response, StatusCode}; +use hyper::http::request::Parts; +use hyper::rt; + +use serde_json::Value; + +use crate::api_schema::router::*; +use crate::api_schema::*; + +pub fn api_method_upgrade_h2upload() -> ApiAsyncMethod { + ApiAsyncMethod::new( + upgrade_h2upload, + ObjectSchema::new("Experimental h2 server") + .required("store", StringSchema::new("Datastore name.")), + ) +} + +fn upgrade_h2upload( + parts: Parts, + req_body: Body, + param: Value, + _info: &ApiAsyncMethod, + _rpcenv: &mut RpcEnvironment, +) -> Result { + let expected_protocol: &'static str = "proxmox-backup-protocol-h2"; + + let protocols = parts + .headers + .get("UPGRADE") + .ok_or_else(|| format_err!("missing Upgrade header"))? + .to_str()?; + + if protocols != expected_protocol { + bail!("invalid protocol name"); + } + + rt::spawn( + req_body + .on_upgrade() + .map_err(|e| Error::from(e)) + .and_then(move |conn| { + println!("upgrade done"); + server::handshake(conn) + .and_then(|h2| { + // Accept all inbound HTTP/2.0 streams sent over the + // connection. + h2.for_each(|(request, mut respond)| { + println!("Received request: {:?}", request); + + // Build a response with no body + let response = Response::builder() + .status(StatusCode::OK) + .body(()) + .unwrap(); + + // Send the response back to the client + respond.send_response(response, true) + .unwrap(); + + Ok(()) + }) + }) + .map_err(Error::from) + }) + .map_err(|e| eprintln!("error during upgrade: {}", e)) + ); + + Ok(Box::new(futures::future::ok( + Response::builder() + .status(StatusCode::SWITCHING_PROTOCOLS) + .header(UPGRADE, HeaderValue::from_static(expected_protocol)) + .body(Body::empty()) + .unwrap() + ))) +}