vsock: use VsockPacket from the virtio-vsock crate

Replace our VsockPacket with the new one implemented in the
virtio-vsock crate.

Based on Alex Bennée and Laura Loghin commit:
https://github.com/stsquad/vhost-device/commit/6752266608dd

  To achieve this the following changes where made:
    - deleted the internal packet.rs
    - convert the send_pkt/recv_pkt helpers to be BitmapSlice aware
    - update push from LocalTxBuf
    - tweak a few API calls due to minor diffs

Fixed tests and moved some helpers from the removed
vsock/src/packet.rs file.

Co-developed-by: Laura Loghin <lauralg@amazon.com>
Co-developed-by: Alex Bennée <alex.bennee@linaro.org>
Signed-off-by: Stefano Garzarella <sgarzare@redhat.com>
This commit is contained in:
Stefano Garzarella 2022-09-26 20:20:11 +02:00
parent 896d346135
commit df1073bf2e
8 changed files with 224 additions and 673 deletions

11
Cargo.lock generated
View File

@ -683,6 +683,7 @@ dependencies = [
"vhost-user-backend",
"virtio-bindings",
"virtio-queue",
"virtio-vsock",
"vm-memory",
"vmm-sys-util",
]
@ -705,6 +706,16 @@ dependencies = [
"vmm-sys-util",
]
[[package]]
name = "virtio-vsock"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "876299fc0f59e07aadc77541ce49dd75f7548f4d095eac6f7104b805394029e8"
dependencies = [
"virtio-queue",
"vm-memory",
]
[[package]]
name = "vm-memory"
version = "0.9.0"

View File

@ -20,6 +20,7 @@ vhost = { version = "0.5", features = ["vhost-user-slave"] }
vhost-user-backend = "0.7.0"
virtio-bindings = ">=0.1"
virtio-queue = "0.6"
virtio-vsock = "0.1.0"
vm-memory = ">=0.8"
vmm-sys-util = "=0.10.0"

View File

@ -1,4 +1,3 @@
mod packet;
mod rxops;
mod rxqueue;
mod thread_backend;

View File

