diff --git a/src/server/command_socket.rs b/src/server/command_socket.rs index 89c77585..af41dd16 100644 --- a/src/server/command_socket.rs +++ b/src/server/command_socket.rs @@ -2,11 +2,12 @@ use anyhow::{bail, format_err, Error}; use std::collections::HashMap; use std::os::unix::io::AsRawFd; -use std::path::PathBuf; +use std::path::{PathBuf, Path}; use std::sync::Arc; use futures::*; use tokio::net::UnixListener; +use serde::Serialize; use serde_json::Value; use nix::sys::socket; @@ -102,43 +103,47 @@ where } -pub async fn send_command

( - path: P, - params: Value -) -> Result - where P: Into, +pub async fn send_command(path: P, params: &T) -> Result +where + P: AsRef, + T: ?Sized + Serialize, { - let path: PathBuf = path.into(); + let mut command_string = serde_json::to_string(params)?; + command_string.push('\n'); + send_raw_command(path.as_ref(), &command_string).await +} - tokio::net::UnixStream::connect(path) +pub async fn send_raw_command

(path: P, command_string: &str) -> Result +where + P: AsRef, +{ + use tokio::io::{AsyncBufReadExt, AsyncWriteExt}; + + let mut conn = tokio::net::UnixStream::connect(path) .map_err(move |err| format_err!("control socket connect failed - {}", err)) - .and_then(move |mut conn| { + .await?; - let mut command_string = params.to_string(); - command_string.push('\n'); + conn.write_all(command_string.as_bytes()).await?; + if !command_string.as_bytes().ends_with(b"\n") { + conn.write_all(b"\n").await?; + } - async move { - use tokio::io::{AsyncBufReadExt, AsyncWriteExt}; - - conn.write_all(command_string.as_bytes()).await?; - AsyncWriteExt::shutdown(&mut conn).await?; - let mut rx = tokio::io::BufReader::new(conn); - let mut data = String::new(); - if rx.read_line(&mut data).await? == 0 { - bail!("no response"); - } - if let Some(res) = data.strip_prefix("OK: ") { - match res.parse::() { - Ok(v) => Ok(v), - Err(err) => bail!("unable to parse json response - {}", err), - } - } else if let Some(err) = data.strip_prefix("ERROR: ") { - bail!("{}", err); - } else { - bail!("unable to parse response: {}", data); - } - } - }).await + AsyncWriteExt::shutdown(&mut conn).await?; + let mut rx = tokio::io::BufReader::new(conn); + let mut data = String::new(); + if rx.read_line(&mut data).await? == 0 { + bail!("no response"); + } + if let Some(res) = data.strip_prefix("OK: ") { + match res.parse::() { + Ok(v) => Ok(v), + Err(err) => bail!("unable to parse json response - {}", err), + } + } else if let Some(err) = data.strip_prefix("ERROR: ") { + bail!("{}", err); + } else { + bail!("unable to parse response: {}", data); + } } /// A callback for a specific commando socket.