diff --git a/src/server/command_socket.rs b/src/server/command_socket.rs index b230211c..0dd3bc0e 100644 --- a/src/server/command_socket.rs +++ b/src/server/command_socket.rs @@ -1,12 +1,8 @@ use failure::*; use futures::*; -use futures::stream::Stream; use tokio::net::unix::UnixListener; -use tokio::io::AsyncRead; - -use std::io::Write; use std::path::PathBuf; use serde_json::Value; @@ -14,12 +10,11 @@ use std::sync::Arc; use std::os::unix::io::AsRawFd; use nix::sys::socket; -use proxmox::tools::try_block; - /// Listens on a Unix Socket to handle simple command asynchronously -pub fn create_control_socket(path: P, f: F) -> Result, Error> - where P: Into, - F: Send + Sync +'static + Fn(Value) -> Result, +pub fn create_control_socket(path: P, f: F) -> Result, Error> +where + P: Into, + F: Fn(Value) -> Result + Send + Sync + 'static, { let path: PathBuf = path.into(); @@ -32,60 +27,75 @@ pub fn create_control_socket(path: P, f: F) -> Result { let mygid = unsafe { libc::getgid() }; if !(cred.uid() == 0 || cred.gid() == mygid) { - bail!("no permissions for {:?}", cred); + return err(format_err!("no permissions for {:?}", cred)); } } - Err(err) => bail!("no permissions - unable to read peer credential - {}", err), + Err(e) => { + return err(format_err!( + "no permissions - unable to read peer credential - {}", + e, + )); + } } - Ok(conn) + ok(conn) }) .map_err(move |err| { eprintln!("failed to accept on control socket {:?}: {}", path2, err); }) - .for_each(move |conn| { - let f1 = f.clone(); + .try_for_each(move |conn| { + let f = Arc::clone(&f); let (rx, mut tx) = conn.split(); let path = path3.clone(); - let path2 = path3.clone(); - let abort_future = super::last_worker_future().map_err(|_| {}); + let abort_future = super::last_worker_future().map(|_| ()); - tokio::spawn( - tokio::io::lines(std::io::BufReader::new(rx)) - .map_err(move |err| { eprintln!("control socket {:?} read error: {}", path, err); }) - .and_then(move |cmd| { - let res = try_block!({ - let param = match cmd.parse::() { - Ok(p) => p, - Err(err) => bail!("unable to parse json value - {}", err), - }; + use tokio::io::{AsyncBufReadExt, AsyncWriteExt}; + tokio::spawn(futures::future::select( + async move { + let mut rx = tokio::io::BufReader::new(rx); + let mut line = String::new(); + loop { + line.clear(); + match rx.read_line({ line.clear(); &mut line }).await { + Ok(0) => break, + Ok(_) => (), + Err(err) => { + eprintln!("control socket {:?} read error: {}", path, err); + return; + } + } - f1(param) - }); - - let resp = match res { - Ok(v) => format!("OK: {}\n", v), + let response = match line.parse::() { + Ok(param) => match f(param) { + Ok(res) => format!("OK: {}\n", res), + Err(err) => format!("ERROR: {}\n", err), + } Err(err) => format!("ERROR: {}\n", err), }; - Ok(resp) - }) - .for_each(move |resp| { - tx.write_all(resp.as_bytes()) - .map_err(|err| { eprintln!("control socket {:?} write response error: {}", path2, err); }) - }) - .select(abort_future) - .then(move |_| { Ok(()) }) - ) + + if let Err(err) = tx.write_all(response.as_bytes()).await { + eprintln!("control socket {:?} write response error: {}", path, err); + return; + } + } + }.boxed(), + abort_future, + ).map(|_| ())); + futures::future::ok(()) }); let abort_future = super::last_worker_future().map_err(|_| {}); - let task = control_future.select(abort_future) - .then(move |_| { Ok(()) }); + let task = futures::future::select( + control_future, + abort_future, + ).map(|_| ()); Ok(task) } @@ -94,7 +104,7 @@ pub fn create_control_socket(path: P, f: F) -> Result( path: P, params: Value -) -> impl Future +) -> impl Future> where P: Into, { @@ -104,37 +114,31 @@ pub fn send_command

( .map_err(move |err| format_err!("control socket connect failed - {}", err)) .and_then(move |conn| { - let (rx, tx) = conn.split(); + let (rx, mut tx) = conn.split(); let mut command_string = params.to_string(); command_string.push('\n'); - tokio::io::write_all(tx, command_string) - .and_then(|(tx,_)| tokio::io::shutdown(tx)) - .map_err(|err| format_err!("control socket write error - {}", err)) - .and_then(move |_| { - tokio::io::lines(std::io::BufReader::new(rx)) - .into_future() - .then(|test| { - match test { - Ok((Some(data), _)) => { - if data.starts_with("OK: ") { - match data[4..].parse::() { - Ok(v) => Ok(v), - Err(err) => bail!("unable to parse json response - {}", err), - } - } else if data.starts_with("ERROR: ") { - bail!("{}", &data[7..]); - } else { - bail!("unable to parse response: {}", data); - } - } - Ok((None, _)) => { - bail!("no response"); - } - Err((err, _)) => Err(Error::from(err)), - } - }) - }) + async move { + use tokio::io::{AsyncBufReadExt, AsyncWriteExt}; + + tx.write_all(command_string.as_bytes()).await?; + tx.shutdown().await?; + let mut rx = tokio::io::BufReader::new(rx); + let mut data = String::new(); + if rx.read_line(&mut data).await? == 0 { + bail!("no response"); + } + if data.starts_with("OK: ") { + match data[4..].parse::() { + Ok(v) => Ok(v), + Err(err) => bail!("unable to parse json response - {}", err), + } + } else if data.starts_with("ERROR: ") { + bail!("{}", &data[7..]); + } else { + bail!("unable to parse response: {}", data); + } + } }) }