@ -1,592 +0,0 @@
#![deny(missing_docs)]
use byteorder::{ByteOrder, LittleEndian};
use thiserror::Error as ThisError;
use virtio_queue::DescriptorChain;
use vm_memory::{
GuestAddress, GuestAddressSpace, GuestMemory, GuestMemoryAtomic, GuestMemoryLoadGuard,
GuestMemoryMmap,
};
pub(crate) type Result<T> = std::result::Result<T, Error>;
/// Below enum defines custom error types for vsock packet operations.
#[derive(Debug, PartialEq, ThisError)]
pub(crate) enum Error {
#[error("Descriptor not writable")]
UnwritableDescriptor,
#[error("Missing descriptor in queue")]
QueueMissingDescriptor,
#[error("Small header descriptor: {0}")]
HdrDescTooSmall(u32),
#[error("Chained guest memory error")]
GuestMemory,
#[error("Descriptor not readable")]
UnreadableDescriptor,
#[error("Extra descriptors in the descriptor chain")]
ExtraDescrInChain,
#[error("Data buffer size less than size in packet header")]
DataDescTooSmall,
}
// TODO: Replace below with bindgen generated struct
// vsock packet header size when packed
pub const VSOCK_PKT_HDR_SIZE: usize = 44;
// Offset into header for source cid
const HDROFF_SRC_CID: usize = 0;
// Offset into header for destination cid
const HDROFF_DST_CID: usize = 8;
// Offset into header for source port
const HDROFF_SRC_PORT: usize = 16;
// Offset into header for destination port
const HDROFF_DST_PORT: usize = 20;
// Offset into the header for data length
const HDROFF_LEN: usize = 24;
// Offset into header for packet type
const HDROFF_TYPE: usize = 28;
// Offset into header for operation kind
const HDROFF_OP: usize = 30;
// Offset into header for additional flags
// only for VSOCK_OP_SHUTDOWN
const HDROFF_FLAGS: usize = 32;
// Offset into header for tx buf alloc
const HDROFF_BUF_ALLOC: usize = 36;
// Offset into header for forward count
const HDROFF_FWD_CNT: usize = 40;
/// Vsock packet structure implemented as a wrapper around a virtq descriptor chain:
/// - chain head holds the packet header
/// - optional data descriptor, only present for data packets (VSOCK_OP_RW)
#[derive(Debug)]
pub struct VsockPacket {
hdr: *mut u8,
buf: Option<*mut u8>,
buf_size: usize,
}
impl VsockPacket {
/// Create a vsock packet wrapper around a chain in the rx virtqueue.
/// Perform bounds checking before creating the wrapper.
pub(crate) fn from_rx_virtq_head(
chain: &mut DescriptorChain<GuestMemoryLoadGuard<GuestMemoryMmap<()>>>,
mem: GuestMemoryAtomic<GuestMemoryMmap>,
) -> Result<Self> {
// head is at 0, next is at 1, max of two descriptors
// head contains the packet header
// next contains the optional packet data
let mut descr_vec = Vec::with_capacity(2);
for descr in chain {
if !descr.is_write_only() {
return Err(Error::UnwritableDescriptor);
}
descr_vec.push(descr);
}
if descr_vec.len() < 2 {
// We expect a head and a data descriptor
return Err(Error::QueueMissingDescriptor);
}
let head_descr = descr_vec[0];
let data_descr = descr_vec[1];
if head_descr.len() < VSOCK_PKT_HDR_SIZE as u32 {
return Err(Error::HdrDescTooSmall(head_descr.len()));
}
Ok(Self {
hdr: VsockPacket::guest_to_host_address(
&mem.memory(),
head_descr.addr(),
VSOCK_PKT_HDR_SIZE,
)
.ok_or(Error::GuestMemory)? as *mut u8,
buf: Some(
VsockPacket::guest_to_host_address(
&mem.memory(),
data_descr.addr(),
data_descr.len() as usize,
)
.ok_or(Error::GuestMemory)? as *mut u8,
),
buf_size: data_descr.len() as usize,
})
}
/// Create a vsock packet wrapper around a chain in the tx virtqueue
/// Bounds checking before creating the wrapper.
pub(crate) fn from_tx_virtq_head(
chain: &mut DescriptorChain<GuestMemoryLoadGuard<GuestMemoryMmap<()>>>,
mem: GuestMemoryAtomic<GuestMemoryMmap>,
) -> Result<Self> {
// head is at 0, next is at 1, max of two descriptors
// head contains the packet header
// next contains the optional packet data
let mut descr_vec = Vec::with_capacity(2);
// let mut num_descr = 0;
for descr in chain {
if descr.is_write_only() {
return Err(Error::UnreadableDescriptor);
}
descr_vec.push(descr);
}
if descr_vec.len() > 2 {
return Err(Error::ExtraDescrInChain);
}
let head_descr = descr_vec[0];
if head_descr.len() < VSOCK_PKT_HDR_SIZE as u32 {
return Err(Error::HdrDescTooSmall(head_descr.len()));
}
let mut pkt = Self {
hdr: VsockPacket::guest_to_host_address(
&mem.memory(),
head_descr.addr(),
VSOCK_PKT_HDR_SIZE,
)
.ok_or(Error::GuestMemory)? as *mut u8,
buf: None,
buf_size: 0,
};
// Zero length packet
if pkt.is_empty() {
return Ok(pkt);
}
// There exists packet data as well
let data_descr = descr_vec[1];
// Data buffer should be as large as described in the header
if data_descr.len() < pkt.len() {
return Err(Error::DataDescTooSmall);
}
pkt.buf_size = data_descr.len() as usize;
pkt.buf = Some(
VsockPacket::guest_to_host_address(
&mem.memory(),
data_descr.addr(),
data_descr.len() as usize,
)
.ok_or(Error::GuestMemory)? as *mut u8,
);
Ok(pkt)
}
/// Convert an absolute address in guest address space to a host
/// pointer and verify that the provided size defines a valid
/// range within a single memory region.
fn guest_to_host_address(
mem: &GuestMemoryLoadGuard<GuestMemoryMmap>,
addr: GuestAddress,
size: usize,
) -> Option<*mut u8> {
if mem.check_range(addr, size) {
Some(mem.get_host_address(addr).unwrap())
} else {
None
}
}
/// In place byte slice access to vsock packet header.
pub fn hdr(&self) -> &[u8] {
// Safe as bound checks performed in from_*_virtq_head
unsafe { std::slice::from_raw_parts(self.hdr as *const u8, VSOCK_PKT_HDR_SIZE) }
}
/// In place mutable slice access to vsock packet header.
pub fn hdr_mut(&mut self) -> &mut [u8] {
// Safe as bound checks performed in from_*_virtq_head
unsafe { std::slice::from_raw_parts_mut(self.hdr, VSOCK_PKT_HDR_SIZE) }
}
/// Size of vsock packet data, found by accessing len field
/// of virtio_vsock_hdr struct.
pub fn len(&self) -> u32 {
LittleEndian::read_u32(&self.hdr()[HDROFF_LEN..])
}
/// Set the source cid.
pub fn set_src_cid(&mut self, cid: u64) -> &mut Self {
LittleEndian::write_u64(&mut self.hdr_mut()[HDROFF_SRC_CID..], cid);
self
}
/// Set the destination cid.
pub fn set_dst_cid(&mut self, cid: u64) -> &mut Self {
LittleEndian::write_u64(&mut self.hdr_mut()[HDROFF_DST_CID..], cid);
self
}
/// Set source port.
pub fn set_src_port(&mut self, port: u32) -> &mut Self {
LittleEndian::write_u32(&mut self.hdr_mut()[HDROFF_SRC_PORT..], port);
self
}
/// Set destination port.
pub fn set_dst_port(&mut self, port: u32) -> &mut Self {
LittleEndian::write_u32(&mut self.hdr_mut()[HDROFF_DST_PORT..], port);
self
}
/// Set type of connection.
pub fn set_type(&mut self, type_: u16) -> &mut Self {
LittleEndian::write_u16(&mut self.hdr_mut()[HDROFF_TYPE..], type_);
self
}
/// Set size of tx buf.
pub fn set_buf_alloc(&mut self, buf_alloc: u32) -> &mut Self {
LittleEndian::write_u32(&mut self.hdr_mut()[HDROFF_BUF_ALLOC..], buf_alloc);
self
}
/// Set amount of tx buf data written to stream.
pub fn set_fwd_cnt(&mut self, fwd_cnt: u32) -> &mut Self {
LittleEndian::write_u32(&mut self.hdr_mut()[HDROFF_FWD_CNT..], fwd_cnt);
self
}
/// Set packet operation ID.
pub fn set_op(&mut self, op: u16) -> &mut Self {
LittleEndian::write_u16(&mut self.hdr_mut()[HDROFF_OP..], op);
self
}
/// Check if the packet has no data.
fn is_empty(&self) -> bool {
self.len() == 0
}
/// Get destination port from packet.
pub fn dst_port(&self) -> u32 {
LittleEndian::read_u32(&self.hdr()[HDROFF_DST_PORT..])
}
/// Get source port from packet.
pub fn src_port(&self) -> u32 {
LittleEndian::read_u32(&self.hdr()[HDROFF_SRC_PORT..])
}
/// Get source cid from packet.
pub fn src_cid(&self) -> u64 {
LittleEndian::read_u64(&self.hdr()[HDROFF_SRC_CID..])
}
/// Get destination cid from packet.
pub fn dst_cid(&self) -> u64 {
LittleEndian::read_u64(&self.hdr()[HDROFF_DST_CID..])
}
/// Get packet type.
pub fn pkt_type(&self) -> u16 {
LittleEndian::read_u16(&self.hdr()[HDROFF_TYPE..])
}
/// Get operation requested in the packet.
pub fn op(&self) -> u16 {
LittleEndian::read_u16(&self.hdr()[HDROFF_OP..])
}
/// Byte slice mutable access to vsock packet data buffer.
pub fn buf_mut(&mut self) -> Option<&mut [u8]> {
// Safe as bound checks performed while creating packet
self.buf
.map(|ptr| unsafe { std::slice::from_raw_parts_mut(ptr, self.buf_size) })
}
/// Byte slice access to vsock packet data buffer.
pub fn buf(&self) -> Option<&[u8]> {
// Safe as bound checks performed while creating packet
self.buf
.map(|ptr| unsafe { std::slice::from_raw_parts(ptr as *const u8, self.buf_size) })
}
/// Set data buffer length.
pub fn set_len(&mut self, len: u32) -> &mut Self {
LittleEndian::write_u32(&mut self.hdr_mut()[HDROFF_LEN..], len);
self
}
/// Read buf alloc.
pub fn buf_alloc(&self) -> u32 {
LittleEndian::read_u32(&self.hdr()[HDROFF_BUF_ALLOC..])
}
/// Get fwd cnt from packet header.
pub fn fwd_cnt(&self) -> u32 {
LittleEndian::read_u32(&self.hdr()[HDROFF_FWD_CNT..])
}
/// Read flags from the packet header.
pub fn flags(&self) -> u32 {
LittleEndian::read_u32(&self.hdr()[HDROFF_FLAGS..])
}
/// Set packet header flag to flags.
pub fn set_flags(&mut self, flags: u32) -> &mut Self {
LittleEndian::write_u32(&mut self.hdr_mut()[HDROFF_FLAGS..], flags);
self
}
/// Set OP specific flags.
pub fn set_flag(&mut self, flag: u32) -> &mut Self {
self.set_flags(self.flags() | flag);
self
}
}
#[cfg(test)]
pub mod tests {
use crate::vhu_vsock::{VSOCK_OP_RW, VSOCK_TYPE_STREAM};
use super::*;
use virtio_bindings::bindings::virtio_ring::{VRING_DESC_F_NEXT, VRING_DESC_F_WRITE};
use virtio_queue::{mock::MockSplitQueue, Descriptor, Queue, QueueOwnedT};
use vm_memory::{Address, Bytes, GuestAddress, GuestMemoryAtomic, GuestMemoryMmap};
pub struct HeadParams {
head_len: usize,
data_len: u32,
}
impl HeadParams {
pub fn new(head_len: usize, data_len: u32) -> Self {
Self { head_len, data_len }
}
fn construct_head(&self) -> Vec<u8> {
let mut header = vec![0_u8; self.head_len];
if self.head_len == VSOCK_PKT_HDR_SIZE {
LittleEndian::write_u32(&mut header[HDROFF_LEN..], self.data_len);
}
header
}
}
pub fn prepare_desc_chain_vsock(
write_only: bool,
head_params: &HeadParams,
data_chain_len: u16,
head_data_len: u32,
) -> (
GuestMemoryAtomic<GuestMemoryMmap>,
DescriptorChain<GuestMemoryLoadGuard<GuestMemoryMmap>>,
) {
let mem = GuestMemoryMmap::<()>::from_ranges(&[(GuestAddress(0), 0x1000)]).unwrap();
let virt_queue = MockSplitQueue::new(&mem, 16);
let mut next_addr = virt_queue.desc_table().total_size() + 0x100;
let mut flags = 0;
if write_only {
flags |= VRING_DESC_F_WRITE;
}
let mut head_flags = if data_chain_len > 0 {
flags | VRING_DESC_F_NEXT
} else {
flags
};
// vsock packet header
// let header = vec![0 as u8; head_params.head_len];
let header = head_params.construct_head();
let head_desc =
Descriptor::new(next_addr, head_params.head_len as u32, head_flags as u16, 1);
mem.write(&header, head_desc.addr()).unwrap();
assert!(virt_queue.desc_table().store(0, head_desc).is_ok());
next_addr += head_params.head_len as u64;
// Put the descriptor index 0 in the first available ring position.
mem.write_obj(0u16, virt_queue.avail_addr().unchecked_add(4))
.unwrap();
// Set `avail_idx` to 1.
mem.write_obj(1u16, virt_queue.avail_addr().unchecked_add(2))
.unwrap();
// chain len excludes the head
for i in 0..(data_chain_len) {
// last descr in chain
if i == data_chain_len - 1 {
head_flags &= !VRING_DESC_F_NEXT;
}
// vsock data
let data = vec![0_u8; head_data_len as usize];
let data_desc = Descriptor::new(next_addr, data.len() as u32, head_flags as u16, i + 2);
mem.write(&data, data_desc.addr()).unwrap();
assert!(virt_queue.desc_table().store(i + 1, data_desc).is_ok());
next_addr += head_data_len as u64;
}
// Create descriptor chain from pre-filled memory
(
GuestMemoryAtomic::new(mem.clone()),
virt_queue
.create_queue::<Queue>()
.unwrap()
.iter(GuestMemoryAtomic::new(mem.clone()).memory())
.unwrap()
.next()
.unwrap(),
)
}
#[test]
fn test_guest_to_host_address() {
let mem = GuestMemoryAtomic::new(
GuestMemoryMmap::<()>::from_ranges(&[(GuestAddress(0), 1000)]).unwrap(),
);
assert!(VsockPacket::guest_to_host_address(&mem.memory(), GuestAddress(0), 1000).is_some());
assert!(VsockPacket::guest_to_host_address(&mem.memory(), GuestAddress(0), 500).is_some());
assert!(
VsockPacket::guest_to_host_address(&mem.memory(), GuestAddress(500), 500).is_some()
);
assert!(
VsockPacket::guest_to_host_address(&mem.memory(), GuestAddress(501), 500).is_none()
);
}
#[test]
fn test_from_rx_virtq_head() {
// parameters for packet head construction
let head_params = HeadParams::new(VSOCK_PKT_HDR_SIZE, 10);
// write only descriptor chain
let (mem, mut descr_chain) = prepare_desc_chain_vsock(true, &head_params, 2, 10);
assert!(VsockPacket::from_rx_virtq_head(&mut descr_chain, mem).is_ok());
// read only descriptor chain
let (mem, mut descr_chain) = prepare_desc_chain_vsock(false, &head_params, 1, 10);
assert_eq!(
VsockPacket::from_rx_virtq_head(&mut descr_chain, mem).unwrap_err(),
Error::UnwritableDescriptor
);
// less than two descriptors
let (mem, mut descr_chain) = prepare_desc_chain_vsock(true, &head_params, 0, 10);
assert_eq!(
VsockPacket::from_rx_virtq_head(&mut descr_chain, mem).unwrap_err(),
Error::QueueMissingDescriptor
);
// incorrect header length
let head_params = HeadParams::new(22, 10);
let (mem, mut descr_chain) = prepare_desc_chain_vsock(true, &head_params, 1, 10);
assert_eq!(
VsockPacket::from_rx_virtq_head(&mut descr_chain, mem).unwrap_err(),
Error::HdrDescTooSmall(22)
);
}
#[test]
fn test_vsock_packet_header_ops() {
// parameters for head construction
let head_params = HeadParams::new(VSOCK_PKT_HDR_SIZE, 10);
let (mem, mut descr_chain) = prepare_desc_chain_vsock(true, &head_params, 2, 10);
let mut vsock_packet = VsockPacket::from_rx_virtq_head(&mut descr_chain, mem).unwrap();
// Check packet data's length
assert!(!vsock_packet.is_empty());
assert_eq!(vsock_packet.len(), 10);
// Set and get the source CID in the packet header
vsock_packet.set_src_cid(1);
assert_eq!(vsock_packet.src_cid(), 1);
// Set and get the destination CID in the packet header
vsock_packet.set_dst_cid(1);
assert_eq!(vsock_packet.dst_cid(), 1);
// Set and get the source port in the packet header
vsock_packet.set_src_port(5000);
assert_eq!(vsock_packet.src_port(), 5000);
// Set and get the destination port in the packet header
vsock_packet.set_dst_port(5000);
assert_eq!(vsock_packet.dst_port(), 5000);
// Set and get packet type
vsock_packet.set_type(VSOCK_TYPE_STREAM);
assert_eq!(vsock_packet.pkt_type(), VSOCK_TYPE_STREAM);
// Set and get tx buffer size
vsock_packet.set_buf_alloc(10);
assert_eq!(vsock_packet.buf_alloc(), 10);
// Set and get fwd_cnt of packet's data
vsock_packet.set_fwd_cnt(100);
assert_eq!(vsock_packet.fwd_cnt(), 100);
// Set and get packet operation type
vsock_packet.set_op(VSOCK_OP_RW);
assert_eq!(vsock_packet.op(), VSOCK_OP_RW);
// Set and get length of packet's data buffer
// this is a dummy test
vsock_packet.set_len(20);
assert_eq!(vsock_packet.len(), 20);
assert!(!vsock_packet.is_empty());
// Set and get packet's flags
vsock_packet.set_flags(1);
assert_eq!(vsock_packet.flags(), 1);
}
#[test]
fn test_from_tx_virtq_head() {
// parameters for head construction
let head_params = HeadParams::new(VSOCK_PKT_HDR_SIZE, 0);
// read only descriptor chain no data
let (mem, mut descr_chain) = prepare_desc_chain_vsock(false, &head_params, 0, 0);
assert!(VsockPacket::from_tx_virtq_head(&mut descr_chain, mem).is_ok());
// parameters for head construction
let head_params = HeadParams::new(VSOCK_PKT_HDR_SIZE, 10);
// read only descriptor chain
let (mem, mut descr_chain) = prepare_desc_chain_vsock(false, &head_params, 1, 10);
assert!(VsockPacket::from_tx_virtq_head(&mut descr_chain, mem).is_ok());
// write only descriptor chain
let (mem, mut descr_chain) = prepare_desc_chain_vsock(true, &head_params, 1, 10);
assert_eq!(
VsockPacket::from_tx_virtq_head(&mut descr_chain, mem).unwrap_err(),
Error::UnreadableDescriptor
);
// more than 2 descriptors in chain
let (mem, mut descr_chain) = prepare_desc_chain_vsock(false, &head_params, 2, 10);
assert_eq!(
VsockPacket::from_tx_virtq_head(&mut descr_chain, mem).unwrap_err(),
Error::ExtraDescrInChain
);
// length of data descriptor does not match the value in head
let (mem, mut descr_chain) = prepare_desc_chain_vsock(false, &head_params, 1, 5);
assert_eq!(
VsockPacket::from_tx_virtq_head(&mut descr_chain, mem).unwrap_err(),
Error::DataDescTooSmall
);
}
}

