From df1073bf2e8ef616559543a55db531f81b7f2d73 Mon Sep 17 00:00:00 2001 From: Stefano Garzarella Date: Mon, 26 Sep 2022 20:20:11 +0200 Subject: [PATCH] vsock: use VsockPacket from the virtio-vsock crate MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace our VsockPacket with the new one implemented in the virtio-vsock crate. Based on Alex Bennée and Laura Loghin commit: https://github.com/stsquad/vhost-device/commit/6752266608dd To achieve this the following changes where made: - deleted the internal packet.rs - convert the send_pkt/recv_pkt helpers to be BitmapSlice aware - update push from LocalTxBuf - tweak a few API calls due to minor diffs Fixed tests and moved some helpers from the removed vsock/src/packet.rs file. Co-developed-by: Laura Loghin Co-developed-by: Alex Bennée Signed-off-by: Stefano Garzarella --- Cargo.lock | 11 + vsock/Cargo.toml | 1 + vsock/src/main.rs | 1 - vsock/src/packet.rs | 592 ---------------------------------- vsock/src/thread_backend.rs | 19 +- vsock/src/txbuf.rs | 26 +- vsock/src/vhu_vsock_thread.rs | 54 ++-- vsock/src/vsock_conn.rs | 193 ++++++++--- 8 files changed, 224 insertions(+), 673 deletions(-) delete mode 100644 vsock/src/packet.rs diff --git a/Cargo.lock b/Cargo.lock index af2a32d..228d38f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -683,6 +683,7 @@ dependencies = [ "vhost-user-backend", "virtio-bindings", "virtio-queue", + "virtio-vsock", "vm-memory", "vmm-sys-util", ] @@ -705,6 +706,16 @@ dependencies = [ "vmm-sys-util", ] +[[package]] +name = "virtio-vsock" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "876299fc0f59e07aadc77541ce49dd75f7548f4d095eac6f7104b805394029e8" +dependencies = [ + "virtio-queue", + "vm-memory", +] + [[package]] name = "vm-memory" version = "0.9.0" diff --git a/vsock/Cargo.toml b/vsock/Cargo.toml index 572a35e..5e05376 100644 --- a/vsock/Cargo.toml +++ b/vsock/Cargo.toml @@ -20,6 +20,7 @@ vhost = { version = "0.5", features = ["vhost-user-slave"] } vhost-user-backend = "0.7.0" virtio-bindings = ">=0.1" virtio-queue = "0.6" +virtio-vsock = "0.1.0" vm-memory = ">=0.8" vmm-sys-util = "=0.10.0" diff --git a/vsock/src/main.rs b/vsock/src/main.rs index 8439e98..6f35f7f 100644 --- a/vsock/src/main.rs +++ b/vsock/src/main.rs @@ -1,4 +1,3 @@ -mod packet; mod rxops; mod rxqueue; mod thread_backend; diff --git a/vsock/src/packet.rs b/vsock/src/packet.rs deleted file mode 100644 index b30686a..0000000 --- a/vsock/src/packet.rs +++ /dev/null @@ -1,592 +0,0 @@ -#![deny(missing_docs)] -use byteorder::{ByteOrder, LittleEndian}; -use thiserror::Error as ThisError; -use virtio_queue::DescriptorChain; -use vm_memory::{ - GuestAddress, GuestAddressSpace, GuestMemory, GuestMemoryAtomic, GuestMemoryLoadGuard, - GuestMemoryMmap, -}; - -pub(crate) type Result = std::result::Result; - -/// Below enum defines custom error types for vsock packet operations. -#[derive(Debug, PartialEq, ThisError)] -pub(crate) enum Error { - #[error("Descriptor not writable")] - UnwritableDescriptor, - #[error("Missing descriptor in queue")] - QueueMissingDescriptor, - #[error("Small header descriptor: {0}")] - HdrDescTooSmall(u32), - #[error("Chained guest memory error")] - GuestMemory, - #[error("Descriptor not readable")] - UnreadableDescriptor, - #[error("Extra descriptors in the descriptor chain")] - ExtraDescrInChain, - #[error("Data buffer size less than size in packet header")] - DataDescTooSmall, -} - -// TODO: Replace below with bindgen generated struct -// vsock packet header size when packed -pub const VSOCK_PKT_HDR_SIZE: usize = 44; - -// Offset into header for source cid -const HDROFF_SRC_CID: usize = 0; - -// Offset into header for destination cid -const HDROFF_DST_CID: usize = 8; - -// Offset into header for source port -const HDROFF_SRC_PORT: usize = 16; - -// Offset into header for destination port -const HDROFF_DST_PORT: usize = 20; - -// Offset into the header for data length -const HDROFF_LEN: usize = 24; - -// Offset into header for packet type -const HDROFF_TYPE: usize = 28; - -// Offset into header for operation kind -const HDROFF_OP: usize = 30; - -// Offset into header for additional flags -// only for VSOCK_OP_SHUTDOWN -const HDROFF_FLAGS: usize = 32; - -// Offset into header for tx buf alloc -const HDROFF_BUF_ALLOC: usize = 36; - -// Offset into header for forward count -const HDROFF_FWD_CNT: usize = 40; - -/// Vsock packet structure implemented as a wrapper around a virtq descriptor chain: -/// - chain head holds the packet header -/// - optional data descriptor, only present for data packets (VSOCK_OP_RW) -#[derive(Debug)] -pub struct VsockPacket { - hdr: *mut u8, - buf: Option<*mut u8>, - buf_size: usize, -} - -impl VsockPacket { - /// Create a vsock packet wrapper around a chain in the rx virtqueue. - /// Perform bounds checking before creating the wrapper. - pub(crate) fn from_rx_virtq_head( - chain: &mut DescriptorChain>>, - mem: GuestMemoryAtomic, - ) -> Result { - // head is at 0, next is at 1, max of two descriptors - // head contains the packet header - // next contains the optional packet data - let mut descr_vec = Vec::with_capacity(2); - - for descr in chain { - if !descr.is_write_only() { - return Err(Error::UnwritableDescriptor); - } - - descr_vec.push(descr); - } - - if descr_vec.len() < 2 { - // We expect a head and a data descriptor - return Err(Error::QueueMissingDescriptor); - } - - let head_descr = descr_vec[0]; - let data_descr = descr_vec[1]; - - if head_descr.len() < VSOCK_PKT_HDR_SIZE as u32 { - return Err(Error::HdrDescTooSmall(head_descr.len())); - } - - Ok(Self { - hdr: VsockPacket::guest_to_host_address( - &mem.memory(), - head_descr.addr(), - VSOCK_PKT_HDR_SIZE, - ) - .ok_or(Error::GuestMemory)? as *mut u8, - buf: Some( - VsockPacket::guest_to_host_address( - &mem.memory(), - data_descr.addr(), - data_descr.len() as usize, - ) - .ok_or(Error::GuestMemory)? as *mut u8, - ), - buf_size: data_descr.len() as usize, - }) - } - - /// Create a vsock packet wrapper around a chain in the tx virtqueue - /// Bounds checking before creating the wrapper. - pub(crate) fn from_tx_virtq_head( - chain: &mut DescriptorChain>>, - mem: GuestMemoryAtomic, - ) -> Result { - // head is at 0, next is at 1, max of two descriptors - // head contains the packet header - // next contains the optional packet data - let mut descr_vec = Vec::with_capacity(2); - // let mut num_descr = 0; - - for descr in chain { - if descr.is_write_only() { - return Err(Error::UnreadableDescriptor); - } - - descr_vec.push(descr); - } - - if descr_vec.len() > 2 { - return Err(Error::ExtraDescrInChain); - } - - let head_descr = descr_vec[0]; - - if head_descr.len() < VSOCK_PKT_HDR_SIZE as u32 { - return Err(Error::HdrDescTooSmall(head_descr.len())); - } - - let mut pkt = Self { - hdr: VsockPacket::guest_to_host_address( - &mem.memory(), - head_descr.addr(), - VSOCK_PKT_HDR_SIZE, - ) - .ok_or(Error::GuestMemory)? as *mut u8, - buf: None, - buf_size: 0, - }; - - // Zero length packet - if pkt.is_empty() { - return Ok(pkt); - } - - // There exists packet data as well - let data_descr = descr_vec[1]; - - // Data buffer should be as large as described in the header - if data_descr.len() < pkt.len() { - return Err(Error::DataDescTooSmall); - } - - pkt.buf_size = data_descr.len() as usize; - pkt.buf = Some( - VsockPacket::guest_to_host_address( - &mem.memory(), - data_descr.addr(), - data_descr.len() as usize, - ) - .ok_or(Error::GuestMemory)? as *mut u8, - ); - - Ok(pkt) - } - - /// Convert an absolute address in guest address space to a host - /// pointer and verify that the provided size defines a valid - /// range within a single memory region. - fn guest_to_host_address( - mem: &GuestMemoryLoadGuard, - addr: GuestAddress, - size: usize, - ) -> Option<*mut u8> { - if mem.check_range(addr, size) { - Some(mem.get_host_address(addr).unwrap()) - } else { - None - } - } - - /// In place byte slice access to vsock packet header. - pub fn hdr(&self) -> &[u8] { - // Safe as bound checks performed in from_*_virtq_head - unsafe { std::slice::from_raw_parts(self.hdr as *const u8, VSOCK_PKT_HDR_SIZE) } - } - - /// In place mutable slice access to vsock packet header. - pub fn hdr_mut(&mut self) -> &mut [u8] { - // Safe as bound checks performed in from_*_virtq_head - unsafe { std::slice::from_raw_parts_mut(self.hdr, VSOCK_PKT_HDR_SIZE) } - } - - /// Size of vsock packet data, found by accessing len field - /// of virtio_vsock_hdr struct. - pub fn len(&self) -> u32 { - LittleEndian::read_u32(&self.hdr()[HDROFF_LEN..]) - } - - /// Set the source cid. - pub fn set_src_cid(&mut self, cid: u64) -> &mut Self { - LittleEndian::write_u64(&mut self.hdr_mut()[HDROFF_SRC_CID..], cid); - self - } - - /// Set the destination cid. - pub fn set_dst_cid(&mut self, cid: u64) -> &mut Self { - LittleEndian::write_u64(&mut self.hdr_mut()[HDROFF_DST_CID..], cid); - self - } - - /// Set source port. - pub fn set_src_port(&mut self, port: u32) -> &mut Self { - LittleEndian::write_u32(&mut self.hdr_mut()[HDROFF_SRC_PORT..], port); - self - } - - /// Set destination port. - pub fn set_dst_port(&mut self, port: u32) -> &mut Self { - LittleEndian::write_u32(&mut self.hdr_mut()[HDROFF_DST_PORT..], port); - self - } - - /// Set type of connection. - pub fn set_type(&mut self, type_: u16) -> &mut Self { - LittleEndian::write_u16(&mut self.hdr_mut()[HDROFF_TYPE..], type_); - self - } - - /// Set size of tx buf. - pub fn set_buf_alloc(&mut self, buf_alloc: u32) -> &mut Self { - LittleEndian::write_u32(&mut self.hdr_mut()[HDROFF_BUF_ALLOC..], buf_alloc); - self - } - - /// Set amount of tx buf data written to stream. - pub fn set_fwd_cnt(&mut self, fwd_cnt: u32) -> &mut Self { - LittleEndian::write_u32(&mut self.hdr_mut()[HDROFF_FWD_CNT..], fwd_cnt); - self - } - - /// Set packet operation ID. - pub fn set_op(&mut self, op: u16) -> &mut Self { - LittleEndian::write_u16(&mut self.hdr_mut()[HDROFF_OP..], op); - self - } - - /// Check if the packet has no data. - fn is_empty(&self) -> bool { - self.len() == 0 - } - - /// Get destination port from packet. - pub fn dst_port(&self) -> u32 { - LittleEndian::read_u32(&self.hdr()[HDROFF_DST_PORT..]) - } - - /// Get source port from packet. - pub fn src_port(&self) -> u32 { - LittleEndian::read_u32(&self.hdr()[HDROFF_SRC_PORT..]) - } - - /// Get source cid from packet. - pub fn src_cid(&self) -> u64 { - LittleEndian::read_u64(&self.hdr()[HDROFF_SRC_CID..]) - } - - /// Get destination cid from packet. - pub fn dst_cid(&self) -> u64 { - LittleEndian::read_u64(&self.hdr()[HDROFF_DST_CID..]) - } - - /// Get packet type. - pub fn pkt_type(&self) -> u16 { - LittleEndian::read_u16(&self.hdr()[HDROFF_TYPE..]) - } - - /// Get operation requested in the packet. - pub fn op(&self) -> u16 { - LittleEndian::read_u16(&self.hdr()[HDROFF_OP..]) - } - - /// Byte slice mutable access to vsock packet data buffer. - pub fn buf_mut(&mut self) -> Option<&mut [u8]> { - // Safe as bound checks performed while creating packet - self.buf - .map(|ptr| unsafe { std::slice::from_raw_parts_mut(ptr, self.buf_size) }) - } - - /// Byte slice access to vsock packet data buffer. - pub fn buf(&self) -> Option<&[u8]> { - // Safe as bound checks performed while creating packet - self.buf - .map(|ptr| unsafe { std::slice::from_raw_parts(ptr as *const u8, self.buf_size) }) - } - - /// Set data buffer length. - pub fn set_len(&mut self, len: u32) -> &mut Self { - LittleEndian::write_u32(&mut self.hdr_mut()[HDROFF_LEN..], len); - self - } - - /// Read buf alloc. - pub fn buf_alloc(&self) -> u32 { - LittleEndian::read_u32(&self.hdr()[HDROFF_BUF_ALLOC..]) - } - - /// Get fwd cnt from packet header. - pub fn fwd_cnt(&self) -> u32 { - LittleEndian::read_u32(&self.hdr()[HDROFF_FWD_CNT..]) - } - - /// Read flags from the packet header. - pub fn flags(&self) -> u32 { - LittleEndian::read_u32(&self.hdr()[HDROFF_FLAGS..]) - } - - /// Set packet header flag to flags. - pub fn set_flags(&mut self, flags: u32) -> &mut Self { - LittleEndian::write_u32(&mut self.hdr_mut()[HDROFF_FLAGS..], flags); - self - } - - /// Set OP specific flags. - pub fn set_flag(&mut self, flag: u32) -> &mut Self { - self.set_flags(self.flags() | flag); - self - } -} - -#[cfg(test)] -pub mod tests { - use crate::vhu_vsock::{VSOCK_OP_RW, VSOCK_TYPE_STREAM}; - - use super::*; - use virtio_bindings::bindings::virtio_ring::{VRING_DESC_F_NEXT, VRING_DESC_F_WRITE}; - use virtio_queue::{mock::MockSplitQueue, Descriptor, Queue, QueueOwnedT}; - use vm_memory::{Address, Bytes, GuestAddress, GuestMemoryAtomic, GuestMemoryMmap}; - - pub struct HeadParams { - head_len: usize, - data_len: u32, - } - - impl HeadParams { - pub fn new(head_len: usize, data_len: u32) -> Self { - Self { head_len, data_len } - } - fn construct_head(&self) -> Vec { - let mut header = vec![0_u8; self.head_len]; - if self.head_len == VSOCK_PKT_HDR_SIZE { - LittleEndian::write_u32(&mut header[HDROFF_LEN..], self.data_len); - } - header - } - } - - pub fn prepare_desc_chain_vsock( - write_only: bool, - head_params: &HeadParams, - data_chain_len: u16, - head_data_len: u32, - ) -> ( - GuestMemoryAtomic, - DescriptorChain>, - ) { - let mem = GuestMemoryMmap::<()>::from_ranges(&[(GuestAddress(0), 0x1000)]).unwrap(); - let virt_queue = MockSplitQueue::new(&mem, 16); - let mut next_addr = virt_queue.desc_table().total_size() + 0x100; - let mut flags = 0; - - if write_only { - flags |= VRING_DESC_F_WRITE; - } - - let mut head_flags = if data_chain_len > 0 { - flags | VRING_DESC_F_NEXT - } else { - flags - }; - - // vsock packet header - // let header = vec![0 as u8; head_params.head_len]; - let header = head_params.construct_head(); - let head_desc = - Descriptor::new(next_addr, head_params.head_len as u32, head_flags as u16, 1); - mem.write(&header, head_desc.addr()).unwrap(); - assert!(virt_queue.desc_table().store(0, head_desc).is_ok()); - next_addr += head_params.head_len as u64; - - // Put the descriptor index 0 in the first available ring position. - mem.write_obj(0u16, virt_queue.avail_addr().unchecked_add(4)) - .unwrap(); - - // Set `avail_idx` to 1. - mem.write_obj(1u16, virt_queue.avail_addr().unchecked_add(2)) - .unwrap(); - - // chain len excludes the head - for i in 0..(data_chain_len) { - // last descr in chain - if i == data_chain_len - 1 { - head_flags &= !VRING_DESC_F_NEXT; - } - // vsock data - let data = vec![0_u8; head_data_len as usize]; - let data_desc = Descriptor::new(next_addr, data.len() as u32, head_flags as u16, i + 2); - mem.write(&data, data_desc.addr()).unwrap(); - assert!(virt_queue.desc_table().store(i + 1, data_desc).is_ok()); - next_addr += head_data_len as u64; - } - - // Create descriptor chain from pre-filled memory - ( - GuestMemoryAtomic::new(mem.clone()), - virt_queue - .create_queue::() - .unwrap() - .iter(GuestMemoryAtomic::new(mem.clone()).memory()) - .unwrap() - .next() - .unwrap(), - ) - } - - #[test] - fn test_guest_to_host_address() { - let mem = GuestMemoryAtomic::new( - GuestMemoryMmap::<()>::from_ranges(&[(GuestAddress(0), 1000)]).unwrap(), - ); - assert!(VsockPacket::guest_to_host_address(&mem.memory(), GuestAddress(0), 1000).is_some()); - assert!(VsockPacket::guest_to_host_address(&mem.memory(), GuestAddress(0), 500).is_some()); - assert!( - VsockPacket::guest_to_host_address(&mem.memory(), GuestAddress(500), 500).is_some() - ); - assert!( - VsockPacket::guest_to_host_address(&mem.memory(), GuestAddress(501), 500).is_none() - ); - } - - #[test] - fn test_from_rx_virtq_head() { - // parameters for packet head construction - let head_params = HeadParams::new(VSOCK_PKT_HDR_SIZE, 10); - - // write only descriptor chain - let (mem, mut descr_chain) = prepare_desc_chain_vsock(true, &head_params, 2, 10); - assert!(VsockPacket::from_rx_virtq_head(&mut descr_chain, mem).is_ok()); - - // read only descriptor chain - let (mem, mut descr_chain) = prepare_desc_chain_vsock(false, &head_params, 1, 10); - assert_eq!( - VsockPacket::from_rx_virtq_head(&mut descr_chain, mem).unwrap_err(), - Error::UnwritableDescriptor - ); - - // less than two descriptors - let (mem, mut descr_chain) = prepare_desc_chain_vsock(true, &head_params, 0, 10); - assert_eq!( - VsockPacket::from_rx_virtq_head(&mut descr_chain, mem).unwrap_err(), - Error::QueueMissingDescriptor - ); - - // incorrect header length - let head_params = HeadParams::new(22, 10); - let (mem, mut descr_chain) = prepare_desc_chain_vsock(true, &head_params, 1, 10); - assert_eq!( - VsockPacket::from_rx_virtq_head(&mut descr_chain, mem).unwrap_err(), - Error::HdrDescTooSmall(22) - ); - } - - #[test] - fn test_vsock_packet_header_ops() { - // parameters for head construction - let head_params = HeadParams::new(VSOCK_PKT_HDR_SIZE, 10); - - let (mem, mut descr_chain) = prepare_desc_chain_vsock(true, &head_params, 2, 10); - let mut vsock_packet = VsockPacket::from_rx_virtq_head(&mut descr_chain, mem).unwrap(); - - // Check packet data's length - assert!(!vsock_packet.is_empty()); - assert_eq!(vsock_packet.len(), 10); - - // Set and get the source CID in the packet header - vsock_packet.set_src_cid(1); - assert_eq!(vsock_packet.src_cid(), 1); - - // Set and get the destination CID in the packet header - vsock_packet.set_dst_cid(1); - assert_eq!(vsock_packet.dst_cid(), 1); - - // Set and get the source port in the packet header - vsock_packet.set_src_port(5000); - assert_eq!(vsock_packet.src_port(), 5000); - - // Set and get the destination port in the packet header - vsock_packet.set_dst_port(5000); - assert_eq!(vsock_packet.dst_port(), 5000); - - // Set and get packet type - vsock_packet.set_type(VSOCK_TYPE_STREAM); - assert_eq!(vsock_packet.pkt_type(), VSOCK_TYPE_STREAM); - - // Set and get tx buffer size - vsock_packet.set_buf_alloc(10); - assert_eq!(vsock_packet.buf_alloc(), 10); - - // Set and get fwd_cnt of packet's data - vsock_packet.set_fwd_cnt(100); - assert_eq!(vsock_packet.fwd_cnt(), 100); - - // Set and get packet operation type - vsock_packet.set_op(VSOCK_OP_RW); - assert_eq!(vsock_packet.op(), VSOCK_OP_RW); - - // Set and get length of packet's data buffer - // this is a dummy test - vsock_packet.set_len(20); - assert_eq!(vsock_packet.len(), 20); - assert!(!vsock_packet.is_empty()); - - // Set and get packet's flags - vsock_packet.set_flags(1); - assert_eq!(vsock_packet.flags(), 1); - } - - #[test] - fn test_from_tx_virtq_head() { - // parameters for head construction - let head_params = HeadParams::new(VSOCK_PKT_HDR_SIZE, 0); - - // read only descriptor chain no data - let (mem, mut descr_chain) = prepare_desc_chain_vsock(false, &head_params, 0, 0); - assert!(VsockPacket::from_tx_virtq_head(&mut descr_chain, mem).is_ok()); - - // parameters for head construction - let head_params = HeadParams::new(VSOCK_PKT_HDR_SIZE, 10); - - // read only descriptor chain - let (mem, mut descr_chain) = prepare_desc_chain_vsock(false, &head_params, 1, 10); - assert!(VsockPacket::from_tx_virtq_head(&mut descr_chain, mem).is_ok()); - - // write only descriptor chain - let (mem, mut descr_chain) = prepare_desc_chain_vsock(true, &head_params, 1, 10); - assert_eq!( - VsockPacket::from_tx_virtq_head(&mut descr_chain, mem).unwrap_err(), - Error::UnreadableDescriptor - ); - - // more than 2 descriptors in chain - let (mem, mut descr_chain) = prepare_desc_chain_vsock(false, &head_params, 2, 10); - assert_eq!( - VsockPacket::from_tx_virtq_head(&mut descr_chain, mem).unwrap_err(), - Error::ExtraDescrInChain - ); - - // length of data descriptor does not match the value in head - let (mem, mut descr_chain) = prepare_desc_chain_vsock(false, &head_params, 1, 5); - assert_eq!( - VsockPacket::from_tx_virtq_head(&mut descr_chain, mem).unwrap_err(), - Error::DataDescTooSmall - ); - } -} diff --git a/vsock/src/thread_backend.rs b/vsock/src/thread_backend.rs index 8351003..50c853f 100644 --- a/vsock/src/thread_backend.rs +++ b/vsock/src/thread_backend.rs @@ -1,7 +1,6 @@ #![deny(missing_docs)] use super::{ - packet::*, rxops::*, vhu_vsock::{ ConnMapKey, Error, Result, VSOCK_HOST_CID, VSOCK_OP_REQUEST, VSOCK_OP_RST, @@ -18,6 +17,8 @@ use std::{ prelude::{AsRawFd, FromRawFd, RawFd}, }, }; +use virtio_vsock::packet::VsockPacket; +use vm_memory::bitmap::BitmapSlice; // TODO: convert UnixStream to Arc> pub struct VsockThreadBackend { @@ -63,7 +64,7 @@ impl VsockThreadBackend { /// Returns: /// - `Ok(())` if the packet was successfully filled in /// - `Err(Error::EmptyBackendRxQ) if there was no available data - pub(crate) fn recv_pkt(&mut self, pkt: &mut VsockPacket) -> Result<()> { + pub(crate) fn recv_pkt(&mut self, pkt: &mut VsockPacket) -> Result<()> { // Pop an event from the backend_rxq let key = self.backend_rxq.pop_front().ok_or(Error::EmptyBackendRxQ)?; let conn = match self.conn_map.get_mut(&key) { @@ -117,11 +118,11 @@ impl VsockThreadBackend { /// /// Returns: /// - always `Ok(())` if packet has been consumed correctly - pub(crate) fn send_pkt(&mut self, pkt: &VsockPacket) -> Result<()> { + pub(crate) fn send_pkt(&mut self, pkt: &VsockPacket) -> Result<()> { let key = ConnMapKey::new(pkt.dst_port(), pkt.src_port()); // TODO: Rst if packet has unsupported type - if pkt.pkt_type() != VSOCK_TYPE_STREAM { + if pkt.type_() != VSOCK_TYPE_STREAM { info!("vsock: dropping packet of unknown type"); return Ok(()); } @@ -130,7 +131,7 @@ impl VsockThreadBackend { if pkt.dst_cid() != VSOCK_HOST_CID { info!( "vsock: dropping packet for cid other than host: {:?}", - pkt.hdr() + pkt.dst_cid() ); return Ok(()); @@ -186,7 +187,7 @@ impl VsockThreadBackend { /// Attempts to connect to a host side unix socket listening on a path /// corresponding to the destination port as follows: /// - "{self.host_sock_path}_{local_port}"" - fn handle_new_guest_conn(&mut self, pkt: &VsockPacket) { + fn handle_new_guest_conn(&mut self, pkt: &VsockPacket) { let port_path = format!("{}_{}", self.host_socket_path, pkt.dst_port()); UnixStream::connect(port_path) @@ -197,7 +198,11 @@ impl VsockThreadBackend { } /// Wrapper to add new connection to relevant HashMaps. - fn add_new_guest_conn(&mut self, stream: UnixStream, pkt: &VsockPacket) -> Result<()> { + fn add_new_guest_conn( + &mut self, + stream: UnixStream, + pkt: &VsockPacket, + ) -> Result<()> { let stream_fd = stream.as_raw_fd(); self.listener_map .insert(stream_fd, ConnMapKey::new(pkt.dst_port(), pkt.src_port())); diff --git a/vsock/src/txbuf.rs b/vsock/src/txbuf.rs index 012110b..40c935c 100644 --- a/vsock/src/txbuf.rs +++ b/vsock/src/txbuf.rs @@ -1,5 +1,6 @@ use super::vhu_vsock::{Error, Result, CONN_TX_BUF_SIZE}; use std::{io::Write, num::Wrapping}; +use vm_memory::{bitmap::BitmapSlice, VolatileSlice}; #[derive(Debug)] pub struct LocalTxBuf { @@ -28,7 +29,7 @@ impl LocalTxBuf { /// Add new data to the tx buffer, push all or none. /// Returns LocalTxBufFull error if space not sufficient. - pub(crate) fn push(&mut self, data_buf: &[u8]) -> Result<()> { + pub(crate) fn push(&mut self, data_buf: &VolatileSlice) -> Result<()> { if CONN_TX_BUF_SIZE as usize - self.len() < data_buf.len() { // Tx buffer is full return Err(Error::LocalTxBufFull); @@ -39,11 +40,13 @@ impl LocalTxBuf { // Check if we can fit the data buffer between head and end of buffer let len = std::cmp::min(CONN_TX_BUF_SIZE as usize - tail_idx, data_buf.len()); - self.buf[tail_idx..tail_idx + len].copy_from_slice(&data_buf[..len]); + let txbuf = &mut self.buf[tail_idx..tail_idx + len]; + data_buf.copy_to(txbuf); // Check if there is more data to be wrapped around if len < data_buf.len() { - self.buf[..(data_buf.len() - len)].copy_from_slice(&data_buf[len..]); + let remain_txbuf = &mut self.buf[..(data_buf.len() - len)]; + data_buf.copy_to(remain_txbuf); } // Increment tail by the amount of data that has been added to the buffer @@ -124,7 +127,8 @@ mod tests { #[test] fn test_txbuf_push() { let mut loc_tx_buf = LocalTxBuf::new(); - let data = [0; CONN_TX_BUF_SIZE as usize]; + let mut buf = [0; CONN_TX_BUF_SIZE as usize]; + let data = unsafe { VolatileSlice::new(buf.as_mut_ptr(), buf.len()) }; // push data into empty tx buffer let res_push = loc_tx_buf.push(&data); @@ -143,7 +147,8 @@ mod tests { assert_eq!(loc_tx_buf.tail, Wrapping(CONN_TX_BUF_SIZE * 2)); // only tail wraps at full - let data = vec![1; 4]; + let mut buf = vec![1; 4]; + let data = unsafe { VolatileSlice::new(buf.as_mut_ptr(), buf.len()) }; let mut cmp_data = vec![1; 4]; cmp_data.append(&mut vec![0; (CONN_TX_BUF_SIZE - 4) as usize]); loc_tx_buf.head = Wrapping(4); @@ -160,7 +165,8 @@ mod tests { let mut loc_tx_buf = LocalTxBuf::new(); // data to be flushed - let data = vec![1; CONN_TX_BUF_SIZE as usize]; + let mut buf = vec![1; CONN_TX_BUF_SIZE as usize]; + let data = unsafe { VolatileSlice::new(buf.as_mut_ptr(), buf.len()) }; // target to which data is flushed let mut cmp_vec = Vec::with_capacity(data.len()); @@ -178,12 +184,14 @@ mod tests { assert_eq!(loc_tx_buf.head, Wrapping(n as u32)); assert_eq!(loc_tx_buf.tail, Wrapping(CONN_TX_BUF_SIZE)); assert_eq!(n, cmp_vec.len()); - assert_eq!(cmp_vec, data[..n]); + assert_eq!(cmp_vec, buf[..n]); } // wrapping head flush - let mut data = vec![0; (CONN_TX_BUF_SIZE / 2) as usize]; - data.append(&mut vec![1; (CONN_TX_BUF_SIZE / 2) as usize]); + let mut buf = vec![0; (CONN_TX_BUF_SIZE / 2) as usize]; + buf.append(&mut vec![1; (CONN_TX_BUF_SIZE / 2) as usize]); + let data = unsafe { VolatileSlice::new(buf.as_mut_ptr(), buf.len()) }; + loc_tx_buf.head = Wrapping(0); loc_tx_buf.tail = Wrapping(0); let res_push = loc_tx_buf.push(&data); diff --git a/vsock/src/vhu_vsock_thread.rs b/vsock/src/vhu_vsock_thread.rs index 796f6ce..0e3c59c 100644 --- a/vsock/src/vhu_vsock_thread.rs +++ b/vsock/src/vhu_vsock_thread.rs @@ -1,8 +1,10 @@ use super::{ - packet::*, rxops::*, thread_backend::*, - vhu_vsock::{ConnMapKey, Error, Result, VhostUserVsockBackend, BACKEND_EVENT, VSOCK_HOST_CID}, + vhu_vsock::{ + ConnMapKey, Error, Result, VhostUserVsockBackend, BACKEND_EVENT, CONN_TX_BUF_SIZE, + VSOCK_HOST_CID, + }, vsock_conn::*, }; use futures::executor::{ThreadPool, ThreadPoolBuilder}; @@ -12,6 +14,7 @@ use std::{ io, io::Read, num::Wrapping, + ops::Deref, os::unix::{ net::{UnixListener, UnixStream}, prelude::{AsRawFd, FromRawFd, RawFd}, @@ -20,6 +23,7 @@ use std::{ }; use vhost_user_backend::{VringEpollHandler, VringRwLock, VringT}; use virtio_queue::QueueOwnedT; +use virtio_vsock::packet::{VsockPacket, PKT_HEADER_SIZE}; use vm_memory::{GuestAddressSpace, GuestMemoryAtomic, GuestMemoryMmap}; use vmm_sys_util::{ epoll::EventSet, @@ -381,27 +385,27 @@ impl VhostUserVsockThread { .next() { used_any = true; - let atomic_mem = atomic_mem.clone(); + let mem = atomic_mem.clone().memory(); let head_idx = avail_desc.head_index(); - let used_len = - match VsockPacket::from_rx_virtq_head(&mut avail_desc, atomic_mem.clone()) { - Ok(mut pkt) => { - if self.thread_backend.recv_pkt(&mut pkt).is_ok() { - pkt.hdr().len() + pkt.len() as usize - } else { - queue - .iter(atomic_mem.memory()) - .unwrap() - .go_to_previous_position(); - break; - } + let used_len = match VsockPacket::from_rx_virtq_chain( + mem.deref(), + &mut avail_desc, + CONN_TX_BUF_SIZE, + ) { + Ok(mut pkt) => { + if self.thread_backend.recv_pkt(&mut pkt).is_ok() { + PKT_HEADER_SIZE + pkt.len() as usize + } else { + queue.iter(mem).unwrap().go_to_previous_position(); + break; } - Err(e) => { - warn!("vsock: RX queue error: {:?}", e); - 0 - } - }; + } + Err(e) => { + warn!("vsock: RX queue error: {:?}", e); + 0 + } + }; let vring = vring.clone(); let event_idx = self.event_idx; @@ -484,10 +488,14 @@ impl VhostUserVsockThread { .next() { used_any = true; - let atomic_mem = atomic_mem.clone(); + let mem = atomic_mem.clone().memory(); let head_idx = avail_desc.head_index(); - let pkt = match VsockPacket::from_tx_virtq_head(&mut avail_desc, atomic_mem.clone()) { + let pkt = match VsockPacket::from_tx_virtq_chain( + mem.deref(), + &mut avail_desc, + CONN_TX_BUF_SIZE, + ) { Ok(pkt) => pkt, Err(e) => { dbg!("vsock: error reading TX packet: {:?}", e); @@ -499,7 +507,7 @@ impl VhostUserVsockThread { vring .get_mut() .get_queue_mut() - .iter(atomic_mem.memory()) + .iter(mem) .unwrap() .go_to_previous_position(); break; diff --git a/vsock/src/vsock_conn.rs b/vsock/src/vsock_conn.rs index f3b590b..96418ba 100644 --- a/vsock/src/vsock_conn.rs +++ b/vsock/src/vsock_conn.rs @@ -1,5 +1,4 @@ use super::{ - packet::*, rxops::*, rxqueue::*, txbuf::*, @@ -16,6 +15,8 @@ use std::{ num::Wrapping, os::unix::prelude::{AsRawFd, RawFd}, }; +use virtio_vsock::packet::{VsockPacket, PKT_HEADER_SIZE}; +use vm_memory::{bitmap::BitmapSlice, Bytes, VolatileSlice}; #[derive(Debug)] pub struct VsockConnection { @@ -117,7 +118,7 @@ impl VsockConnection { /// Process a vsock packet that is meant for this connection. /// Forward data to the host-side application if the vsock packet /// contains a RW operation. - pub(crate) fn recv_pkt(&mut self, pkt: &mut VsockPacket) -> Result<()> { + pub(crate) fn recv_pkt(&mut self, pkt: &mut VsockPacket) -> Result<()> { // Initialize all fields in the packet header self.init_pkt(pkt); @@ -141,7 +142,7 @@ impl VsockConnection { pkt.set_op(VSOCK_OP_CREDIT_REQUEST); return Ok(()); } - let buf = pkt.buf_mut().ok_or(Error::PktBufMissing)?; + let buf = pkt.data_slice().ok_or(Error::PktBufMissing)?; // Perform a credit check to find the maximum read size. The read // data must fit inside a packet buffer and be within peer's @@ -149,7 +150,7 @@ impl VsockConnection { let max_read_len = std::cmp::min(buf.len(), self.peer_avail_credit()); // Read data from the stream directly into the buffer - if let Ok(read_cnt) = self.stream.read(&mut buf[..max_read_len]) { + if let Ok(read_cnt) = buf.read_from(0, &mut self.stream, max_read_len) { if read_cnt == 0 { // If no data was read then the stream was closed down unexpectedly. // Send a shutdown packet to the guest-side application. @@ -198,7 +199,7 @@ impl VsockConnection { /// /// Returns: /// - always `Ok(())` to indicate that the packet has been consumed - pub(crate) fn send_pkt(&mut self, pkt: &VsockPacket) -> Result<()> { + pub(crate) fn send_pkt(&mut self, pkt: &VsockPacket) -> Result<()> { // Update peer credit information self.peer_buf_alloc = pkt.buf_alloc(); self.peer_fwd_cnt = Wrapping(pkt.fwd_cnt()); @@ -213,19 +214,21 @@ impl VsockConnection { } VSOCK_OP_RW => { // Data has to be written to the host-side stream - if pkt.buf().is_none() { - info!( - "Dropping empty packet from guest (lp={}, pp={})", - self.local_port, self.peer_port - ); - return Ok(()); - } - - let buf_slice = &pkt.buf().unwrap()[..(pkt.len() as usize)]; - if let Err(err) = self.send_bytes(buf_slice) { - // TODO: Terminate this connection - dbg!("err:{:?}", err); - return Ok(()); + match pkt.data_slice() { + None => { + info!( + "Dropping empty packet from guest (lp={}, pp={})", + self.local_port, self.peer_port + ); + return Ok(()); + } + Some(buf) => { + if let Err(err) = self.send_bytes(buf) { + // TODO: Terminate this connection + dbg!("err:{:?}", err); + return Ok(()); + } + } } } VSOCK_OP_CREDIT_UPDATE => { @@ -271,7 +274,7 @@ impl VsockConnection { /// Returns: /// - Ok(cnt) where cnt is the number of bytes written to the stream /// - Err(Error::UnixWrite) if there was an error writing to the stream - fn send_bytes(&mut self, buf: &[u8]) -> Result<()> { + fn send_bytes(&mut self, buf: &VolatileSlice) -> Result<()> { if !self.tx_buf.is_empty() { // Data is already present in the buffer and the backend // is waiting for a EPOLLOUT event to flush it @@ -279,9 +282,9 @@ impl VsockConnection { } // Write data to the stream - let written_count = match self.stream.write(buf) { + let written_count = match buf.write_to(0, &mut self.stream, buf.len()) { Ok(cnt) => cnt, - Err(e) => { + Err(vm_memory::VolatileMemoryError::IOError(e)) => { if e.kind() == ErrorKind::WouldBlock { 0 } else { @@ -289,6 +292,10 @@ impl VsockConnection { return Err(Error::UnixWrite); } } + Err(e) => { + println!("send_bytes error: {:?}", e); + return Err(Error::UnixWrite); + } }; // Increment forwarded count by number of bytes written to the stream @@ -297,18 +304,19 @@ impl VsockConnection { self.rx_queue.enqueue(RxOps::CreditUpdate); if written_count != buf.len() { - return self.tx_buf.push(&buf[written_count..]); + return self.tx_buf.push(&buf.offset(written_count).unwrap()); } Ok(()) } /// Initialize all header fields in the vsock packet. - fn init_pkt<'a>(&self, pkt: &'a mut VsockPacket) -> &'a mut VsockPacket { + fn init_pkt<'a, 'b, B: BitmapSlice>( + &self, + pkt: &'a mut VsockPacket<'b, B>, + ) -> &'a mut VsockPacket<'b, B> { // Zero out the packet header - for b in pkt.hdr_mut() { - *b = 0; - } + pkt.set_header_from_raw(&[0u8; PKT_HEADER_SIZE]).unwrap(); pkt.set_src_cid(self.local_cid) .set_dst_cid(self.guest_cid) @@ -337,9 +345,103 @@ mod tests { use byteorder::{ByteOrder, LittleEndian}; use super::*; - use crate::packet::tests::{prepare_desc_chain_vsock, HeadParams}; - use crate::vhu_vsock::VSOCK_HOST_CID; + use crate::vhu_vsock::{VSOCK_HOST_CID, VSOCK_OP_RW, VSOCK_TYPE_STREAM}; use std::io::Result as IoResult; + use std::ops::Deref; + use virtio_bindings::bindings::virtio_ring::{VRING_DESC_F_NEXT, VRING_DESC_F_WRITE}; + use virtio_queue::{mock::MockSplitQueue, Descriptor, DescriptorChain, Queue, QueueOwnedT}; + use vm_memory::{ + Address, Bytes, GuestAddress, GuestAddressSpace, GuestMemoryAtomic, GuestMemoryLoadGuard, + GuestMemoryMmap, + }; + + struct HeadParams { + head_len: usize, + data_len: u32, + } + + impl HeadParams { + pub fn new(head_len: usize, data_len: u32) -> Self { + Self { head_len, data_len } + } + fn construct_head(&self) -> Vec { + let mut header = vec![0_u8; self.head_len]; + if self.head_len == PKT_HEADER_SIZE { + // Offset into the header for data length + const HDROFF_LEN: usize = 24; + LittleEndian::write_u32(&mut header[HDROFF_LEN..], self.data_len); + } + header + } + } + + fn prepare_desc_chain_vsock( + write_only: bool, + head_params: &HeadParams, + data_chain_len: u16, + head_data_len: u32, + ) -> ( + GuestMemoryAtomic, + DescriptorChain>, + ) { + let mem = GuestMemoryMmap::<()>::from_ranges(&[(GuestAddress(0), 0x1000)]).unwrap(); + let virt_queue = MockSplitQueue::new(&mem, 16); + let mut next_addr = virt_queue.desc_table().total_size() + 0x100; + let mut flags = 0; + + if write_only { + flags |= VRING_DESC_F_WRITE; + } + + let mut head_flags = if data_chain_len > 0 { + flags | VRING_DESC_F_NEXT + } else { + flags + }; + + // vsock packet header + // let header = vec![0 as u8; head_params.head_len]; + let header = head_params.construct_head(); + let head_desc = + Descriptor::new(next_addr, head_params.head_len as u32, head_flags as u16, 1); + mem.write(&header, head_desc.addr()).unwrap(); + assert!(virt_queue.desc_table().store(0, head_desc).is_ok()); + next_addr += head_params.head_len as u64; + + // Put the descriptor index 0 in the first available ring position. + mem.write_obj(0u16, virt_queue.avail_addr().unchecked_add(4)) + .unwrap(); + + // Set `avail_idx` to 1. + mem.write_obj(1u16, virt_queue.avail_addr().unchecked_add(2)) + .unwrap(); + + // chain len excludes the head + for i in 0..(data_chain_len) { + // last descr in chain + if i == data_chain_len - 1 { + head_flags &= !VRING_DESC_F_NEXT; + } + // vsock data + let data = vec![0_u8; head_data_len as usize]; + let data_desc = Descriptor::new(next_addr, data.len() as u32, head_flags as u16, i + 2); + mem.write(&data, data_desc.addr()).unwrap(); + assert!(virt_queue.desc_table().store(i + 1, data_desc).is_ok()); + next_addr += head_data_len as u64; + } + + // Create descriptor chain from pre-filled memory + ( + GuestMemoryAtomic::new(mem.clone()), + virt_queue + .create_queue::() + .unwrap() + .iter(GuestMemoryAtomic::new(mem.clone()).memory()) + .unwrap() + .next() + .unwrap(), + ) + } struct VsockDummySocket { data: Vec, @@ -435,7 +537,7 @@ mod tests { #[test] fn test_vsock_conn_init_pkt() { // parameters for packet head construction - let head_params = HeadParams::new(VSOCK_PKT_HDR_SIZE, 10); + let head_params = HeadParams::new(PKT_HEADER_SIZE, 10); // new locally inititated connection let dummy_file = VsockDummySocket::new(); @@ -444,7 +546,10 @@ mod tests { // write only descriptor chain let (mem, mut descr_chain) = prepare_desc_chain_vsock(true, &head_params, 2, 10); - let mut vsock_pkt = VsockPacket::from_rx_virtq_head(&mut descr_chain, mem).unwrap(); + let mem = mem.memory(); + let mut vsock_pkt = + VsockPacket::from_rx_virtq_chain(mem.deref(), &mut descr_chain, CONN_TX_BUF_SIZE) + .unwrap(); // initialize a vsock packet for the guest vsock_conn_local.init_pkt(&mut vsock_pkt); @@ -453,7 +558,7 @@ mod tests { assert_eq!(vsock_pkt.dst_cid(), 3); assert_eq!(vsock_pkt.src_port(), 5000); assert_eq!(vsock_pkt.dst_port(), 5001); - assert_eq!(vsock_pkt.pkt_type(), VSOCK_TYPE_STREAM); + assert_eq!(vsock_pkt.type_(), VSOCK_TYPE_STREAM); assert_eq!(vsock_pkt.buf_alloc(), CONN_TX_BUF_SIZE); assert_eq!(vsock_pkt.fwd_cnt(), 0); } @@ -461,7 +566,7 @@ mod tests { #[test] fn test_vsock_conn_recv_pkt() { // parameters for packet head construction - let head_params = HeadParams::new(VSOCK_PKT_HDR_SIZE, 5); + let head_params = HeadParams::new(PKT_HEADER_SIZE, 5); // new locally inititated connection let dummy_file = VsockDummySocket::new(); @@ -470,7 +575,10 @@ mod tests { // write only descriptor chain let (mem, mut descr_chain) = prepare_desc_chain_vsock(true, &head_params, 1, 5); - let mut vsock_pkt = VsockPacket::from_rx_virtq_head(&mut descr_chain, mem).unwrap(); + let mem = mem.memory(); + let mut vsock_pkt = + VsockPacket::from_rx_virtq_chain(mem.deref(), &mut descr_chain, CONN_TX_BUF_SIZE) + .unwrap(); // VSOCK_OP_REQUEST: new local conn request vsock_conn_local.rx_queue.enqueue(RxOps::Request); @@ -519,7 +627,9 @@ mod tests { assert_eq!(vsock_pkt.op(), VSOCK_OP_RW); assert!(!vsock_conn_local.rx_queue.pending_rx()); assert_eq!(vsock_pkt.len(), 5); - assert_eq!(vsock_pkt.buf().unwrap(), b"hello"); + let buf = &mut [0u8; 5]; + assert!(vsock_pkt.data_slice().unwrap().read_slice(buf, 0).is_ok()); + assert_eq!(buf, b"hello"); // VSOCK_OP_RESPONSE: response from a locally initiated connection vsock_conn_local.rx_queue.enqueue(RxOps::Response); @@ -545,7 +655,7 @@ mod tests { #[test] fn test_vsock_conn_send_pkt() { // parameters for packet head construction - let head_params = HeadParams::new(VSOCK_PKT_HDR_SIZE, 5); + let head_params = HeadParams::new(PKT_HEADER_SIZE, 5); // new locally inititated connection let dummy_file = VsockDummySocket::new(); @@ -554,13 +664,13 @@ mod tests { // write only descriptor chain let (mem, mut descr_chain) = prepare_desc_chain_vsock(false, &head_params, 1, 5); - let mut vsock_pkt = VsockPacket::from_tx_virtq_head(&mut descr_chain, mem).unwrap(); + let mem = mem.memory(); + let mut vsock_pkt = + VsockPacket::from_tx_virtq_chain(mem.deref(), &mut descr_chain, CONN_TX_BUF_SIZE) + .unwrap(); // peer credit information - const HDROFF_BUF_ALLOC: usize = 36; - const HDROFF_FWD_CNT: usize = 40; - LittleEndian::write_u32(&mut vsock_pkt.hdr_mut()[HDROFF_BUF_ALLOC..], 65536); - LittleEndian::write_u32(&mut vsock_pkt.hdr_mut()[HDROFF_FWD_CNT..], 1024); + vsock_pkt.set_buf_alloc(65536).set_fwd_cnt(1024); // check if peer credit information is updated currently let credit_check = vsock_conn_local.send_pkt(&vsock_pkt); @@ -579,7 +689,8 @@ mod tests { // VSOCK_OP_RW vsock_pkt.set_op(VSOCK_OP_RW); - vsock_pkt.buf_mut().unwrap().copy_from_slice(b"hello"); + let buf = b"hello"; + assert!(vsock_pkt.data_slice().unwrap().write_slice(buf, 0).is_ok()); let rw_response = vsock_conn_local.send_pkt(&vsock_pkt); assert!(rw_response.is_ok()); let mut resp_buf = vec![0; 5];