From dfb73ee28689689847fde30a4594d6628594786e Mon Sep 17 00:00:00 2001 From: Dietmar Maurer Date: Wed, 10 Apr 2019 12:42:24 +0200 Subject: [PATCH] src/server/worker_task.rs: implement abort_worker (via command_socket) --- src/server/command_socket.rs | 49 ++++++++++++++++++++++++++++++++++++ 1 file changed, 49 insertions(+) diff --git a/src/server/command_socket.rs b/src/server/command_socket.rs index 92a1322c..57a54a3a 100644 --- a/src/server/command_socket.rs +++ b/src/server/command_socket.rs @@ -70,3 +70,52 @@ pub fn create_control_socket(path: P, f: F) -> Result( + path: P, + params: Value +) -> impl Future + where P: Into, + +{ + let path: PathBuf = path.into(); + + tokio::net::UnixStream::connect(path) + .map_err(move |err| format_err!("control socket connect failed - {}", err)) + .and_then(move |conn| { + + let (rx, tx) = conn.split(); + + let mut command_string = params.to_string(); + command_string.push('\n'); + + tokio::io::write_all(tx, command_string) + .and_then(|(tx,_)| tokio::io::shutdown(tx)) + .map_err(|err| format_err!("control socket write error - {}", err)) + .and_then(move |_| { + tokio::io::lines(std::io::BufReader::new(rx)) + .into_future() + .then(|test| { + match test { + Ok((Some(data), _)) => { + if data.starts_with("OK: ") { + match data[4..].parse::() { + Ok(v) => Ok(v), + Err(err) => bail!("unable to parse json response - {}", err), + } + } else if data.starts_with("ERROR: ") { + bail!("{}", &data[7..]); + } else { + bail!("unable to parse response: {}", data); + } + } + Ok((None, _)) => { + bail!("no response"); + } + Err((err, _)) => Err(Error::from(err)), + } + }) + }) + }) +}