file-restore: split out internal QMP logic from unrelated fn

Capsule it in a small QMPSock struct impl, make the usage nicer as
the caller should not have to care & keep track of the initial socket
state+details.

A send_raw and send Value method should cover most needs.

Signed-off-by: Thomas Lamprecht <t.lamprecht@proxmox.com>
This commit is contained in:
Thomas Lamprecht 2022-11-14 16:13:29 +01:00
parent 2f0f3e9979
commit 7fa5be8b3f

View File

@ -7,9 +7,8 @@ use std::time::{Duration, Instant};
use anyhow::{bail, format_err, Error}; use anyhow::{bail, format_err, Error};
use serde_json::json; use serde_json::json;
use tokio::io::AsyncBufRead;
use tokio::{ use tokio::{
io::{AsyncBufReadExt, AsyncWrite, AsyncWriteExt}, io::{AsyncBufReadExt, AsyncWriteExt},
time, time,
}; };
@ -124,15 +123,45 @@ async fn create_temp_initramfs(ticket: &str, debug: bool) -> Result<(File, Strin
Ok((tmp_file, path)) Ok((tmp_file, path))
} }
async fn send_qmp_request<T: AsyncBufRead + AsyncWrite + Unpin>( struct QMPSock {
stream: &mut T, sock: tokio::io::BufStream<tokio::net::UnixStream>,
request: &str, initialized: bool,
) -> Result<String, Error> { }
stream.write_all(request.as_bytes()).await?;
stream.flush().await?; impl QMPSock {
/// Creates a new QMP socket connection. No active initialization is done until actual requests
/// are made.
pub async fn new(cid: i32) -> Result<Self, Error> {
let path = format!("{QMP_SOCKET_PREFIX}{cid}.sock");
let sock = tokio::io::BufStream::new(tokio::net::UnixStream::connect(path).await?);
Ok(QMPSock {
sock,
initialized: false,
})
}
/// Transparently serializes the QMP request and sends it out with `send_raw`
pub async fn send(&mut self, request: serde_json::Value) -> Result<String, Error> {
self.send_raw(&serde_json::to_string(&request)?).await
}
/// Send out raw (already serialized) QMP requests, handling initialization transparently
pub async fn send_raw(&mut self, request: &str) -> Result<String, Error> {
if !self.initialized {
let _ = self.sock.read_line(&mut String::new()).await?; // initial qmp message
self.do_send("{\"execute\":\"qmp_capabilities\"}\n").await?;
self.initialized = true;
}
self.do_send(request).await
}
async fn do_send(&mut self, request: &str) -> Result<String, Error> {
self.sock.write_all(request.as_bytes()).await?;
self.sock.flush().await?;
let mut buf = String::new(); let mut buf = String::new();
let _ = stream.read_line(&mut buf).await?; let _ = self.sock.read_line(&mut buf).await?;
Ok(buf) Ok(buf)
}
} }
pub(crate) async fn hotplug_memory(cid: i32, dimm_mb: usize) -> Result<(), Error> { pub(crate) async fn hotplug_memory(cid: i32, dimm_mb: usize) -> Result<(), Error> {
@ -140,30 +169,27 @@ pub(crate) async fn hotplug_memory(cid: i32, dimm_mb: usize) -> Result<(), Error
bail!("cannot set to {dimm_mb}M, maximum is {MAX_MEMORY_DIMM_SIZE}M"); bail!("cannot set to {dimm_mb}M, maximum is {MAX_MEMORY_DIMM_SIZE}M");
} }
let path = format!("{QMP_SOCKET_PREFIX}{cid}.sock"); let mut qmp = QMPSock::new(cid).await?;
let mut stream = tokio::io::BufStream::new(tokio::net::UnixStream::connect(path).await?);
let _ = stream.read_line(&mut String::new()).await?; // initial qmp message qmp.send(json!({
let _ = send_qmp_request(&mut stream, "{\"execute\":\"qmp_capabilities\"}\n").await?;
let request = json!({
"execute": "object-add", "execute": "object-add",
"arguments": { "arguments": {
"qom-type": "memory-backend-ram", "qom-type": "memory-backend-ram",
"id": "mem0", "id": "mem0",
"size": dimm_mb * 1024 * 1024, "size": dimm_mb * 1024 * 1024,
} }
}); }))
let _ = send_qmp_request(&mut stream, &serde_json::to_string(&request)?).await?; .await?;
let request = json!({
qmp.send(json!({
"execute": "device_add", "execute": "device_add",
"arguments": { "arguments": {
"driver": "pc-dimm", "driver": "pc-dimm",
"id": "dimm0", "id": "dimm0",
"memdev": "mem0", "memdev": "mem0",
} }
}); }))
let _ = send_qmp_request(&mut stream, &serde_json::to_string(&request)?).await?; .await?;
Ok(()) Ok(())
} }