diff --git a/Cargo.toml b/Cargo.toml index 42c4bbcc..ddfaeaee 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,15 +10,15 @@ path = "src/lib.rs" [dependencies] base64 = "0.10" -bytes = "0.4" +bytes = "0.5" chrono = "0.4" # Date and time library for Rust crc32fast = "1" endian_trait = { version = "0.6", features = ["arrays"] } failure = "0.1" -futures-preview = "0.3.0-alpha" -h2 = { version = "0.2.0-alpha.1", features = ["stream"] } -http = "0.1" -hyper = { version = "0.13.0-alpha.1" } +futures = "0.3" +h2 = { version = "0.2", features = ["stream"] } +http = "0.2" +hyper = "0.13" lazy_static = "1.3" libc = "0.2" log = "0.4" @@ -35,11 +35,10 @@ serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" siphasher = "0.3" syslog = "4.0" -tokio = { version = "0.2.0-alpha.4" } -tokio-executor = { version = "0.2.0-alpha.4" } -tokio-net = { version = "0.2.0-alpha.4", features = ["signal"] } -tokio-openssl = "0.4.0-alpha.2" -tower-service = "0.3.0-alpha.1" +tokio = { version = "0.2.0", features = [ "blocking", "fs", "io-util", "macros", "rt-threaded", "signal", "stream", "tcp", "time", "uds" ] } +tokio-util = { version = "0.2.0", features = [ "codec" ] } +tokio-openssl = "0.4.0" +tower-service = "0.3.0" url = "1.7" valgrind_request = { version = "1.1", optional = true } walkdir = "2" diff --git a/src/api2/admin/datastore.rs b/src/api2/admin/datastore.rs index 17943094..6bf6bea8 100644 --- a/src/api2/admin/datastore.rs +++ b/src/api2/admin/datastore.rs @@ -507,8 +507,8 @@ fn download_file( .map_err(|err| http_err!(BAD_REQUEST, format!("File open failed: {}", err))) .await?; - let payload = tokio::codec::FramedRead::new(file, tokio::codec::BytesCodec::new()) - .map_ok(|bytes| hyper::Chunk::from(bytes.freeze())); + let payload = tokio_util::codec::FramedRead::new(file, tokio_util::codec::BytesCodec::new()) + .map_ok(|bytes| hyper::body::Bytes::from(bytes.freeze())); let body = Body::wrap_stream(payload); // fixme: set other headers ? diff --git a/src/api2/reader.rs b/src/api2/reader.rs index 9f425b5c..42801526 100644 --- a/src/api2/reader.rs +++ b/src/api2/reader.rs @@ -192,8 +192,8 @@ fn download_file( env.log(format!("download {:?}", path3)); - let payload = tokio::codec::FramedRead::new(file, tokio::codec::BytesCodec::new()) - .map_ok(|bytes| hyper::Chunk::from(bytes.freeze())); + let payload = tokio_util::codec::FramedRead::new(file, tokio_util::codec::BytesCodec::new()) + .map_ok(|bytes| hyper::body::Bytes::from(bytes.freeze())); let body = Body::wrap_stream(payload); @@ -275,8 +275,8 @@ fn download_chunk_old( .map_err(move |err| http_err!(BAD_REQUEST, format!("open file {:?} failed: {}", path2, err))) .and_then(move |file| { env2.debug(format!("download chunk {:?}", path3)); - let payload = tokio::codec::FramedRead::new(file, tokio::codec::BytesCodec::new()) - .map_ok(|bytes| hyper::Chunk::from(bytes.freeze())); + let payload = tokio_util::codec::FramedRead::new(file, tokio_util::codec::BytesCodec::new()) + .map_ok(|bytes| hyper::body::Bytes::from(bytes.freeze())); let body = Body::wrap_stream(payload); diff --git a/src/backup/chunk_stream.rs b/src/backup/chunk_stream.rs index 4d0ca6b6..2d2a95c7 100644 --- a/src/backup/chunk_stream.rs +++ b/src/backup/chunk_stream.rs @@ -60,7 +60,7 @@ where None => { this.scan_pos = 0; if this.buffer.len() > 0 { - return Poll::Ready(Some(Ok(this.buffer.take()))); + return Poll::Ready(Some(Ok(this.buffer.split()))); } else { return Poll::Ready(None); } @@ -99,7 +99,7 @@ where let this = self.get_mut(); loop { if this.buffer.len() == this.chunk_size { - return Poll::Ready(Some(Ok(this.buffer.take()))); + return Poll::Ready(Some(Ok(this.buffer.split()))); } else if this.buffer.len() > this.chunk_size { let result = this.buffer.split_to(this.chunk_size); return Poll::Ready(Some(Ok(result))); @@ -112,7 +112,7 @@ where None => { // last chunk can have any size if this.buffer.len() > 0 { - return Poll::Ready(Some(Ok(this.buffer.take()))); + return Poll::Ready(Some(Ok(this.buffer.split()))); } else { return Poll::Ready(None); } diff --git a/src/bin/h2client.rs b/src/bin/h2client.rs index 6abb014b..542ecff0 100644 --- a/src/bin/h2client.rs +++ b/src/bin/h2client.rs @@ -35,7 +35,7 @@ impl Future for Process { } else { match futures::ready!(Pin::new(&mut this.body).poll_next(cx)) { Some(Ok(chunk)) => { - this.body.release_capacity().release_capacity(chunk.len())?; + this.body.flow_control().release_capacity(chunk.len())?; this.bytes += chunk.len(); // println!("GOT FRAME {}", chunk.len()); }, diff --git a/src/bin/h2s-client.rs b/src/bin/h2s-client.rs index 70bb088e..df9a95b8 100644 --- a/src/bin/h2s-client.rs +++ b/src/bin/h2s-client.rs @@ -34,7 +34,7 @@ impl Future for Process { } else { match futures::ready!(Pin::new(&mut this.body).poll_next(cx)) { Some(Ok(chunk)) => { - this.body.release_capacity().release_capacity(chunk.len())?; + this.body.flow_control().release_capacity(chunk.len())?; this.bytes += chunk.len(); // println!("GOT FRAME {}", chunk.len()); }, diff --git a/src/bin/h2s-server.rs b/src/bin/h2s-server.rs index b8c7926a..39483af2 100644 --- a/src/bin/h2s-server.rs +++ b/src/bin/h2s-server.rs @@ -24,12 +24,12 @@ async fn main() -> Result<(), Error> { let acceptor = Arc::new(acceptor.build()); - let listener = TcpListener::bind(std::net::SocketAddr::from(([127,0,0,1], 8008))).await?; + let mut listener = TcpListener::bind(std::net::SocketAddr::from(([127,0,0,1], 8008))).await?; println!("listening on {:?}", listener.local_addr()); - let mut incoming = listener.incoming(); - while let Some(socket) = incoming.try_next().await? { + loop { + let (socket, _addr) = listener.accept().await?; tokio::spawn(handle_connection(socket, Arc::clone(&acceptor)) .map(|res| { if let Err(err) = res { @@ -37,8 +37,6 @@ async fn main() -> Result<(), Error> { } })); } - - Ok(()) } async fn handle_connection( diff --git a/src/bin/h2server.rs b/src/bin/h2server.rs index 8477ec71..3d602134 100644 --- a/src/bin/h2server.rs +++ b/src/bin/h2server.rs @@ -10,12 +10,12 @@ use proxmox_backup::client::pipe_to_stream::PipeToSendStream; #[tokio::main] async fn main() -> Result<(), Error> { - let listener = TcpListener::bind(std::net::SocketAddr::from(([127,0,0,1], 8008))).await?; + let mut listener = TcpListener::bind(std::net::SocketAddr::from(([127,0,0,1], 8008))).await?; println!("listening on {:?}", listener.local_addr()); - let mut incoming = listener.incoming(); - while let Some(socket) = incoming.try_next().await? { + loop { + let (socket, _addr) = listener.accept().await?; tokio::spawn(handle_connection(socket) .map(|res| { if let Err(err) = res { @@ -23,8 +23,6 @@ async fn main() -> Result<(), Error> { } })); } - - Ok(()) } async fn handle_connection(socket: T) -> Result<(), Error> { diff --git a/src/bin/proxmox-backup-api.rs b/src/bin/proxmox-backup-api.rs index 64ca7c42..9062304a 100644 --- a/src/bin/proxmox-backup-api.rs +++ b/src/bin/proxmox-backup-api.rs @@ -52,8 +52,9 @@ async fn run() -> Result<(), Error> { let server = daemon::create_daemon( ([127,0,0,1], 82).into(), move |listener, ready| { + let incoming = proxmox_backup::tools::async_io::StaticIncoming::from(listener); Ok(ready - .and_then(|_| hyper::Server::builder(listener.incoming()) + .and_then(|_| hyper::Server::builder(incoming) .serve(rest_server) .with_graceful_shutdown(server::shutdown_future()) .map_err(Error::from) diff --git a/src/bin/proxmox-backup-client.rs b/src/bin/proxmox-backup-client.rs index 0831c4d6..4908de8b 100644 --- a/src/bin/proxmox-backup-client.rs +++ b/src/bin/proxmox-backup-client.rs @@ -186,7 +186,9 @@ async fn backup_directory>( // spawn chunker inside a separate task so that it can run parallel tokio::spawn(async move { - let _ = tx.send_all(&mut chunk_stream).await; + while let Some(v) = chunk_stream.next().await { + let _ = tx.send(v).await; + } }); let stats = client @@ -210,7 +212,7 @@ async fn backup_image>( let file = tokio::fs::File::open(path).await?; - let stream = tokio::codec::FramedRead::new(file, tokio::codec::BytesCodec::new()) + let stream = tokio_util::codec::FramedRead::new(file, tokio_util::codec::BytesCodec::new()) .map_err(Error::from); let stream = FixedChunkStream::new(stream, chunk_size.unwrap_or(4*1024*1024)); @@ -2443,8 +2445,9 @@ We do not extraxt '.pxar' archives when writing to stdandard output. } fn async_main(fut: F) -> ::Output { - let rt = tokio::runtime::Runtime::new().unwrap(); + let mut rt = tokio::runtime::Runtime::new().unwrap(); let ret = rt.block_on(fut); - rt.shutdown_now(); + // This does not exist anymore. We need to actually stop our runaways instead... + // rt.shutdown_now(); ret } diff --git a/src/bin/proxmox-backup-proxy.rs b/src/bin/proxmox-backup-proxy.rs index b6e23883..d4513cbc 100644 --- a/src/bin/proxmox-backup-proxy.rs +++ b/src/bin/proxmox-backup-proxy.rs @@ -66,10 +66,9 @@ async fn run() -> Result<(), Error> { let server = daemon::create_daemon( ([0,0,0,0,0,0,0,0], 8007).into(), |listener, ready| { - let connections = listener - .incoming() + let connections = proxmox_backup::tools::async_io::StaticIncoming::from(listener) .map_err(Error::from) - .try_filter_map(move |sock| { + .try_filter_map(move |(sock, _addr)| { let acceptor = Arc::clone(&acceptor); async move { sock.set_nodelay(true).unwrap(); @@ -81,6 +80,7 @@ async fn run() -> Result<(), Error> { ) } }); + let connections = proxmox_backup::tools::async_io::HyperAccept(connections); Ok(ready .and_then(|_| hyper::Server::builder(connections) diff --git a/src/bin/test_chunk_speed2.rs b/src/bin/test_chunk_speed2.rs index 3fe7024a..27f7a157 100644 --- a/src/bin/test_chunk_speed2.rs +++ b/src/bin/test_chunk_speed2.rs @@ -23,7 +23,7 @@ async fn run() -> Result<(), Error> { let file = tokio::fs::File::open("random-test.dat").await?; - let stream = tokio::codec::FramedRead::new(file, tokio::codec::BytesCodec::new()) + let stream = tokio_util::codec::FramedRead::new(file, tokio_util::codec::BytesCodec::new()) .map_ok(|bytes| bytes.to_vec()) .map_err(Error::from); diff --git a/src/client/backup_writer.rs b/src/client/backup_writer.rs index ec4d19e6..381cb31e 100644 --- a/src/client/backup_writer.rs +++ b/src/client/backup_writer.rs @@ -267,7 +267,21 @@ impl BackupWriter { let (verify_queue_tx, verify_queue_rx) = mpsc::channel(100); let (verify_result_tx, verify_result_rx) = oneshot::channel(); - hyper::rt::spawn( + // FIXME: check if this works as expected as replacement for the combinator below? + // tokio::spawn(async move { + // let result: Result<(), Error> = (async move { + // while let Some(response) = verify_queue_rx.recv().await { + // match H2Client::h2api_response(response.await?).await { + // Ok(result) => println!("RESPONSE: {:?}", result), + // Err(err) => bail!("pipelined request failed: {}", err), + // } + // } + // Ok(()) + // }).await; + // let _ignore_closed_channel = verify_result_tx.send(result); + // }); + // old code for reference? + tokio::spawn( verify_queue_rx .map(Ok::<_, Error>) .try_for_each(|response: h2::client::ResponseFuture| { @@ -294,7 +308,8 @@ impl BackupWriter { let h2_2 = h2.clone(); - hyper::rt::spawn( + // FIXME: async-block-ify this code! + tokio::spawn( verify_queue_rx .map(Ok::<_, Error>) .and_then(move |(merged_chunk_info, response): (MergedChunkInfo, Option)| { @@ -329,7 +344,7 @@ impl BackupWriter { println!("append chunks list len ({})", digest_list.len()); let param = json!({ "wid": wid, "digest-list": digest_list, "offset-list": offset_list }); let request = H2Client::request_builder("localhost", "PUT", &path, None, Some("application/json")).unwrap(); - let param_data = bytes::Bytes::from(param.to_string().as_bytes()); + let param_data = bytes::Bytes::from(param.to_string().into_bytes()); let upload_data = Some(param_data); h2_2.send_request(request, upload_data) .and_then(move |response| { @@ -373,12 +388,12 @@ impl BackupWriter { } let mut body = resp.into_body(); - let mut release_capacity = body.release_capacity().clone(); + let mut flow_control = body.flow_control().clone(); let mut stream = DigestListDecoder::new(body.map_err(Error::from)); while let Some(chunk) = stream.try_next().await? { - let _ = release_capacity.release_capacity(chunk.len()); + let _ = flow_control.release_capacity(chunk.len()); println!("GOT DOWNLOAD {}", digest_to_hex(&chunk)); known_chunks.lock().unwrap().insert(chunk); } @@ -466,7 +481,7 @@ impl BackupWriter { println!("upload new chunk {} ({} bytes, offset {})", digest_str, chunk_info.chunk_len, offset); - let chunk_data = chunk_info.chunk.raw_data(); + let chunk_data = chunk_info.chunk.into_inner(); let param = json!({ "wid": wid, "digest": digest_str, @@ -487,7 +502,7 @@ impl BackupWriter { upload_queue .send((new_info, Some(response))) .await - .map_err(Error::from) + .map_err(|err| format_err!("failed to send to upload queue: {}", err)) }) ) } else { @@ -496,7 +511,7 @@ impl BackupWriter { upload_queue .send((merged_chunk_info, None)) .await - .map_err(Error::from) + .map_err(|err| format_err!("failed to send to upload queue: {}", err)) }) } }) diff --git a/src/client/http_client.rs b/src/client/http_client.rs index 2399e3eb..15a13e5c 100644 --- a/src/client/http_client.rs +++ b/src/client/http_client.rs @@ -1,4 +1,5 @@ use std::io::Write; +use std::task::{Context, Poll}; use chrono::Utc; use failure::*; @@ -329,7 +330,7 @@ impl HttpClient { let connection = connection.map(|_| ()); // Spawn a new task to drive the connection state - hyper::rt::spawn(connection); + tokio::spawn(connection); // Wait until the `SendRequest` handle has available capacity. let c = h2.ready().await?; @@ -358,10 +359,7 @@ impl HttpClient { async fn api_response(response: Response) -> Result { let status = response.status(); - let data = response - .into_body() - .try_concat() - .await?; + let data = hyper::body::to_bytes(response.into_body()).await?; let text = String::from_utf8(data.to_vec()).unwrap(); if status.is_success() { @@ -487,10 +485,9 @@ impl H2Client { } let mut body = resp.into_body(); - let mut release_capacity = body.release_capacity().clone(); - - while let Some(chunk) = body.try_next().await? { - let _ = release_capacity.release_capacity(chunk.len()); + while let Some(chunk) = body.data().await { + let chunk = chunk?; + body.flow_control().release_capacity(chunk.len())?; output.write_all(&chunk)?; } @@ -561,18 +558,14 @@ impl H2Client { let (_head, mut body) = response.into_parts(); - // The `release_capacity` handle allows the caller to manage - // flow control. - // - // Whenever data is received, the caller is responsible for - // releasing capacity back to the server once it has freed - // the data from memory. - let mut release_capacity = body.release_capacity().clone(); - let mut data = Vec::new(); - while let Some(chunk) = body.try_next().await? { + while let Some(chunk) = body.data().await { + let chunk = chunk?; + // Whenever data is received, the caller is responsible for + // releasing capacity back to the server once it has freed + // the data from memory. // Let the server send more data. - let _ = release_capacity.release_capacity(chunk.len()); + body.flow_control().release_capacity(chunk.len())?; data.extend(chunk); } @@ -632,9 +625,10 @@ impl H2Client { } } +#[derive(Clone)] pub struct HttpsConnector { http: HttpConnector, - ssl_connector: SslConnector, + ssl_connector: std::sync::Arc, } impl HttpsConnector { @@ -643,7 +637,7 @@ impl HttpsConnector { Self { http, - ssl_connector, + ssl_connector: std::sync::Arc::new(ssl_connector), } } } @@ -653,29 +647,38 @@ type MaybeTlsStream = EitherStream< tokio_openssl::SslStream, >; -impl hyper::client::connect::Connect for HttpsConnector { - type Transport = MaybeTlsStream; +impl hyper::service::Service for HttpsConnector { + type Response = MaybeTlsStream; type Error = Error; - type Future = Box> + Send + Unpin + 'static>; + type Future = std::pin::Pin> + Send + 'static + >>; - fn connect(&self, dst: hyper::client::connect::Destination) -> Self::Future { - let is_https = dst.scheme() == "https"; - let host = dst.host().to_string(); + fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { + // This connector is always ready, but others might not be. + Poll::Ready(Ok(())) + } - let config = self.ssl_connector.configure(); - let conn = self.http.connect(dst); + fn call(&mut self, dst: Uri) -> Self::Future { + let mut this = self.clone(); + async move { + let is_https = dst + .scheme() + .ok_or_else(|| format_err!("missing URL scheme"))? + == "https"; + let host = dst + .host() + .ok_or_else(|| format_err!("missing hostname in destination url?"))? + .to_string(); - Box::new(Box::pin(async move { - let (conn, connected) = conn.await?; + let config = this.ssl_connector.configure(); + let conn = this.http.call(dst).await?; if is_https { let conn = tokio_openssl::connect(config?, &host, conn).await?; - Ok((MaybeTlsStream::Right(conn), connected)) + Ok(MaybeTlsStream::Right(conn)) } else { - Ok((MaybeTlsStream::Left(conn), connected)) + Ok(MaybeTlsStream::Left(conn)) } - })) + }.boxed() } } diff --git a/src/server/command_socket.rs b/src/server/command_socket.rs index 0dd3bc0e..6baefe19 100644 --- a/src/server/command_socket.rs +++ b/src/server/command_socket.rs @@ -2,7 +2,7 @@ use failure::*; use futures::*; -use tokio::net::unix::UnixListener; +use tokio::net::UnixListener; use std::path::PathBuf; use serde_json::Value; @@ -11,23 +11,25 @@ use std::os::unix::io::AsRawFd; use nix::sys::socket; /// Listens on a Unix Socket to handle simple command asynchronously -pub fn create_control_socket(path: P, f: F) -> Result, Error> +pub fn create_control_socket(path: P, func: F) -> Result, Error> where P: Into, F: Fn(Value) -> Result + Send + Sync + 'static, { let path: PathBuf = path.into(); - let socket = UnixListener::bind(&path)?; + let mut socket = UnixListener::bind(&path)?; - let f = Arc::new(f); - let path2 = Arc::new(path); - let path3 = path2.clone(); + let func = Arc::new(func); - let control_future = socket.incoming() - .map_err(Error::from) - .and_then(|conn| { - use futures::future::{err, ok}; + let control_future = async move { + loop { + let (conn, _addr) = socket + .accept() + .await + .map_err(|err| { + format_err!("failed to accept on control socket {:?}: {}", path, err) + })?; // check permissions (same gid, or root user) let opt = socket::sockopt::PeerCredentials {}; @@ -35,28 +37,19 @@ where Ok(cred) => { let mygid = unsafe { libc::getgid() }; if !(cred.uid() == 0 || cred.gid() == mygid) { - return err(format_err!("no permissions for {:?}", cred)); + bail!("no permissions for {:?}", cred); } } - Err(e) => { - return err(format_err!( - "no permissions - unable to read peer credential - {}", - e, - )); - } + Err(e) => bail!("no permissions - unable to read peer credential - {}", e), } - ok(conn) - }) - .map_err(move |err| { eprintln!("failed to accept on control socket {:?}: {}", path2, err); }) - .try_for_each(move |conn| { - let f = Arc::clone(&f); - let (rx, mut tx) = conn.split(); - let path = path3.clone(); + let (rx, mut tx) = tokio::io::split(conn); let abort_future = super::last_worker_future().map(|_| ()); use tokio::io::{AsyncBufReadExt, AsyncWriteExt}; + let func = Arc::clone(&func); + let path = path.clone(); tokio::spawn(futures::future::select( async move { let mut rx = tokio::io::BufReader::new(rx); @@ -73,7 +66,7 @@ where } let response = match line.parse::() { - Ok(param) => match f(param) { + Ok(param) => match func(param) { Ok(res) => format!("OK: {}\n", res), Err(err) => format!("ERROR: {}\n", err), } @@ -88,14 +81,14 @@ where }.boxed(), abort_future, ).map(|_| ())); - futures::future::ok(()) - }); + } + }.boxed(); let abort_future = super::last_worker_future().map_err(|_| {}); let task = futures::future::select( control_future, abort_future, - ).map(|_| ()); + ).map(|_: futures::future::Either<(Result<(), Error>, _), _>| ()); Ok(task) } @@ -112,9 +105,7 @@ pub fn send_command

