diff --git a/crates/vsock/README.md b/crates/vsock/README.md index f3e8cd8..722ad24 100644 --- a/crates/vsock/README.md +++ b/crates/vsock/README.md @@ -43,6 +43,7 @@ Run the vhost-user-vsock device: vhost-user-vsock --guest-cid= \ --socket= --uds-path= + [--tx-buffer-size=host packets)>] ``` Run VMM (e.g. QEMU): @@ -61,6 +62,10 @@ qemu-system-x86_64 \ ```sh shell1$ vhost-user-vsock --guest-cid=4 --uds-path=/tmp/vm4.vsock --socket=/tmp/vhost4.socket ``` +or if you want to configure the TX buffer size +```sh +shell1$ vhost-user-vsock --guest-cid=4 --uds-path=/tmp/vm4.vsock --socket=/tmp/vhost4.socket --tx-buffer-size=65536 +``` ```sh shell2$ qemu-system-x86_64 \ diff --git a/crates/vsock/src/main.rs b/crates/vsock/src/main.rs index 0f192f9..41c7ef4 100644 --- a/crates/vsock/src/main.rs +++ b/crates/vsock/src/main.rs @@ -32,6 +32,10 @@ struct VsockArgs { /// Unix socket to which a host-side application connects to. #[clap(long)] uds_path: String, + + /// The size of the buffer used for the TX virtqueue + #[clap(long, default_value_t = 64 * 1024)] + tx_buffer_size: u32, } impl TryFrom for VsockConfig { @@ -41,7 +45,12 @@ impl TryFrom for VsockConfig { let socket = cmd_args.socket.trim().to_string(); let uds_path = cmd_args.uds_path.trim().to_string(); - Ok(VsockConfig::new(cmd_args.guest_cid, socket, uds_path)) + Ok(VsockConfig::new( + cmd_args.guest_cid, + socket, + uds_path, + cmd_args.tx_buffer_size, + )) } } @@ -101,11 +110,12 @@ mod tests { use serial_test::serial; impl VsockArgs { - fn from_args(guest_cid: u64, socket: &str, uds_path: &str) -> Self { + fn from_args(guest_cid: u64, socket: &str, uds_path: &str, tx_buffer_size: u32) -> Self { VsockArgs { guest_cid, socket: socket.to_string(), uds_path: uds_path.to_string(), + tx_buffer_size, } } } @@ -113,7 +123,7 @@ mod tests { #[test] #[serial] fn test_vsock_config_setup() { - let args = VsockArgs::from_args(3, "/tmp/vhost4.socket", "/tmp/vm4.vsock"); + let args = VsockArgs::from_args(3, "/tmp/vhost4.socket", "/tmp/vm4.vsock", 64 * 1024); let config = VsockConfig::try_from(args); assert!(config.is_ok()); @@ -122,6 +132,7 @@ mod tests { assert_eq!(config.get_guest_cid(), 3); assert_eq!(config.get_socket_path(), "/tmp/vhost4.socket"); assert_eq!(config.get_uds_path(), "/tmp/vm4.vsock"); + assert_eq!(config.get_tx_buffer_size(), 64 * 1024); } #[test] @@ -130,11 +141,13 @@ mod tests { const CID: u64 = 3; const VHOST_SOCKET_PATH: &str = "test_vsock_server.socket"; const VSOCK_SOCKET_PATH: &str = "test_vsock_server.vsock"; + const CONN_TX_BUF_SIZE: u32 = 64 * 1024; let config = VsockConfig::new( CID, VHOST_SOCKET_PATH.to_string(), VSOCK_SOCKET_PATH.to_string(), + CONN_TX_BUF_SIZE, ); let backend = Arc::new(VhostUserVsockBackend::new(config).unwrap()); diff --git a/crates/vsock/src/thread_backend.rs b/crates/vsock/src/thread_backend.rs index f32df28..d101047 100644 --- a/crates/vsock/src/thread_backend.rs +++ b/crates/vsock/src/thread_backend.rs @@ -37,11 +37,12 @@ pub(crate) struct VsockThreadBackend { epoll_fd: i32, /// Set of allocated local ports. pub local_port_set: HashSet, + tx_buffer_size: u32, } impl VsockThreadBackend { /// New instance of VsockThreadBackend. - pub fn new(host_socket_path: String, epoll_fd: i32) -> Self { + pub fn new(host_socket_path: String, epoll_fd: i32, tx_buffer_size: u32) -> Self { Self { listener_map: HashMap::new(), conn_map: HashMap::new(), @@ -52,6 +53,7 @@ impl VsockThreadBackend { host_socket_path, epoll_fd, local_port_set: HashSet::new(), + tx_buffer_size, } } @@ -216,6 +218,7 @@ impl VsockThreadBackend { pkt.src_port(), self.epoll_fd, pkt.buf_alloc(), + self.tx_buffer_size, ); self.conn_map @@ -254,6 +257,7 @@ mod tests { use virtio_vsock::packet::{VsockPacket, PKT_HEADER_SIZE}; const DATA_LEN: usize = 16; + const CONN_TX_BUF_SIZE: u32 = 64 * 1024; #[test] #[serial] @@ -266,7 +270,8 @@ mod tests { let _listener = UnixListener::bind(VSOCK_PEER_PATH).unwrap(); let epoll_fd = epoll::create(false).unwrap(); - let mut vtp = VsockThreadBackend::new(VSOCK_SOCKET_PATH.to_string(), epoll_fd); + let mut vtp = + VsockThreadBackend::new(VSOCK_SOCKET_PATH.to_string(), epoll_fd, CONN_TX_BUF_SIZE); assert!(!vtp.pending_rx()); diff --git a/crates/vsock/src/txbuf.rs b/crates/vsock/src/txbuf.rs index ff55cd8..ef718d7 100644 --- a/crates/vsock/src/txbuf.rs +++ b/crates/vsock/src/txbuf.rs @@ -4,7 +4,7 @@ use std::{io::Write, num::Wrapping}; use vm_memory::{bitmap::BitmapSlice, VolatileSlice}; -use crate::vhu_vsock::{Error, Result, CONN_TX_BUF_SIZE}; +use crate::vhu_vsock::{Error, Result}; #[derive(Debug)] pub(crate) struct LocalTxBuf { @@ -18,14 +18,19 @@ pub(crate) struct LocalTxBuf { impl LocalTxBuf { /// Create a new instance of LocalTxBuf. - pub fn new() -> Self { + pub fn new(buf_size: u32) -> Self { Self { - buf: vec![0; CONN_TX_BUF_SIZE as usize], + buf: vec![0; buf_size as usize], head: Wrapping(0), tail: Wrapping(0), } } + /// Get the buffer size + pub fn get_buf_size(&self) -> u32 { + self.buf.len() as u32 + } + /// Check if the buf is empty. pub fn is_empty(&self) -> bool { self.len() == 0 @@ -34,16 +39,16 @@ impl LocalTxBuf { /// Add new data to the tx buffer, push all or none. /// Returns LocalTxBufFull error if space not sufficient. pub fn push(&mut self, data_buf: &VolatileSlice) -> Result<()> { - if CONN_TX_BUF_SIZE as usize - self.len() < data_buf.len() { + if self.get_buf_size() as usize - self.len() < data_buf.len() { // Tx buffer is full return Err(Error::LocalTxBufFull); } // Get index into buffer at which data can be inserted - let tail_idx = self.tail.0 as usize % CONN_TX_BUF_SIZE as usize; + let tail_idx = self.tail.0 as usize % self.get_buf_size() as usize; // Check if we can fit the data buffer between head and end of buffer - let len = std::cmp::min(CONN_TX_BUF_SIZE as usize - tail_idx, data_buf.len()); + let len = std::cmp::min(self.get_buf_size() as usize - tail_idx, data_buf.len()); let txbuf = &mut self.buf[tail_idx..tail_idx + len]; data_buf.copy_to(txbuf); @@ -67,10 +72,10 @@ impl LocalTxBuf { } // Get index into buffer from which data can be read - let head_idx = self.head.0 as usize % CONN_TX_BUF_SIZE as usize; + let head_idx = self.head.0 as usize % self.get_buf_size() as usize; // First write from head to end of buffer - let len = std::cmp::min(CONN_TX_BUF_SIZE as usize - head_idx, self.len()); + let len = std::cmp::min(self.get_buf_size() as usize - head_idx, self.len()); let written = stream .write(&self.buf[head_idx..(head_idx + len)]) .map_err(Error::LocalTxBufFlush)?; @@ -97,9 +102,11 @@ impl LocalTxBuf { mod tests { use super::*; + const CONN_TX_BUF_SIZE: u32 = 64 * 1024; + #[test] fn test_txbuf_len() { - let mut loc_tx_buf = LocalTxBuf::new(); + let mut loc_tx_buf = LocalTxBuf::new(CONN_TX_BUF_SIZE); // Zero length tx buf assert_eq!(loc_tx_buf.len(), 0); @@ -118,7 +125,7 @@ mod tests { #[test] fn test_txbuf_is_empty() { - let mut loc_tx_buf = LocalTxBuf::new(); + let mut loc_tx_buf = LocalTxBuf::new(CONN_TX_BUF_SIZE); // empty tx buffer assert!(loc_tx_buf.is_empty()); @@ -130,7 +137,7 @@ mod tests { #[test] fn test_txbuf_push() { - let mut loc_tx_buf = LocalTxBuf::new(); + let mut loc_tx_buf = LocalTxBuf::new(CONN_TX_BUF_SIZE); let mut buf = [0; CONN_TX_BUF_SIZE as usize]; // SAFETY: Safe as the buffer is guaranteed to be valid here. let data = unsafe { VolatileSlice::new(buf.as_mut_ptr(), buf.len()) }; @@ -168,7 +175,7 @@ mod tests { #[test] fn test_txbuf_flush_to() { - let mut loc_tx_buf = LocalTxBuf::new(); + let mut loc_tx_buf = LocalTxBuf::new(CONN_TX_BUF_SIZE); // data to be flushed let mut buf = vec![1; CONN_TX_BUF_SIZE as usize]; diff --git a/crates/vsock/src/vhu_vsock.rs b/crates/vsock/src/vhu_vsock.rs index 41eea0c..72033f8 100644 --- a/crates/vsock/src/vhu_vsock.rs +++ b/crates/vsock/src/vhu_vsock.rs @@ -34,10 +34,6 @@ const EVT_QUEUE_EVENT: u16 = 2; /// Notification coming from the backend. pub(crate) const BACKEND_EVENT: u16 = 3; -/// Vsock connection TX buffer capacity -/// TODO: Make this value configurable -pub(crate) const CONN_TX_BUF_SIZE: u32 = 64 * 1024; - /// CID of the host pub(crate) const VSOCK_HOST_CID: u64 = 2; @@ -141,16 +137,18 @@ pub(crate) struct VsockConfig { guest_cid: u64, socket: String, uds_path: String, + tx_buffer_size: u32, } impl VsockConfig { /// Create a new instance of the VsockConfig struct, containing the /// parameters to be fed into the vsock-backend server. - pub fn new(guest_cid: u64, socket: String, uds_path: String) -> Self { + pub fn new(guest_cid: u64, socket: String, uds_path: String, tx_buffer_size: u32) -> Self { Self { guest_cid, socket, uds_path, + tx_buffer_size, } } @@ -170,6 +168,10 @@ impl VsockConfig { pub fn get_socket_path(&self) -> String { String::from(&self.socket) } + + pub fn get_tx_buffer_size(&self) -> u32 { + self.tx_buffer_size + } } /// A local port and peer port pair used to retrieve @@ -212,6 +214,7 @@ impl VhostUserVsockBackend { let thread = Mutex::new(VhostUserVsockThread::new( config.get_uds_path(), config.get_guest_cid(), + config.get_tx_buffer_size(), )?); let queues_per_thread = vec![QUEUE_MASK]; @@ -328,6 +331,8 @@ mod tests { use vhost_user_backend::VringT; use vm_memory::GuestAddress; + const CONN_TX_BUF_SIZE: u32 = 64 * 1024; + #[test] #[serial] fn test_vsock_backend() { @@ -339,6 +344,7 @@ mod tests { CID, VHOST_SOCKET_PATH.to_string(), VSOCK_SOCKET_PATH.to_string(), + CONN_TX_BUF_SIZE, ); let backend = VhostUserVsockBackend::new(config); @@ -411,6 +417,7 @@ mod tests { CID, "/sys/not_allowed.socket".to_string(), "/sys/not_allowed.vsock".to_string(), + CONN_TX_BUF_SIZE, ); let backend = VhostUserVsockBackend::new(config); @@ -420,6 +427,7 @@ mod tests { CID, VHOST_SOCKET_PATH.to_string(), VSOCK_SOCKET_PATH.to_string(), + CONN_TX_BUF_SIZE, ); let backend = VhostUserVsockBackend::new(config).unwrap(); diff --git a/crates/vsock/src/vhu_vsock_thread.rs b/crates/vsock/src/vhu_vsock_thread.rs index e9a95b4..ab61630 100644 --- a/crates/vsock/src/vhu_vsock_thread.rs +++ b/crates/vsock/src/vhu_vsock_thread.rs @@ -24,10 +24,7 @@ use vmm_sys_util::epoll::EventSet; use crate::{ rxops::*, thread_backend::*, - vhu_vsock::{ - ConnMapKey, Error, Result, VhostUserVsockBackend, BACKEND_EVENT, CONN_TX_BUF_SIZE, - VSOCK_HOST_CID, - }, + vhu_vsock::{ConnMapKey, Error, Result, VhostUserVsockBackend, BACKEND_EVENT, VSOCK_HOST_CID}, vsock_conn::*, }; @@ -56,11 +53,13 @@ pub(crate) struct VhostUserVsockThread { pool: ThreadPool, /// host side port on which application listens. local_port: Wrapping, + /// The tx buffer size + tx_buffer_size: u32, } impl VhostUserVsockThread { /// Create a new instance of VhostUserVsockThread. - pub fn new(uds_path: String, guest_cid: u64) -> Result { + pub fn new(uds_path: String, guest_cid: u64, tx_buffer_size: u32) -> Result { // TODO: better error handling, maybe add a param to force the unlink let _ = std::fs::remove_file(uds_path.clone()); let host_sock = UnixListener::bind(&uds_path) @@ -81,13 +80,14 @@ impl VhostUserVsockThread { host_listener: host_sock, vring_worker: None, epoll_file, - thread_backend: VsockThreadBackend::new(uds_path, epoll_fd), + thread_backend: VsockThreadBackend::new(uds_path, epoll_fd, tx_buffer_size), guest_cid, pool: ThreadPoolBuilder::new() .pool_size(1) .create() .map_err(Error::CreateThreadPool)?, local_port: Wrapping(0), + tx_buffer_size, }; VhostUserVsockThread::epoll_register(epoll_fd, host_raw_fd, epoll::Events::EPOLLIN)?; @@ -246,6 +246,7 @@ impl VhostUserVsockThread { self.guest_cid, peer_port, self.get_epoll_fd(), + self.tx_buffer_size, ); new_conn.rx_queue.enqueue(RxOps::Request); new_conn.set_peer_port(peer_port); @@ -404,7 +405,7 @@ impl VhostUserVsockThread { let used_len = match VsockPacket::from_rx_virtq_chain( mem.deref(), &mut avail_desc, - CONN_TX_BUF_SIZE, + self.tx_buffer_size, ) { Ok(mut pkt) => { if self.thread_backend.recv_pkt(&mut pkt).is_ok() { @@ -502,7 +503,7 @@ impl VhostUserVsockThread { let pkt = match VsockPacket::from_tx_virtq_chain( mem.deref(), &mut avail_desc, - CONN_TX_BUF_SIZE, + self.tx_buffer_size, ) { Ok(pkt) => pkt, Err(e) => { @@ -588,6 +589,8 @@ mod tests { use vm_memory::GuestAddress; use vmm_sys_util::eventfd::EventFd; + const CONN_TX_BUF_SIZE: u32 = 64 * 1024; + impl VhostUserVsockThread { fn get_epoll_file(&self) -> &File { &self.epoll_file @@ -597,7 +600,8 @@ mod tests { #[test] #[serial] fn test_vsock_thread() { - let t = VhostUserVsockThread::new("test_vsock_thread.vsock".to_string(), 3); + let t = + VhostUserVsockThread::new("test_vsock_thread.vsock".to_string(), 3, CONN_TX_BUF_SIZE); assert!(t.is_ok()); let mut t = t.unwrap(); @@ -652,11 +656,16 @@ mod tests { #[test] #[serial] fn test_vsock_thread_failures() { - let t = VhostUserVsockThread::new("/sys/not_allowed.vsock".to_string(), 3); + let t = + VhostUserVsockThread::new("/sys/not_allowed.vsock".to_string(), 3, CONN_TX_BUF_SIZE); assert!(t.is_err()); - let mut t = - VhostUserVsockThread::new("test_vsock_thread_failures.vsock".to_string(), 3).unwrap(); + let mut t = VhostUserVsockThread::new( + "test_vsock_thread_failures.vsock".to_string(), + 3, + CONN_TX_BUF_SIZE, + ) + .unwrap(); assert!(VhostUserVsockThread::epoll_register(-1, -1, epoll::Events::EPOLLIN).is_err()); assert!(VhostUserVsockThread::epoll_modify(-1, -1, epoll::Events::EPOLLIN).is_err()); assert!(VhostUserVsockThread::epoll_unregister(-1, -1).is_err()); diff --git a/crates/vsock/src/vsock_conn.rs b/crates/vsock/src/vsock_conn.rs index 8436c95..25e3335 100644 --- a/crates/vsock/src/vsock_conn.rs +++ b/crates/vsock/src/vsock_conn.rs @@ -15,7 +15,7 @@ use crate::{ rxqueue::*, txbuf::*, vhu_vsock::{ - Error, Result, CONN_TX_BUF_SIZE, VSOCK_FLAGS_SHUTDOWN_RCV, VSOCK_FLAGS_SHUTDOWN_SEND, + Error, Result, VSOCK_FLAGS_SHUTDOWN_RCV, VSOCK_FLAGS_SHUTDOWN_SEND, VSOCK_OP_CREDIT_REQUEST, VSOCK_OP_CREDIT_UPDATE, VSOCK_OP_REQUEST, VSOCK_OP_RESPONSE, VSOCK_OP_RST, VSOCK_OP_RW, VSOCK_OP_SHUTDOWN, VSOCK_TYPE_STREAM, }, @@ -64,6 +64,7 @@ impl VsockConnection { guest_cid: u64, guest_port: u32, epoll_fd: RawFd, + tx_buffer_size: u32, ) -> Self { Self { stream, @@ -79,12 +80,13 @@ impl VsockConnection { peer_fwd_cnt: Wrapping(0), rx_cnt: Wrapping(0), epoll_fd, - tx_buf: LocalTxBuf::new(), + tx_buf: LocalTxBuf::new(tx_buffer_size), } } /// Create a new vsock connection object for connections initiated by /// an application running in the guest. + #[allow(clippy::too_many_arguments)] pub fn new_peer_init( stream: S, local_cid: u64, @@ -93,6 +95,7 @@ impl VsockConnection { guest_port: u32, epoll_fd: RawFd, peer_buf_alloc: u32, + tx_buffer_size: u32, ) -> Self { let mut rx_queue = RxQueue::new(); rx_queue.enqueue(RxOps::Response); @@ -110,7 +113,7 @@ impl VsockConnection { peer_fwd_cnt: Wrapping(0), rx_cnt: Wrapping(0), epoll_fd, - tx_buf: LocalTxBuf::new(), + tx_buf: LocalTxBuf::new(tx_buffer_size), } } @@ -329,7 +332,7 @@ impl VsockConnection { .set_src_port(self.local_port) .set_dst_port(self.peer_port) .set_type(VSOCK_TYPE_STREAM) - .set_buf_alloc(CONN_TX_BUF_SIZE) + .set_buf_alloc(self.tx_buf.get_buf_size()) .set_fwd_cnt(self.fwd_cnt.0) } @@ -362,6 +365,8 @@ mod tests { GuestMemoryMmap, }; + const CONN_TX_BUF_SIZE: u32 = 64 * 1024; + struct HeadParams { head_len: usize, data_len: u32, @@ -490,8 +495,15 @@ mod tests { fn test_vsock_conn_init() { // new locally inititated connection let dummy_file = VsockDummySocket::new(); - let mut conn_local = - VsockConnection::new_local_init(dummy_file, VSOCK_HOST_CID, 5000, 3, 5001, -1); + let mut conn_local = VsockConnection::new_local_init( + dummy_file, + VSOCK_HOST_CID, + 5000, + 3, + 5001, + -1, + CONN_TX_BUF_SIZE, + ); assert!(!conn_local.connect); assert_eq!(conn_local.peer_port, 5001); @@ -506,8 +518,16 @@ mod tests { // New connection initiated by the peer/guest let dummy_file = VsockDummySocket::new(); - let mut conn_peer = - VsockConnection::new_peer_init(dummy_file, VSOCK_HOST_CID, 5000, 3, 5001, -1, 65536); + let mut conn_peer = VsockConnection::new_peer_init( + dummy_file, + VSOCK_HOST_CID, + 5000, + 3, + 5001, + -1, + 65536, + CONN_TX_BUF_SIZE, + ); assert!(!conn_peer.connect); assert_eq!(conn_peer.peer_port, 5001); @@ -524,8 +544,15 @@ mod tests { fn test_vsock_conn_credit() { // new locally inititated connection let dummy_file = VsockDummySocket::new(); - let mut conn_local = - VsockConnection::new_local_init(dummy_file, VSOCK_HOST_CID, 5000, 3, 5001, -1); + let mut conn_local = VsockConnection::new_local_init( + dummy_file, + VSOCK_HOST_CID, + 5000, + 3, + 5001, + -1, + CONN_TX_BUF_SIZE, + ); assert_eq!(conn_local.peer_avail_credit(), 0); assert!(conn_local.need_credit_update_from_peer()); @@ -551,8 +578,15 @@ mod tests { // new locally inititated connection let dummy_file = VsockDummySocket::new(); - let conn_local = - VsockConnection::new_local_init(dummy_file, VSOCK_HOST_CID, 5000, 3, 5001, -1); + let conn_local = VsockConnection::new_local_init( + dummy_file, + VSOCK_HOST_CID, + 5000, + 3, + 5001, + -1, + CONN_TX_BUF_SIZE, + ); // write only descriptor chain let (mem, mut descr_chain) = prepare_desc_chain_vsock(true, &head_params, 2, 10); @@ -581,8 +615,15 @@ mod tests { // new locally inititated connection let dummy_file = VsockDummySocket::new(); - let mut conn_local = - VsockConnection::new_local_init(dummy_file, VSOCK_HOST_CID, 5000, 3, 5001, -1); + let mut conn_local = VsockConnection::new_local_init( + dummy_file, + VSOCK_HOST_CID, + 5000, + 3, + 5001, + -1, + CONN_TX_BUF_SIZE, + ); // write only descriptor chain let (mem, mut descr_chain) = prepare_desc_chain_vsock(true, &head_params, 1, 5); @@ -671,8 +712,15 @@ mod tests { // new locally inititated connection let dummy_file = VsockDummySocket::new(); - let mut conn_local = - VsockConnection::new_local_init(dummy_file, VSOCK_HOST_CID, 5000, 3, 5001, -1); + let mut conn_local = VsockConnection::new_local_init( + dummy_file, + VSOCK_HOST_CID, + 5000, + 3, + 5001, + -1, + CONN_TX_BUF_SIZE, + ); // write only descriptor chain let (mem, mut descr_chain) = prepare_desc_chain_vsock(false, &head_params, 1, 5);