diff --git a/proxmox-file-restore/src/qemu_helper.rs b/proxmox-file-restore/src/qemu_helper.rs index e5d23f40..7956f241 100644 --- a/proxmox-file-restore/src/qemu_helper.rs +++ b/proxmox-file-restore/src/qemu_helper.rs @@ -7,9 +7,8 @@ use std::time::{Duration, Instant}; use anyhow::{bail, format_err, Error}; use serde_json::json; -use tokio::io::AsyncBufRead; use tokio::{ - io::{AsyncBufReadExt, AsyncWrite, AsyncWriteExt}, + io::{AsyncBufReadExt, AsyncWriteExt}, time, }; @@ -124,15 +123,45 @@ async fn create_temp_initramfs(ticket: &str, debug: bool) -> Result<(File, Strin Ok((tmp_file, path)) } -async fn send_qmp_request( - stream: &mut T, - request: &str, -) -> Result { - stream.write_all(request.as_bytes()).await?; - stream.flush().await?; - let mut buf = String::new(); - let _ = stream.read_line(&mut buf).await?; - Ok(buf) +struct QMPSock { + sock: tokio::io::BufStream, + initialized: bool, +} + +impl QMPSock { + /// Creates a new QMP socket connection. No active initialization is done until actual requests + /// are made. + pub async fn new(cid: i32) -> Result { + let path = format!("{QMP_SOCKET_PREFIX}{cid}.sock"); + let sock = tokio::io::BufStream::new(tokio::net::UnixStream::connect(path).await?); + Ok(QMPSock { + sock, + initialized: false, + }) + } + + /// Transparently serializes the QMP request and sends it out with `send_raw` + pub async fn send(&mut self, request: serde_json::Value) -> Result { + self.send_raw(&serde_json::to_string(&request)?).await + } + + /// Send out raw (already serialized) QMP requests, handling initialization transparently + pub async fn send_raw(&mut self, request: &str) -> Result { + if !self.initialized { + let _ = self.sock.read_line(&mut String::new()).await?; // initial qmp message + self.do_send("{\"execute\":\"qmp_capabilities\"}\n").await?; + self.initialized = true; + } + self.do_send(request).await + } + + async fn do_send(&mut self, request: &str) -> Result { + self.sock.write_all(request.as_bytes()).await?; + self.sock.flush().await?; + let mut buf = String::new(); + let _ = self.sock.read_line(&mut buf).await?; + Ok(buf) + } } pub(crate) async fn hotplug_memory(cid: i32, dimm_mb: usize) -> Result<(), Error> { @@ -140,30 +169,27 @@ pub(crate) async fn hotplug_memory(cid: i32, dimm_mb: usize) -> Result<(), Error bail!("cannot set to {dimm_mb}M, maximum is {MAX_MEMORY_DIMM_SIZE}M"); } - let path = format!("{QMP_SOCKET_PREFIX}{cid}.sock"); - let mut stream = tokio::io::BufStream::new(tokio::net::UnixStream::connect(path).await?); + let mut qmp = QMPSock::new(cid).await?; - let _ = stream.read_line(&mut String::new()).await?; // initial qmp message - let _ = send_qmp_request(&mut stream, "{\"execute\":\"qmp_capabilities\"}\n").await?; - - let request = json!({ + qmp.send(json!({ "execute": "object-add", "arguments": { "qom-type": "memory-backend-ram", "id": "mem0", "size": dimm_mb * 1024 * 1024, } - }); - let _ = send_qmp_request(&mut stream, &serde_json::to_string(&request)?).await?; - let request = json!({ + })) + .await?; + + qmp.send(json!({ "execute": "device_add", "arguments": { "driver": "pc-dimm", "id": "dimm0", "memdev": "mem0", } - }); - let _ = send_qmp_request(&mut stream, &serde_json::to_string(&request)?).await?; + })) + .await?; Ok(()) }