View File

@ -1,7 +1,6 @@
#![deny(missing_docs)]
use super::{
packet::*,
rxops::*,
vhu_vsock::{
ConnMapKey, Error, Result, VSOCK_HOST_CID, VSOCK_OP_REQUEST, VSOCK_OP_RST,
@ -18,6 +17,8 @@ use std::{
prelude::{AsRawFd, FromRawFd, RawFd},
},
};
use virtio_vsock::packet::VsockPacket;
use vm_memory::bitmap::BitmapSlice;
// TODO: convert UnixStream to Arc<Mutex<UnixStream>>
pub struct VsockThreadBackend {
@ -63,7 +64,7 @@ impl VsockThreadBackend {
/// Returns:
/// - `Ok(())` if the packet was successfully filled in
/// - `Err(Error::EmptyBackendRxQ) if there was no available data
pub(crate) fn recv_pkt(&mut self, pkt: &mut VsockPacket) -> Result<()> {
pub(crate) fn recv_pkt<B: BitmapSlice>(&mut self, pkt: &mut VsockPacket<B>) -> Result<()> {
// Pop an event from the backend_rxq
let key = self.backend_rxq.pop_front().ok_or(Error::EmptyBackendRxQ)?;
let conn = match self.conn_map.get_mut(&key) {
@ -117,11 +118,11 @@ impl VsockThreadBackend {
///
/// Returns:
/// - always `Ok(())` if packet has been consumed correctly
pub(crate) fn send_pkt(&mut self, pkt: &VsockPacket) -> Result<()> {
pub(crate) fn send_pkt<B: BitmapSlice>(&mut self, pkt: &VsockPacket<B>) -> Result<()> {
let key = ConnMapKey::new(pkt.dst_port(), pkt.src_port());
// TODO: Rst if packet has unsupported type
if pkt.pkt_type() != VSOCK_TYPE_STREAM {
if pkt.type_() != VSOCK_TYPE_STREAM {
info!("vsock: dropping packet of unknown type");
return Ok(());
}
@ -130,7 +131,7 @@ impl VsockThreadBackend {
if pkt.dst_cid() != VSOCK_HOST_CID {
info!(
"vsock: dropping packet for cid other than host: {:?}",
pkt.hdr()
pkt.dst_cid()
);
return Ok(());
@ -186,7 +187,7 @@ impl VsockThreadBackend {
/// Attempts to connect to a host side unix socket listening on a path
/// corresponding to the destination port as follows:
/// - "{self.host_sock_path}_{local_port}""
fn handle_new_guest_conn(&mut self, pkt: &VsockPacket) {
fn handle_new_guest_conn<B: BitmapSlice>(&mut self, pkt: &VsockPacket<B>) {
let port_path = format!("{}_{}", self.host_socket_path, pkt.dst_port());
UnixStream::connect(port_path)
@ -197,7 +198,11 @@ impl VsockThreadBackend {
}
/// Wrapper to add new connection to relevant HashMaps.
fn add_new_guest_conn(&mut self, stream: UnixStream, pkt: &VsockPacket) -> Result<()> {
fn add_new_guest_conn<B: BitmapSlice>(
&mut self,
stream: UnixStream,
pkt: &VsockPacket<B>,
) -> Result<()> {
let stream_fd = stream.as_raw_fd();
self.listener_map
.insert(stream_fd, ConnMapKey::new(pkt.dst_port(), pkt.src_port()));

View File

@ -1,5 +1,6 @@
use super::vhu_vsock::{Error, Result, CONN_TX_BUF_SIZE};
use std::{io::Write, num::Wrapping};
use vm_memory::{bitmap::BitmapSlice, VolatileSlice};
#[derive(Debug)]
pub struct LocalTxBuf {
@ -28,7 +29,7 @@ impl LocalTxBuf {
/// Add new data to the tx buffer, push all or none.
/// Returns LocalTxBufFull error if space not sufficient.
pub(crate) fn push(&mut self, data_buf: &[u8]) -> Result<()> {
pub(crate) fn push<B: BitmapSlice>(&mut self, data_buf: &VolatileSlice<B>) -> Result<()> {
if CONN_TX_BUF_SIZE as usize - self.len() < data_buf.len() {
// Tx buffer is full
return Err(Error::LocalTxBufFull);
@ -39,11 +40,13 @@ impl LocalTxBuf {
// Check if we can fit the data buffer between head and end of buffer
let len = std::cmp::min(CONN_TX_BUF_SIZE as usize - tail_idx, data_buf.len());
self.buf[tail_idx..tail_idx + len].copy_from_slice(&data_buf[..len]);
let txbuf = &mut self.buf[tail_idx..tail_idx + len];
data_buf.copy_to(txbuf);
// Check if there is more data to be wrapped around
if len < data_buf.len() {
self.buf[..(data_buf.len() - len)].copy_from_slice(&data_buf[len..]);
let remain_txbuf = &mut self.buf[..(data_buf.len() - len)];
data_buf.copy_to(remain_txbuf);
}
// Increment tail by the amount of data that has been added to the buffer
@ -124,7 +127,8 @@ mod tests {
#[test]
fn test_txbuf_push() {
let mut loc_tx_buf = LocalTxBuf::new();
let data = [0; CONN_TX_BUF_SIZE as usize];
let mut buf = [0; CONN_TX_BUF_SIZE as usize];
let data = unsafe { VolatileSlice::new(buf.as_mut_ptr(), buf.len()) };
// push data into empty tx buffer
let res_push = loc_tx_buf.push(&data);
@ -143,7 +147,8 @@ mod tests {
assert_eq!(loc_tx_buf.tail, Wrapping(CONN_TX_BUF_SIZE * 2));
// only tail wraps at full
let data = vec![1; 4];
let mut buf = vec![1; 4];
let data = unsafe { VolatileSlice::new(buf.as_mut_ptr(), buf.len()) };
let mut cmp_data = vec![1; 4];
cmp_data.append(&mut vec![0; (CONN_TX_BUF_SIZE - 4) as usize]);
loc_tx_buf.head = Wrapping(4);
@ -160,7 +165,8 @@ mod tests {
let mut loc_tx_buf = LocalTxBuf::new();
// data to be flushed
let data = vec![1; CONN_TX_BUF_SIZE as usize];
let mut buf = vec![1; CONN_TX_BUF_SIZE as usize];
let data = unsafe { VolatileSlice::new(buf.as_mut_ptr(), buf.len()) };
// target to which data is flushed
let mut cmp_vec = Vec::with_capacity(data.len());
@ -178,12 +184,14 @@ mod tests {
assert_eq!(loc_tx_buf.head, Wrapping(n as u32));
assert_eq!(loc_tx_buf.tail, Wrapping(CONN_TX_BUF_SIZE));
assert_eq!(n, cmp_vec.len());
assert_eq!(cmp_vec, data[..n]);
assert_eq!(cmp_vec, buf[..n]);
}
// wrapping head flush
let mut data = vec![0; (CONN_TX_BUF_SIZE / 2) as usize];
data.append(&mut vec![1; (CONN_TX_BUF_SIZE / 2) as usize]);
let mut buf = vec![0; (CONN_TX_BUF_SIZE / 2) as usize];
buf.append(&mut vec![1; (CONN_TX_BUF_SIZE / 2) as usize]);
let data = unsafe { VolatileSlice::new(buf.as_mut_ptr(), buf.len()) };
loc_tx_buf.head = Wrapping(0);
loc_tx_buf.tail = Wrapping(0);
let res_push = loc_tx_buf.push(&data);

View File

@ -1,8 +1,10 @@
use super::{
packet::*,
rxops::*,
thread_backend::*,
vhu_vsock::{ConnMapKey, Error, Result, VhostUserVsockBackend, BACKEND_EVENT, VSOCK_HOST_CID},
vhu_vsock::{
ConnMapKey, Error, Result, VhostUserVsockBackend, BACKEND_EVENT, CONN_TX_BUF_SIZE,
VSOCK_HOST_CID,
},
vsock_conn::*,
};
use futures::executor::{ThreadPool, ThreadPoolBuilder};
@ -12,6 +14,7 @@ use std::{
io,
io::Read,
num::Wrapping,
ops::Deref,
os::unix::{
net::{UnixListener, UnixStream},
prelude::{AsRawFd, FromRawFd, RawFd},
@ -20,6 +23,7 @@ use std::{
};
use vhost_user_backend::{VringEpollHandler, VringRwLock, VringT};
use virtio_queue::QueueOwnedT;
use virtio_vsock::packet::{VsockPacket, PKT_HEADER_SIZE};
use vm_memory::{GuestAddressSpace, GuestMemoryAtomic, GuestMemoryMmap};
use vmm_sys_util::{
epoll::EventSet,
@ -381,27 +385,27 @@ impl VhostUserVsockThread {
.next()
{
used_any = true;
let atomic_mem = atomic_mem.clone();
let mem = atomic_mem.clone().memory();
let head_idx = avail_desc.head_index();
let used_len =
match VsockPacket::from_rx_virtq_head(&mut avail_desc, atomic_mem.clone()) {
Ok(mut pkt) => {
if self.thread_backend.recv_pkt(&mut pkt).is_ok() {
pkt.hdr().len() + pkt.len() as usize
} else {
queue
.iter(atomic_mem.memory())
.unwrap()
.go_to_previous_position();
break;
}
let used_len = match VsockPacket::from_rx_virtq_chain(
mem.deref(),
&mut avail_desc,
CONN_TX_BUF_SIZE,
) {
Ok(mut pkt) => {
if self.thread_backend.recv_pkt(&mut pkt).is_ok() {
PKT_HEADER_SIZE + pkt.len() as usize
} else {
queue.iter(mem).unwrap().go_to_previous_position();
break;
}
Err(e) => {
warn!("vsock: RX queue error: {:?}", e);
0
}
};
}
Err(e) => {
warn!("vsock: RX queue error: {:?}", e);
0
}
};
let vring = vring.clone();
let event_idx = self.event_idx;
@ -484,10 +488,14 @@ impl VhostUserVsockThread {
.next()
{
used_any = true;
let atomic_mem = atomic_mem.clone();
let mem = atomic_mem.clone().memory();
let head_idx = avail_desc.head_index();
let pkt = match VsockPacket::from_tx_virtq_head(&mut avail_desc, atomic_mem.clone()) {
let pkt = match VsockPacket::from_tx_virtq_chain(
mem.deref(),
&mut avail_desc,
CONN_TX_BUF_SIZE,
) {
Ok(pkt) => pkt,
Err(e) => {
dbg!("vsock: error reading TX packet: {:?}", e);
@ -499,7 +507,7 @@ impl VhostUserVsockThread {
vring
.get_mut()
.get_queue_mut()
.iter(atomic_mem.memory())
.iter(mem)
.unwrap()
.go_to_previous_position();
break;

View File

@ -1,5 +1,4 @@
use super::{
packet::*,
rxops::*,
rxqueue::*,
txbuf::*,
@ -16,6 +15,8 @@ use std::{
num::Wrapping,
os::unix::prelude::{AsRawFd, RawFd},
};
use virtio_vsock::packet::{VsockPacket, PKT_HEADER_SIZE};
use vm_memory::{bitmap::BitmapSlice, Bytes, VolatileSlice};
#[derive(Debug)]
pub struct VsockConnection<S> {
@ -117,7 +118,7 @@ impl<S: AsRawFd + Read + Write> VsockConnection<S> {
/// Process a vsock packet that is meant for this connection.
/// Forward data to the host-side application if the vsock packet
/// contains a RW operation.
pub(crate) fn recv_pkt(&mut self, pkt: &mut VsockPacket) -> Result<()> {
pub(crate) fn recv_pkt<B: BitmapSlice>(&mut self, pkt: &mut VsockPacket<B>) -> Result<()> {
// Initialize all fields in the packet header
self.init_pkt(pkt);
@ -141,7 +142,7 @@ impl<S: AsRawFd + Read + Write> VsockConnection<S> {
pkt.set_op(VSOCK_OP_CREDIT_REQUEST);
return Ok(());
}
let buf = pkt.buf_mut().ok_or(Error::PktBufMissing)?;
let buf = pkt.data_slice().ok_or(Error::PktBufMissing)?;
// Perform a credit check to find the maximum read size. The read
// data must fit inside a packet buffer and be within peer's
@ -149,7 +150,7 @@ impl<S: AsRawFd + Read + Write> VsockConnection<S> {
let max_read_len = std::cmp::min(buf.len(), self.peer_avail_credit());
// Read data from the stream directly into the buffer
if let Ok(read_cnt) = self.stream.read(&mut buf[..max_read_len]) {
if let Ok(read_cnt) = buf.read_from(0, &mut self.stream, max_read_len) {
if read_cnt == 0 {
// If no data was read then the stream was closed down unexpectedly.
// Send a shutdown packet to the guest-side application.
@ -198,7 +199,7 @@ impl<S: AsRawFd + Read + Write> VsockConnection<S> {
///
/// Returns:
/// - always `Ok(())` to indicate that the packet has been consumed
pub(crate) fn send_pkt(&mut self, pkt: &VsockPacket) -> Result<()> {
pub(crate) fn send_pkt<B: BitmapSlice>(&mut self, pkt: &VsockPacket<B>) -> Result<()> {
// Update peer credit information
self.peer_buf_alloc = pkt.buf_alloc();
self.peer_fwd_cnt = Wrapping(pkt.fwd_cnt());
@ -213,19 +214,21 @@ impl<S: AsRawFd + Read + Write> VsockConnection<S> {
}
VSOCK_OP_RW => {
// Data has to be written to the host-side stream
if pkt.buf().is_none() {
info!(
"Dropping empty packet from guest (lp={}, pp={})",
self.local_port, self.peer_port
);
return Ok(());
}
let buf_slice = &pkt.buf().unwrap()[..(pkt.len() as usize)];
if let Err(err) = self.send_bytes(buf_slice) {
// TODO: Terminate this connection
dbg!("err:{:?}", err);
return Ok(());
match pkt.data_slice() {
None => {
info!(
"Dropping empty packet from guest (lp={}, pp={})",
self.local_port, self.peer_port
);
return Ok(());
}
Some(buf) => {
if let Err(err) = self.send_bytes(buf) {
// TODO: Terminate this connection
dbg!("err:{:?}", err);
return Ok(());
}
}
}
}
VSOCK_OP_CREDIT_UPDATE => {
@ -271,7 +274,7 @@ impl<S: AsRawFd + Read + Write> VsockConnection<S> {
/// Returns:
/// - Ok(cnt) where cnt is the number of bytes written to the stream
/// - Err(Error::UnixWrite) if there was an error writing to the stream
fn send_bytes(&mut self, buf: &[u8]) -> Result<()> {
fn send_bytes<B: BitmapSlice>(&mut self, buf: &VolatileSlice<B>) -> Result<()> {
if !self.tx_buf.is_empty() {
// Data is already present in the buffer and the backend
// is waiting for a EPOLLOUT event to flush it
@ -279,9 +282,9 @@ impl<S: AsRawFd + Read + Write> VsockConnection<S> {
}
// Write data to the stream
let written_count = match self.stream.write(buf) {
let written_count = match buf.write_to(0, &mut self.stream, buf.len()) {
Ok(cnt) => cnt,
Err(e) => {
Err(vm_memory::VolatileMemoryError::IOError(e)) => {
if e.kind() == ErrorKind::WouldBlock {
0
} else {
@ -289,6 +292,10 @@ impl<S: AsRawFd + Read + Write> VsockConnection<S> {
return Err(Error::UnixWrite);
}
}
Err(e) => {
println!("send_bytes error: {:?}", e);
return Err(Error::UnixWrite);
}
};
// Increment forwarded count by number of bytes written to the stream
@ -297,18 +304,19 @@ impl<S: AsRawFd + Read + Write> VsockConnection<S> {
self.rx_queue.enqueue(RxOps::CreditUpdate);
if written_count != buf.len() {
return self.tx_buf.push(&buf[written_count..]);
return self.tx_buf.push(&buf.offset(written_count).unwrap());
}
Ok(())
}
/// Initialize all header fields in the vsock packet.
fn init_pkt<'a>(&self, pkt: &'a mut VsockPacket) -> &'a mut VsockPacket {
fn init_pkt<'a, 'b, B: BitmapSlice>(
&self,
pkt: &'a mut VsockPacket<'b, B>,
) -> &'a mut VsockPacket<'b, B> {
// Zero out the packet header
for b in pkt.hdr_mut() {
*b = 0;
}
pkt.set_header_from_raw(&[0u8; PKT_HEADER_SIZE]).unwrap();
pkt.set_src_cid(self.local_cid)
.set_dst_cid(self.guest_cid)
@ -337,9 +345,103 @@ mod tests {
use byteorder::{ByteOrder, LittleEndian};
use super::*;
use crate::packet::tests::{prepare_desc_chain_vsock, HeadParams};
use crate::vhu_vsock::VSOCK_HOST_CID;
use crate::vhu_vsock::{VSOCK_HOST_CID, VSOCK_OP_RW, VSOCK_TYPE_STREAM};
use std::io::Result as IoResult;
use std::ops::Deref;
use virtio_bindings::bindings::virtio_ring::{VRING_DESC_F_NEXT, VRING_DESC_F_WRITE};
use virtio_queue::{mock::MockSplitQueue, Descriptor, DescriptorChain, Queue, QueueOwnedT};
use vm_memory::{
Address, Bytes, GuestAddress, GuestAddressSpace, GuestMemoryAtomic, GuestMemoryLoadGuard,
GuestMemoryMmap,
};
struct HeadParams {
head_len: usize,
data_len: u32,
}
impl HeadParams {
pub fn new(head_len: usize, data_len: u32) -> Self {
Self { head_len, data_len }
}
fn construct_head(&self) -> Vec<u8> {
let mut header = vec![0_u8; self.head_len];
if self.head_len == PKT_HEADER_SIZE {
// Offset into the header for data length
const HDROFF_LEN: usize = 24;
LittleEndian::write_u32(&mut header[HDROFF_LEN..], self.data_len);
}
header
}
}
fn prepare_desc_chain_vsock(
write_only: bool,
head_params: &HeadParams,
data_chain_len: u16,
head_data_len: u32,
) -> (
GuestMemoryAtomic<GuestMemoryMmap>,
DescriptorChain<GuestMemoryLoadGuard<GuestMemoryMmap>>,
) {
let mem = GuestMemoryMmap::<()>::from_ranges(&[(GuestAddress(0), 0x1000)]).unwrap();
let virt_queue = MockSplitQueue::new(&mem, 16);
let mut next_addr = virt_queue.desc_table().total_size() + 0x100;
let mut flags = 0;
if write_only {
flags |= VRING_DESC_F_WRITE;
}
let mut head_flags = if data_chain_len > 0 {
flags | VRING_DESC_F_NEXT
} else {
flags
};
// vsock packet header
// let header = vec![0 as u8; head_params.head_len];
let header = head_params.construct_head();
let head_desc =
Descriptor::new(next_addr, head_params.head_len as u32, head_flags as u16, 1);
mem.write(&header, head_desc.addr()).unwrap();
assert!(virt_queue.desc_table().store(0, head_desc).is_ok());
next_addr += head_params.head_len as u64;
// Put the descriptor index 0 in the first available ring position.
mem.write_obj(0u16, virt_queue.avail_addr().unchecked_add(4))
.unwrap();
// Set `avail_idx` to 1.
mem.write_obj(1u16, virt_queue.avail_addr().unchecked_add(2))
.unwrap();
// chain len excludes the head
for i in 0..(data_chain_len) {
// last descr in chain
if i == data_chain_len - 1 {
head_flags &= !VRING_DESC_F_NEXT;
}
// vsock data
let data = vec![0_u8; head_data_len as usize];
let data_desc = Descriptor::new(next_addr, data.len() as u32, head_flags as u16, i + 2);
mem.write(&data, data_desc.addr()).unwrap();
assert!(virt_queue.desc_table().store(i + 1, data_desc).is_ok());
next_addr += head_data_len as u64;
}
// Create descriptor chain from pre-filled memory
(
GuestMemoryAtomic::new(mem.clone()),
virt_queue
.create_queue::<Queue>()
.unwrap()
.iter(GuestMemoryAtomic::new(mem.clone()).memory())
.unwrap()
.next()
.unwrap(),
)
}
struct VsockDummySocket {
data: Vec<u8>,
@ -435,7 +537,7 @@ mod tests {
#[test]
fn test_vsock_conn_init_pkt() {
// parameters for packet head construction
let head_params = HeadParams::new(VSOCK_PKT_HDR_SIZE, 10);
let head_params = HeadParams::new(PKT_HEADER_SIZE, 10);
// new locally inititated connection
let dummy_file = VsockDummySocket::new();
@ -444,7 +546,10 @@ mod tests {
// write only descriptor chain
let (mem, mut descr_chain) = prepare_desc_chain_vsock(true, &head_params, 2, 10);
let mut vsock_pkt = VsockPacket::from_rx_virtq_head(&mut descr_chain, mem).unwrap();
let mem = mem.memory();
let mut vsock_pkt =
VsockPacket::from_rx_virtq_chain(mem.deref(), &mut descr_chain, CONN_TX_BUF_SIZE)
.unwrap();
// initialize a vsock packet for the guest
vsock_conn_local.init_pkt(&mut vsock_pkt);
@ -453,7 +558,7 @@ mod tests {
assert_eq!(vsock_pkt.dst_cid(), 3);
assert_eq!(vsock_pkt.src_port(), 5000);
assert_eq!(vsock_pkt.dst_port(), 5001);
assert_eq!(vsock_pkt.pkt_type(), VSOCK_TYPE_STREAM);
assert_eq!(vsock_pkt.type_(), VSOCK_TYPE_STREAM);
assert_eq!(vsock_pkt.buf_alloc(), CONN_TX_BUF_SIZE);
assert_eq!(vsock_pkt.fwd_cnt(), 0);
}
@ -461,7 +566,7 @@ mod tests {
#[test]
fn test_vsock_conn_recv_pkt() {
// parameters for packet head construction
let head_params = HeadParams::new(VSOCK_PKT_HDR_SIZE, 5);
let head_params = HeadParams::new(PKT_HEADER_SIZE, 5);
// new locally inititated connection
let dummy_file = VsockDummySocket::new();
@ -470,7 +575,10 @@ mod tests {
// write only descriptor chain
let (mem, mut descr_chain) = prepare_desc_chain_vsock(true, &head_params, 1, 5);
let mut vsock_pkt = VsockPacket::from_rx_virtq_head(&mut descr_chain, mem).unwrap();
let mem = mem.memory();
let mut vsock_pkt =
VsockPacket::from_rx_virtq_chain(mem.deref(), &mut descr_chain, CONN_TX_BUF_SIZE)
.unwrap();
// VSOCK_OP_REQUEST: new local conn request
vsock_conn_local.rx_queue.enqueue(RxOps::Request);
@ -519,7 +627,9 @@ mod tests {
assert_eq!(vsock_pkt.op(), VSOCK_OP_RW);
assert!(!vsock_conn_local.rx_queue.pending_rx());
assert_eq!(vsock_pkt.len(), 5);
assert_eq!(vsock_pkt.buf().unwrap(), b"hello");
let buf = &mut [0u8; 5];
assert!(vsock_pkt.data_slice().unwrap().read_slice(buf, 0).is_ok());
assert_eq!(buf, b"hello");
// VSOCK_OP_RESPONSE: response from a locally initiated connection
vsock_conn_local.rx_queue.enqueue(RxOps::Response);
@ -545,7 +655,7 @@ mod tests {
#[test]
fn test_vsock_conn_send_pkt() {
// parameters for packet head construction
let head_params = HeadParams::new(VSOCK_PKT_HDR_SIZE, 5);
let head_params = HeadParams::new(PKT_HEADER_SIZE, 5);
// new locally inititated connection
let dummy_file = VsockDummySocket::new();
@ -554,13 +664,13 @@ mod tests {
// write only descriptor chain
let (mem, mut descr_chain) = prepare_desc_chain_vsock(false, &head_params, 1, 5);
let mut vsock_pkt = VsockPacket::from_tx_virtq_head(&mut descr_chain, mem).unwrap();
let mem = mem.memory();
let mut vsock_pkt =
VsockPacket::from_tx_virtq_chain(mem.deref(), &mut descr_chain, CONN_TX_BUF_SIZE)
.unwrap();
// peer credit information
const HDROFF_BUF_ALLOC: usize = 36;
const HDROFF_FWD_CNT: usize = 40;
LittleEndian::write_u32(&mut vsock_pkt.hdr_mut()[HDROFF_BUF_ALLOC..], 65536);
LittleEndian::write_u32(&mut vsock_pkt.hdr_mut()[HDROFF_FWD_CNT..], 1024);
vsock_pkt.set_buf_alloc(65536).set_fwd_cnt(1024);
// check if peer credit information is updated currently
let credit_check = vsock_conn_local.send_pkt(&vsock_pkt);
@ -579,7 +689,8 @@ mod tests {
// VSOCK_OP_RW
vsock_pkt.set_op(VSOCK_OP_RW);
vsock_pkt.buf_mut().unwrap().copy_from_slice(b"hello");
let buf = b"hello";
assert!(vsock_pkt.data_slice().unwrap().write_slice(buf, 0).is_ok());
let rw_response = vsock_conn_local.send_pkt(&vsock_pkt);
assert!(rw_response.is_ok());
let mut resp_buf = vec![0; 5];