( tokio::net::UnixStream::connect(path) .map_err(move |err| format_err!("control socket connect failed - {}", err)) - .and_then(move |conn| { - - let (rx, mut tx) = conn.split(); + .and_then(move |mut conn| { let mut command_string = params.to_string(); command_string.push('\n'); @@ -122,9 +113,9 @@ pub fn send_command

( async move { use tokio::io::{AsyncBufReadExt, AsyncWriteExt}; - tx.write_all(command_string.as_bytes()).await?; - tx.shutdown().await?; - let mut rx = tokio::io::BufReader::new(rx); + conn.write_all(command_string.as_bytes()).await?; + AsyncWriteExt::shutdown(&mut conn).await?; + let mut rx = tokio::io::BufReader::new(conn); let mut data = String::new(); if rx.read_line(&mut data).await? == 0 { bail!("no response"); diff --git a/src/server/rest.rs b/src/server/rest.rs index e20fa798..119883f1 100644 --- a/src/server/rest.rs +++ b/src/server/rest.rs @@ -1,4 +1,5 @@ use std::collections::HashMap; +use std::future::Future; use std::hash::BuildHasher; use std::path::{Path, PathBuf}; use std::pin::Pin; @@ -10,10 +11,10 @@ use futures::future::{self, FutureExt, TryFutureExt}; use futures::stream::TryStreamExt; use hyper::header; use hyper::http::request::Parts; -use hyper::rt::Future; use hyper::{Body, Request, Response, StatusCode}; use serde_json::{json, Value}; use tokio::fs::File; +use tokio::time::Instant; use url::form_urlencoded; use proxmox::api::http_err; @@ -291,7 +292,7 @@ pub async fn handle_api_request { if let Some(httperr) = err.downcast_ref::() { if httperr.code == StatusCode::UNAUTHORIZED { - tokio::timer::delay(delay_unauth_time).await; + tokio::time::delay_until(Instant::from_std(delay_unauth_time)).await; } } (formatter.format_error)(err) @@ -417,8 +418,8 @@ async fn chuncked_static_file_download(filename: PathBuf) -> Result, req: Request) -> Result { // always delay unauthorized calls by 3 seconds (from start of request) let err = http_err!(UNAUTHORIZED, format!("permission check failed - {}", err)); - tokio::timer::delay(delay_unauth_time).await; + tokio::time::delay_until(Instant::from_std(delay_unauth_time)).await; return Ok((formatter.format_error)(err)); } } @@ -567,7 +568,7 @@ pub async fn handle_request(api: Arc, req: Request) -> Result { - tokio::timer::delay(delay_unauth_time).await; + tokio::time::delay_until(Instant::from_std(delay_unauth_time)).await; return Ok(get_index(None, None)); } } diff --git a/src/server/state.rs b/src/server/state.rs index dd1c9fda..251931e3 100644 --- a/src/server/state.rs +++ b/src/server/state.rs @@ -4,7 +4,7 @@ use std::sync::Mutex; use futures::*; -use tokio_net::signal::unix::{signal, SignalKind}; +use tokio::signal::unix::{signal, SignalKind}; use crate::tools::{self, BroadcastData}; @@ -34,28 +34,30 @@ lazy_static! { pub fn server_state_init() -> Result<(), Error> { - let stream = signal(SignalKind::interrupt())?; + let mut stream = signal(SignalKind::interrupt())?; - let future = stream.for_each(|_| { - println!("got shutdown request (SIGINT)"); - SERVER_STATE.lock().unwrap().reload_request = false; - tools::request_shutdown(); - futures::future::ready(()) - }); + let future = async move { + while stream.recv().await.is_some() { + println!("got shutdown request (SIGINT)"); + SERVER_STATE.lock().unwrap().reload_request = false; + tools::request_shutdown(); + } + }.boxed(); let abort_future = last_worker_future().map_err(|_| {}); let task = futures::future::select(future, abort_future); tokio::spawn(task.map(|_| ())); - let stream = signal(SignalKind::hangup())?; + let mut stream = signal(SignalKind::hangup())?; - let future = stream.for_each(|_| { - println!("got reload request (SIGHUP)"); - SERVER_STATE.lock().unwrap().reload_request = true; - tools::request_shutdown(); - futures::future::ready(()) - }); + let future = async move { + while stream.recv().await.is_some() { + println!("got reload request (SIGHUP)"); + SERVER_STATE.lock().unwrap().reload_request = true; + tools::request_shutdown(); + } + }.boxed(); let abort_future = last_worker_future().map_err(|_| {}); let task = futures::future::select(future, abort_future); diff --git a/src/tools.rs b/src/tools.rs index 38d822fa..90cc8f1b 100644 --- a/src/tools.rs +++ b/src/tools.rs @@ -19,7 +19,6 @@ use proxmox::tools::vec; pub mod acl; pub mod async_io; -pub mod async_mutex; pub mod borrow; pub mod daemon; pub mod fs; diff --git a/src/tools/async_io.rs b/src/tools/async_io.rs index 2ce01a68..c3b9d935 100644 --- a/src/tools/async_io.rs +++ b/src/tools/async_io.rs @@ -1,10 +1,15 @@ //! Generic AsyncRead/AsyncWrite utilities. use std::io; +use std::mem::MaybeUninit; +use std::os::unix::io::{AsRawFd, RawFd}; use std::pin::Pin; use std::task::{Context, Poll}; +use futures::stream::{Stream, TryStream}; use tokio::io::{AsyncRead, AsyncWrite}; +use tokio::net::TcpListener; +use hyper::client::connect::Connection; pub enum EitherStream { Left(L), @@ -27,7 +32,7 @@ impl AsyncRead for EitherStream { } } - unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { + unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [MaybeUninit]) -> bool { match *self { EitherStream::Left(ref s) => s.prepare_uninitialized_buffer(buf), EitherStream::Right(ref s) => s.prepare_uninitialized_buffer(buf), @@ -109,3 +114,83 @@ impl AsyncWrite for EitherStream { } } } + +// we need this for crate::client::http_client: +impl Connection for EitherStream< + tokio::net::TcpStream, + tokio_openssl::SslStream, +> { + fn connected(&self) -> hyper::client::connect::Connected { + match self { + EitherStream::Left(s) => s.connected(), + EitherStream::Right(s) => s.get_ref().connected(), + } + } +} + +/// Tokio's `Incoming` now is a reference type and hyper's `AddrIncoming` misses some standard +/// stuff like `AsRawFd`, so here's something implementing hyper's `Accept` from a `TcpListener` +pub struct StaticIncoming(TcpListener); + +impl From for StaticIncoming { + fn from(inner: TcpListener) -> Self { + Self(inner) + } +} + +impl AsRawFd for StaticIncoming { + fn as_raw_fd(&self) -> RawFd { + self.0.as_raw_fd() + } +} + +impl hyper::server::accept::Accept for StaticIncoming { + type Conn = tokio::net::TcpStream; + type Error = std::io::Error; + + fn poll_accept( + self: Pin<&mut Self>, + cx: &mut Context, + ) -> Poll>> { + match self.get_mut().0.poll_accept(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(Ok((conn, _addr))) => Poll::Ready(Some(Ok(conn))), + Poll::Ready(Err(err)) => Poll::Ready(Some(Err(err))), + } + } +} + +/// We also implement TryStream for this, as tokio doesn't do this anymore either and we want to be +/// able to map connections to then add eg. ssl to them. This support code makes the changes +/// required for hyper 0.13 a bit less annoying to read. +impl Stream for StaticIncoming { + type Item = std::io::Result<(tokio::net::TcpStream, std::net::SocketAddr)>; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + match self.get_mut().0.poll_accept(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(result) => Poll::Ready(Some(result)), + } + } +} + +/// Implement hyper's `Accept` for any `TryStream` of sockets: +pub struct HyperAccept(pub T); + + +impl hyper::server::accept::Accept for HyperAccept +where + T: TryStream, + I: AsyncRead + AsyncWrite, +{ + type Conn = I; + type Error = T::Error; + + fn poll_accept( + self: Pin<&mut Self>, + cx: &mut Context, + ) -> Poll>> { + let this = unsafe { self.map_unchecked_mut(|this| &mut this.0) }; + this.try_poll_next(cx) + } +} diff --git a/src/tools/daemon.rs b/src/tools/daemon.rs index febca8bd..0eaf70e8 100644 --- a/src/tools/daemon.rs +++ b/src/tools/daemon.rs @@ -193,7 +193,6 @@ impl Reloadable for tokio::net::TcpListener { fd_change_cloexec(fd, true)?; Ok(Self::from_std( unsafe { std::net::TcpListener::from_raw_fd(fd) }, - &tokio_net::driver::Handle::default(), )?) } } diff --git a/src/tools/futures.rs b/src/tools/futures.rs index 19cd2a16..30d7b7cf 100644 --- a/src/tools/futures.rs +++ b/src/tools/futures.rs @@ -7,8 +7,7 @@ use std::task::{Context, Poll}; use failure::Error; use futures::future::FutureExt; - -use crate::tools::async_mutex::{AsyncLockGuard, AsyncMutex, LockFuture}; +use tokio::sync::oneshot; /// Make a future cancellable. /// @@ -42,11 +41,11 @@ use crate::tools::async_mutex::{AsyncLockGuard, AsyncMutex, LockFuture}; pub struct Cancellable { /// Our core: we're waiting on a future, on on a lock. The cancel method just unlocks the /// lock, so that our LockFuture finishes. - inner: futures::future::Select>, + inner: futures::future::Select>, /// When this future is created, this holds a guard. When a `Canceller` wants to cancel the /// future, it'll drop this guard, causing our inner future to resolve to `None`. - guard: Arc>>>, + sender: Arc>>>, } /// Reference to a cancellable future. Multiple instances may exist simultaneously. @@ -55,14 +54,14 @@ pub struct Cancellable { /// /// This can be cloned to be used in multiple places. #[derive(Clone)] -pub struct Canceller(Arc>>>); +pub struct Canceller(Arc>>>); impl Canceller { /// Cancel the associated future. /// /// This does nothing if the future already finished successfully. pub fn cancel(&self) { - *self.0.lock().unwrap() = None; + let _ = self.0.lock().unwrap().take().unwrap().send(()); } } @@ -71,19 +70,20 @@ impl Cancellable { /// /// Returns a future and a `Canceller` which can be cloned and used later to cancel the future. pub fn new(inner: T) -> Result<(Self, Canceller), Error> { - // we don't even need to sture the mutex... - let (mutex, guard) = AsyncMutex::new_locked(())?; + // we don't even need to store the mutex... + let (tx, rx) = oneshot::channel(); let this = Self { - inner: futures::future::select(inner, mutex.lock()), - guard: Arc::new(Mutex::new(Some(guard))), + inner: futures::future::select(inner, rx), + sender: Arc::new(Mutex::new(Some(tx))), }; + let canceller = this.canceller(); Ok((this, canceller)) } /// Create another `Canceller` for this future. pub fn canceller(&self) -> Canceller { - Canceller(self.guard.clone()) + Canceller(Arc::clone(&self.sender)) } } diff --git a/src/tools/runtime.rs b/src/tools/runtime.rs index 6d944501..f5e2ca92 100644 --- a/src/tools/runtime.rs +++ b/src/tools/runtime.rs @@ -7,7 +7,7 @@ where F: Future + Send + 'static, T: std::fmt::Debug + Send + 'static, { - let rt = tokio::runtime::Runtime::new().unwrap(); + let mut rt = tokio::runtime::Runtime::new().unwrap(); rt.block_on(async { let (tx, rx) = tokio::sync::oneshot::channel(); diff --git a/src/tools/wrapped_reader_stream.rs b/src/tools/wrapped_reader_stream.rs index 2f8bf682..422b17cd 100644 --- a/src/tools/wrapped_reader_stream.rs +++ b/src/tools/wrapped_reader_stream.rs @@ -2,7 +2,7 @@ use std::io::{self, Read}; use std::pin::Pin; use std::task::{Context, Poll}; -use tokio_executor::threadpool::blocking; +use tokio::task::block_in_place; use futures::stream::Stream; pub struct WrappedReaderStream { @@ -24,8 +24,8 @@ impl Stream for WrappedReaderStream { fn poll_next(self: Pin<&mut Self>, _cx: &mut Context) -> Poll> { let this = self.get_mut(); - match blocking(|| this.reader.read(&mut this.buffer)) { - Poll::Ready(Ok(Ok(n))) => { + match block_in_place(|| this.reader.read(&mut this.buffer)) { + Ok(n) => { if n == 0 { // EOF Poll::Ready(None) @@ -33,12 +33,7 @@ impl Stream for WrappedReaderStream { Poll::Ready(Some(Ok(this.buffer[..n].to_vec()))) } } - Poll::Ready(Ok(Err(err))) => Poll::Ready(Some(Err(err))), - Poll::Ready(Err(err)) => Poll::Ready(Some(Err(io::Error::new( - io::ErrorKind::Other, - err.to_string(), - )))), - Poll::Pending => Poll::Pending, + Err(err) => Poll::Ready(Some(Err(err))), } } }