From 8efaa64ca3010785ff608a7dcb729600cfc548ed Mon Sep 17 00:00:00 2001 From: uran0sH Date: Fri, 14 Apr 2023 04:19:04 -0400 Subject: [PATCH] vsock: make TX buffer size configurable That buffer is used to store bytes coming from the guest before sending them to the Unix domain socket. Some use cases might want to increase or decrease this space, so it would be best to make it user-customizable. Users can use "--tx-buffer-size=" to configure TX buffer. Fixes: #319 Signed-off-by: uran0sH --- crates/vsock/README.md | 5 ++ crates/vsock/src/main.rs | 19 +++++-- crates/vsock/src/thread_backend.rs | 9 +++- crates/vsock/src/txbuf.rs | 31 ++++++----- crates/vsock/src/vhu_vsock.rs | 18 +++++-- crates/vsock/src/vhu_vsock_thread.rs | 33 +++++++----- crates/vsock/src/vsock_conn.rs | 80 ++++++++++++++++++++++------ 7 files changed, 145 insertions(+), 50 deletions(-) diff --git a/crates/vsock/README.md b/crates/vsock/README.md index f3e8cd8..722ad24 100644 --- a/crates/vsock/README.md +++ b/crates/vsock/README.md @@ -43,6 +43,7 @@ Run the vhost-user-vsock device: vhost-user-vsock --guest-cid= \ --socket= --uds-path= + [--tx-buffer-size=host packets)>] ``` Run VMM (e.g. QEMU): @@ -61,6 +62,10 @@ qemu-system-x86_64 \ ```sh shell1$ vhost-user-vsock --guest-cid=4 --uds-path=/tmp/vm4.vsock --socket=/tmp/vhost4.socket ``` +or if you want to configure the TX buffer size +```sh +shell1$ vhost-user-vsock --guest-cid=4 --uds-path=/tmp/vm4.vsock --socket=/tmp/vhost4.socket --tx-buffer-size=65536 +``` ```sh shell2$ qemu-system-x86_64 \ diff --git a/crates/vsock/src/main.rs b/crates/vsock/src/main.rs index 0f192f9..41c7ef4 100644 --- a/crates/vsock/src/main.rs +++ b/crates/vsock/src/main.rs @@ -32,6 +32,10 @@ struct VsockArgs { /// Unix socket to which a host-side application connects to. #[clap(long)] uds_path: String, + + /// The size of the buffer used for the TX virtqueue + #[clap(long, default_value_t = 64 * 1024)] + tx_buffer_size: u32, } impl TryFrom for VsockConfig { @@ -41,7 +45,12 @@ impl TryFrom for VsockConfig { let socket = cmd_args.socket.trim().to_string(); let uds_path = cmd_args.uds_path.trim().to_string(); - Ok(VsockConfig::new(cmd_args.guest_cid, socket, uds_path)) + Ok(VsockConfig::new( + cmd_args.guest_cid, + socket, + uds_path, + cmd_args.tx_buffer_size, + )) } } @@ -101,11 +110,12 @@ mod tests { use serial_test::serial; impl VsockArgs { - fn from_args(guest_cid: u64, socket: &str, uds_path: &str) -> Self { + fn from_args(guest_cid: u64, socket: &str, uds_path: &str, tx_buffer_size: u32) -> Self { VsockArgs { guest_cid, socket: socket.to_string(), uds_path: uds_path.to_string(), + tx_buffer_size, } } } @@ -113,7 +123,7 @@ mod tests { #[test] #[serial] fn test_vsock_config_setup() { - let args = VsockArgs::from_args(3, "/tmp/vhost4.socket", "/tmp/vm4.vsock"); + let args = VsockArgs::from_args(3, "/tmp/vhost4.socket", "/tmp/vm4.vsock", 64 * 1024); let config = VsockConfig::try_from(args); assert!(config.is_ok()); @@ -122,6 +132,7 @@ mod tests { assert_eq!(config.get_guest_cid(), 3); assert_eq!(config.get_socket_path(), "/tmp/vhost4.socket"); assert_eq!(config.get_uds_path(), "/tmp/vm4.vsock"); + assert_eq!(config.get_tx_buffer_size(), 64 * 1024); } #[test] @@ -130,11 +141,13 @@ mod tests { const CID: u64 = 3; const VHOST_SOCKET_PATH: &str = "test_vsock_server.socket"; const VSOCK_SOCKET_PATH: &str = "test_vsock_server.vsock"; + const CONN_TX_BUF_SIZE: u32 = 64 * 1024; let config = VsockConfig::new( CID, VHOST_SOCKET_PATH.to_string(), VSOCK_SOCKET_PATH.to_string(), + CONN_TX_BUF_SIZE, ); let backend = Arc::new(VhostUserVsockBackend::new(config).unwrap()); diff --git a/crates/vsock/src/thread_backend.rs b/crates/vsock/src/thread_backend.rs index f32df28..d101047 100644 --- a/crates/vsock/src/thread_backend.rs +++ b/crates/vsock/src/thread_backend.rs @@ -37,11 +37,12 @@ pub(crate) struct VsockThreadBackend { epoll_fd: i32, /// Set of allocated local ports. pub local_port_set: HashSet, + tx_buffer_size: u32, } impl VsockThreadBackend { /// New instance of VsockThreadBackend. - pub fn new(host_socket_path: String, epoll_fd: i32) -> Self { + pub fn new(host_socket_path: String, epoll_fd: i32, tx_buffer_size: u32) -> Self { Self { listener_map: HashMap::new(), conn_map: HashMap::new(), @@ -52,6 +53,7 @@ impl VsockThreadBackend { host_socket_path, epoll_fd, local_port_set: HashSet::new(), + tx_buffer_size, } } @@ -216,6 +218,7 @@ impl VsockThreadBackend { pkt.src_port(), self.epoll_fd, pkt.buf_alloc(), + self.tx_buffer_size, ); self.conn_map @@ -254,6 +257,7 @@ mod tests { use virtio_vsock::packet::{VsockPacket, PKT_HEADER_SIZE}; const DATA_LEN: usize = 16; + const CONN_TX_BUF_SIZE: u32 = 64 * 1024; #[test] #[serial] @@ -266,7 +270,8 @@ mod tests { let _listener = UnixListener::bind(VSOCK_PEER_PATH).unwrap(); let epoll_fd = epoll::create(false).unwrap(); - let mut vtp = VsockThreadBackend::new(VSOCK_SOCKET_PATH.to_string(), epoll_fd); + let mut vtp = + VsockThreadBackend::new(VSOCK_SOCKET_PATH.to_string(), epoll_fd, CONN_TX_BUF_SIZE); assert!(!vtp.pending_rx()); diff --git a/crates/vsock/src/txbuf.rs b/crates/vsock/src/txbuf.rs index ff55cd8..ef718d7 100644 --- a/crates/vsock/src/txbuf.rs +++ b/crates/vsock/src/txbuf.rs @@ -4,7 +4,7 @@ use std::{io::Write, num::Wrapping}; use vm_memory::{bitmap::BitmapSlice, VolatileSlice}; -use crate::vhu_vsock::{Error, Result, CONN_TX_BUF_SIZE}; +use crate::vhu_vsock::{Error, Result}; #[derive(Debug)] pub(crate) struct LocalTxBuf { @@ -18,14 +18,19 @@ pub(crate) struct LocalTxBuf { impl LocalTxBuf { /// Create a new instance of LocalTxBuf. - pub fn new() -> Self { + pub fn new(buf_size: u32) -> Self { Self { - buf: vec![0; CONN_TX_BUF_SIZE as usize], + buf: vec![0; buf_size as usize], head: Wrapping(0), tail: Wrapping(0), } } + /// Get the buffer size + pub fn get_buf_size(&self) -> u32 { + self.buf.len() as u32 + } + /// Check if the buf is empty. pub fn is_empty(&self) -> bool { self.len() == 0 @@ -34,16 +39,16 @@ impl LocalTxBuf { /// Add new data to the tx buffer, push all or none. /// Returns LocalTxBufFull error if space not sufficient. pub fn push(&mut self, data_buf: &VolatileSlice) -> Result<()> { - if CONN_TX_BUF_SIZE as usize - self.len() < data_buf.len() { + if self.get_buf_size() as usize - self.len() < data_buf.len() { // Tx buffer is full return Err(Error::LocalTxBufFull); } // Get index into buffer at which data can be inserted - let tail_idx = self.tail.0 as usize % CONN_TX_BUF_SIZE as usize; + let tail_idx = self.tail.0 as usize % self.get_buf_size() as usize; // Check if we can fit the data buffer between head and end of buffer - let len = std::cmp::min(CONN_TX_BUF_SIZE as usize - tail_idx, data_buf.len()); + let len = std::cmp::min(self.get_buf_size() as usize - tail_idx, data_buf.len()); let txbuf = &mut self.buf[tail_idx..tail_idx + len]; data_buf.copy_to(txbuf); @@ -67,10 +72,10 @@ impl LocalTxBuf { } // Get index into buffer from which data can be read - let head_idx = self.head.0 as usize % CONN_TX_BUF_SIZE as usize; + let head_idx = self.head.0 as usize % self.get_buf_size() as usize; // First write from head to end of buffer - let len = std::cmp::min(CONN_TX_BUF_SIZE as usize - head_idx, self.len()); + let len = std::cmp::min(self.get_buf_size() as usize - head_idx, self.len()); let written = stream .write(&self.buf[head_idx..(head_idx + len)]) .map_err(Error::LocalTxBufFlush)?; @@ -97,9 +102,11 @@ impl LocalTxBuf { mod tests { use super::*; + const CONN_TX_BUF_SIZE: u32 = 64 * 1024; + #[test] fn test_txbuf_len() { - let mut loc_tx_buf = LocalTxBuf::new(); + let mut loc_tx_buf = LocalTxBuf::new(CONN_TX_BUF_SIZE); // Zero length tx buf assert_eq!(loc_tx_buf.len(), 0); @@ -118,7 +125,7 @@ mod tests { #[test] fn test_txbuf_is_empty() { - let mut loc_tx_buf = LocalTxBuf::new(); + let mut loc_tx_buf = LocalTxBuf::new(CONN_TX_BUF_SIZE); // empty tx buffer assert!(loc_tx_buf.is_empty()); @@ -130,7 +137,7 @@ mod tests { #[test] fn test_txbuf_push() { - let mut loc_tx_buf = LocalTxBuf::new(); + let mut loc_tx_buf = LocalTxBuf::new(CONN_TX_BUF_SIZE); let mut buf = [0; CONN_TX_BUF_SIZE as usize]; // SAFETY: Safe as the buffer is guaranteed to be valid here. let data = unsafe { VolatileSlice::new(buf.as_mut_ptr(), buf.len()) }; @@ -168,7 +175,7 @@ mod tests { #[test] fn test_txbuf_flush_to() { - let mut loc_tx_buf = LocalTxBuf::new(); + let mut loc_tx_buf = LocalTxBuf::new(CONN_TX_BUF_SIZE); // data to be flushed let mut buf = vec![1; CONN_TX_BUF_SIZE as usize]; diff --git a/crates/vsock/src/vhu_vsock.rs b/crates/vsock/src/vhu_vsock.rs index 41eea0c..72033f8 100644 --- a/crates/vsock/src/vhu_vsock.rs +++ b/crates/vsock/src/vhu_vsock.rs @@ -34,10 +34,6 @@ const EVT_QUEUE_EVENT: u16 = 2; /// Notification coming from the backend. pub(crate) const BACKEND_EVENT: u16 = 3; -/// Vsock connection TX buffer capacity -/// TODO: Make this value configurable -pub(crate) const CONN_TX_BUF_SIZE: u32 = 64 * 1024; - /// CID of the host pub(crate) const VSOCK_HOST_CID: u64 = 2; @@ -141,16 +137,18 @@ pub(crate) struct VsockConfig { guest_cid: u64, socket: String, uds_path: String, + tx_buffer_size: u32, } impl VsockConfig { /// Create a new instance of the VsockConfig struct, containing the /// parameters to be fed into the vsock-backend server. - pub fn new(guest_cid: u64, socket: String, uds_path: String) -> Self { + pub fn new(guest_cid: u64, socket: String, uds_path: String, tx_buffer_size: u32) -> Self { Self { guest_cid, socket, uds_path, + tx_buffer_size, } } @@ -170,6 +168,10 @@ impl VsockConfig { pub fn get_socket_path(&self) -> String { String::from(&self.socket) } + + pub fn get_tx_buffer_size(&self) -> u32 { + self.tx_buffer_size + } } /// A local port and peer port pair used to retrieve @@ -212,6 +214,7 @@ impl VhostUserVsockBackend { let thread = Mutex::new(VhostUserVsockThread::new( config.get_uds_path(), config.get_guest_cid(), + config.get_tx_buffer_size(), )?); let queues_per_thread = vec![QUEUE_MASK]; @@ -328,6 +331,8 @@ mod tests { use vhost_user_backend::VringT; use vm_memory::GuestAddress; + const CONN_TX_BUF_SIZE: u32 = 64 * 1024; + #[test] #[serial] fn test_vsock_backend() { @@ -339,6 +344,7 @@ mod tests { CID, VHOST_SOCKET_PATH.to_string(), VSOCK_SOCKET_PATH.to_string(), + CONN_TX_BUF_SIZE, ); let backend = VhostUserVsockBackend::new(config); @@ -411,6 +417,7 @@ mod tests { CID, "/sys/not_allowed.socket".to_string(), "/sys/not_allowed.vsock".to_string(), + CONN_TX_BUF_SIZE, ); let backend = VhostUserVsockBackend::new(config); @@ -420,6 +427,7 @@ mod tests { CID, VHOST_SOCKET_PATH.to_string(), VSOCK_SOCKET_PATH.to_string(), + CONN_TX_BUF_SIZE, ); let backend = VhostUserVsockBackend::new(config).unwrap(); diff --git a/crates/vsock/src/vhu_vsock_thread.rs b/crates/vsock/src/vhu_vsock_thread.rs index e9a95b4..ab61630 100644 --- a/crates/vsock/src/vhu_vsock_thread.rs +++ b/crates/vsock/src/vhu_vsock_thread.rs @@ -24,10 +24,7 @@ use vmm_sys_util::epoll::EventSet; use crate::{ rxops::*, thread_backend::*, - vhu_vsock::{ - ConnMapKey, Error, Result, VhostUserVsockBackend, BACKEND_EVENT, CONN_TX_BUF_SIZE, - VSOCK_HOST_CID, - }, + vhu_vsock::{ConnMapKey, Error, Result, VhostUserVsockBackend, BACKEND_EVENT, VSOCK_HOST_CID}, vsock_conn::*, }; @@ -56,11 +53,13 @@ pub(crate) struct VhostUserVsockThread { pool: ThreadPool, /// host side port on which application listens. local_port: Wrapping, + /// The tx buffer size + tx_buffer_size: u32, } impl VhostUserVsockThread { /// Create a new instance of VhostUserVsockThread. - pub fn new(uds_path: String, guest_cid: u64) -> Result { + pub fn new(uds_path: String, guest_cid: u64, tx_buffer_size: u32) -> Result { // TODO: better error handling, maybe add a param to force the unlink let _ = std::fs::remove_file(uds_path.clone()); let host_sock = UnixListener::bind(&uds_path) @@ -81,13 +80,14 @@ impl VhostUserVsockThread { host_listener: host_sock, vring_worker: None, epoll_file, - thread_backend: VsockThreadBackend::new(uds_path, epoll_fd), + thread_backend: VsockThreadBackend::new(uds_path, epoll_fd, tx_buffer_size), guest_cid, pool: ThreadPoolBuilder::new() .pool_size(1) .create() .map_err(Error::CreateThreadPool)?, local_port: Wrapping(0), + tx_buffer_size, }; VhostUserVsockThread::epoll_register(epoll_fd, host_raw_fd, epoll::Events::EPOLLIN)?; @@ -246,6 +246,7 @@ impl VhostUserVsockThread { self.guest_cid, peer_port, self.get_epoll_fd(), + self.tx_buffer_size, ); new_conn.rx_queue.enqueue(RxOps::Request); new_conn.set_peer_port(peer_port); @@ -404,7 +405,7 @@ impl VhostUserVsockThread { let used_len = match VsockPacket::from_rx_virtq_chain( mem.deref(), &mut avail_desc, - CONN_TX_BUF_SIZE, + self.tx_buffer_size, ) { Ok(mut pkt) => { if self.thread_backend.recv_pkt(&mut pkt).is_ok() { @@ -502,7 +503,7 @@ impl VhostUserVsockThread { let pkt = match VsockPacket::from_tx_virtq_chain( mem.deref(), &mut avail_desc, - CONN_TX_BUF_SIZE, + self.tx_buffer_size, ) { Ok(pkt) => pkt, Err(e) => { @@ -588,6 +589,8 @@ mod tests { use vm_memory::GuestAddress; use vmm_sys_util::eventfd::EventFd; + const CONN_TX_BUF_SIZE: u32 = 64 * 1024; + impl VhostUserVsockThread { fn get_epoll_file(&self) -> &File { &self.epoll_file @@ -597,7 +600,8 @@ mod tests { #[test] #[serial] fn test_vsock_thread() { - let t = VhostUserVsockThread::new("test_vsock_thread.vsock".to_string(), 3); + let t = + VhostUserVsockThread::new("test_vsock_thread.vsock".to_string(), 3, CONN_TX_BUF_SIZE); assert!(t.is_ok()); let mut t = t.unwrap(); @@ -652,11 +656,16 @@ mod tests { #[test] #[serial] fn test_vsock_thread_failures() { - let t = VhostUserVsockThread::new("/sys/not_allowed.vsock".to_string(), 3); + let t = + VhostUserVsockThread::new("/sys/not_allowed.vsock".to_string(), 3, CONN_TX_BUF_SIZE); assert!(t.is_err()); - let mut t = - VhostUserVsockThread::new("test_vsock_thread_failures.vsock".to_string(), 3).unwrap(); + let mut t = VhostUserVsockThread::new( + "test_vsock_thread_failures.vsock".to_string(), + 3, + CONN_TX_BUF_SIZE, + ) + .unwrap(); assert!(VhostUserVsockThread::epoll_register(-1, -1, epoll::Events::EPOLLIN).is_err()); assert!(VhostUserVsockThread::epoll_modify(-1, -1, epoll::Events::EPOLLIN).is_err()); assert!(VhostUserVsockThread::epoll_unregister(-1, -1).is_err()); diff --git a/crates/vsock/src/vsock_conn.rs b/crates/vsock/src/vsock_conn.rs index 8436c95..25e3335 100644 --- a/crates/vsock/src/vsock_conn.rs +++ b/crates/vsock/src/vsock_conn.rs @@ -15,7 +15,7 @@ use crate::{ rxqueue::*, txbuf::*, vhu_vsock::{ - Error, Result, CONN_TX_BUF_SIZE, VSOCK_FLAGS_SHUTDOWN_RCV, VSOCK_FLAGS_SHUTDOWN_SEND, + Error, Result, VSOCK_FLAGS_SHUTDOWN_RCV, VSOCK_FLAGS_SHUTDOWN_SEND, VSOCK_OP_CREDIT_REQUEST, VSOCK_OP_CREDIT_UPDATE, VSOCK_OP_REQUEST, VSOCK_OP_RESPONSE, VSOCK_OP_RST, VSOCK_OP_RW, VSOCK_OP_SHUTDOWN, VSOCK_TYPE_STREAM, }, @@ -64,6 +64,7 @@ impl VsockConnection { guest_cid: u64, guest_port: u32, epoll_fd: RawFd, + tx_buffer_size: u32, ) -> Self { Self { stream, @@ -79,12 +80,13 @@ impl VsockConnection { peer_fwd_cnt: Wrapping(0), rx_cnt: Wrapping(0), epoll_fd, - tx_buf: LocalTxBuf::new(), + tx_buf: LocalTxBuf::new(tx_buffer_size), } } /// Create a new vsock connection object for connections initiated by /// an application running in the guest. + #[allow(clippy::too_many_arguments)] pub fn new_peer_init( stream: S, local_cid: u64, @@ -93,6 +95,7 @@ impl VsockConnection { guest_port: u32, epoll_fd: RawFd, peer_buf_alloc: u32, + tx_buffer_size: u32, ) -> Self { let mut rx_queue = RxQueue::new(); rx_queue.enqueue(RxOps::Response); @@ -110,7 +113,7 @@ impl VsockConnection { peer_fwd_cnt: Wrapping(0), rx_cnt: Wrapping(0), epoll_fd, - tx_buf: LocalTxBuf::new(), + tx_buf: LocalTxBuf::new(tx_buffer_size), } } @@ -329,7 +332,7 @@ impl VsockConnection { .set_src_port(self.local_port) .set_dst_port(self.peer_port) .set_type(VSOCK_TYPE_STREAM) - .set_buf_alloc(CONN_TX_BUF_SIZE) + .set_buf_alloc(self.tx_buf.get_buf_size()) .set_fwd_cnt(self.fwd_cnt.0) } @@ -362,6 +365,8 @@ mod tests { GuestMemoryMmap, }; + const CONN_TX_BUF_SIZE: u32 = 64 * 1024; + struct HeadParams { head_len: usize, data_len: u32, @@ -490,8 +495,15 @@ mod tests { fn test_vsock_conn_init() { // new locally inititated connection let dummy_file = VsockDummySocket::new(); - let mut conn_local = - VsockConnection::new_local_init(dummy_file, VSOCK_HOST_CID, 5000, 3, 5001, -1); + let mut conn_local = VsockConnection::new_local_init( + dummy_file, + VSOCK_HOST_CID, + 5000, + 3, + 5001, + -1, + CONN_TX_BUF_SIZE, + ); assert!(!conn_local.connect); assert_eq!(conn_local.peer_port, 5001); @@ -506,8 +518,16 @@ mod tests { // New connection initiated by the peer/guest let dummy_file = VsockDummySocket::new(); - let mut conn_peer = - VsockConnection::new_peer_init(dummy_file, VSOCK_HOST_CID, 5000, 3, 5001, -1, 65536); + let mut conn_peer = VsockConnection::new_peer_init( + dummy_file, + VSOCK_HOST_CID, + 5000, + 3, + 5001, + -1, + 65536, + CONN_TX_BUF_SIZE, + ); assert!(!conn_peer.connect); assert_eq!(conn_peer.peer_port, 5001); @@ -524,8 +544,15 @@ mod tests { fn test_vsock_conn_credit() { // new locally inititated connection let dummy_file = VsockDummySocket::new(); - let mut conn_local = - VsockConnection::new_local_init(dummy_file, VSOCK_HOST_CID, 5000, 3, 5001, -1); + let mut conn_local = VsockConnection::new_local_init( + dummy_file, + VSOCK_HOST_CID, + 5000, + 3, + 5001, + -1, + CONN_TX_BUF_SIZE, + ); assert_eq!(conn_local.peer_avail_credit(), 0); assert!(conn_local.need_credit_update_from_peer()); @@ -551,8 +578,15 @@ mod tests { // new locally inititated connection let dummy_file = VsockDummySocket::new(); - let conn_local = - VsockConnection::new_local_init(dummy_file, VSOCK_HOST_CID, 5000, 3, 5001, -1); + let conn_local = VsockConnection::new_local_init( + dummy_file, + VSOCK_HOST_CID, + 5000, + 3, + 5001, + -1, + CONN_TX_BUF_SIZE, + ); // write only descriptor chain let (mem, mut descr_chain) = prepare_desc_chain_vsock(true, &head_params, 2, 10); @@ -581,8 +615,15 @@ mod tests { // new locally inititated connection let dummy_file = VsockDummySocket::new(); - let mut conn_local = - VsockConnection::new_local_init(dummy_file, VSOCK_HOST_CID, 5000, 3, 5001, -1); + let mut conn_local = VsockConnection::new_local_init( + dummy_file, + VSOCK_HOST_CID, + 5000, + 3, + 5001, + -1, + CONN_TX_BUF_SIZE, + ); // write only descriptor chain let (mem, mut descr_chain) = prepare_desc_chain_vsock(true, &head_params, 1, 5); @@ -671,8 +712,15 @@ mod tests { // new locally inititated connection let dummy_file = VsockDummySocket::new(); - let mut conn_local = - VsockConnection::new_local_init(dummy_file, VSOCK_HOST_CID, 5000, 3, 5001, -1); + let mut conn_local = VsockConnection::new_local_init( + dummy_file, + VSOCK_HOST_CID, + 5000, + 3, + 5001, + -1, + CONN_TX_BUF_SIZE, + ); // write only descriptor chain let (mem, mut descr_chain) = prepare_desc_chain_vsock(false, &head_params, 1, 5);