Merge branch 'main' into cleanups

Viresh Kumar 2022-10-12 20:56:53 +05:30 committed by GitHub
commit da3ad14d2c
14 changed files with 3085 additions and 6 deletions

Cargo.lock generated

@@ -28,12 +28,24 @@ dependencies = [
"winapi",
]
[[package]]
name = "autocfg"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
[[package]]
name = "bitflags"
version = "1.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
[[package]]
name = "byteorder"
version = "1.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610"
[[package]]
name = "cc"
version = "1.0.73"
@@ -46,6 +58,23 @@ version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "clap"
version = "3.2.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "86447ad904c7fb335a790c9d7fe3d0d971dc523b8ccd1561a520de9a85302750"
dependencies = [
"atty",
"bitflags",
"clap_derive 3.2.18",
"clap_lex 0.2.4",
"indexmap",
"once_cell",
"strsim",
"termcolor",
"textwrap",
]
[[package]]
name = "clap"
version = "4.0.11"
@@ -54,13 +83,26 @@ checksum = "4ed45cc2c62a3eff523e718d8576ba762c83a3146151093283ac62ae11933a73"
dependencies = [
"atty",
"bitflags",
"clap_derive",
"clap_lex",
"clap_derive 4.0.10",
"clap_lex 0.3.0",
"once_cell",
"strsim",
"termcolor",
]
[[package]]
name = "clap_derive"
version = "3.2.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ea0c8bce528c4be4da13ea6fead8965e95b6073585a2f05204bd8f4119f82a65"
dependencies = [
"heck",
"proc-macro-error",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "clap_derive"
version = "4.0.10"
@@ -74,6 +116,15 @@ dependencies = [
"syn",
]
[[package]]
name = "clap_lex"
version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2850f2f5a82cbf437dd5af4d49848fbdfc27c157c3d010345776f952765261c5"
dependencies = [
"os_str_bytes",
]
[[package]]
name = "clap_lex"
version = "0.3.0"
@@ -115,6 +166,96 @@ dependencies = [
"instant",
]
[[package]]
name = "futures"
version = "0.3.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f21eda599937fba36daeb58a22e8f5cee2d14c4a17b5b7739c7c8e5e3b8230c"
dependencies = [
"futures-channel",
"futures-core",
"futures-executor",
"futures-io",
"futures-sink",
"futures-task",
"futures-util",
]
[[package]]
name = "futures-channel"
version = "0.3.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "30bdd20c28fadd505d0fd6712cdfcb0d4b5648baf45faef7f852afb2399bb050"
dependencies = [
"futures-core",
"futures-sink",
]
[[package]]
name = "futures-core"
version = "0.3.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4e5aa3de05362c3fb88de6531e6296e85cde7739cccad4b9dfeeb7f6ebce56bf"
[[package]]
name = "futures-executor"
version = "0.3.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9ff63c23854bee61b6e9cd331d523909f238fc7636290b96826e9cfa5faa00ab"
dependencies = [
"futures-core",
"futures-task",
"futures-util",
"num_cpus",
]
[[package]]
name = "futures-io"
version = "0.3.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bbf4d2a7a308fd4578637c0b17c7e1c7ba127b8f6ba00b29f717e9655d85eb68"
[[package]]
name = "futures-macro"
version = "0.3.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "42cd15d1c7456c04dbdf7e88bcd69760d74f3a798d6444e16974b505b0e62f17"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "futures-sink"
version = "0.3.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "21b20ba5a92e727ba30e72834706623d94ac93a725410b6a6b6fbc1b07f7ba56"
[[package]]
name = "futures-task"
version = "0.3.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a6508c467c73851293f390476d4491cf4d227dbabcd4170f3bb6044959b294f1"
[[package]]
name = "futures-util"
version = "0.3.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "44fb6cb1be61cc1d2e43b262516aafcf63b241cffdb1d3fa115f91d9c7b09c90"
dependencies = [
"futures-channel",
"futures-core",
"futures-io",
"futures-macro",
"futures-sink",
"futures-task",
"memchr",
"pin-project-lite",
"pin-utils",
"slab",
]
[[package]]
name = "getrandom"
version = "0.2.7"
@@ -126,6 +267,12 @@ dependencies = [
"wasi",
]
[[package]]
name = "hashbrown"
version = "0.12.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888"
[[package]]
name = "heck"
version = "0.4.0"
@@ -147,6 +294,16 @@ version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4"
[[package]]
name = "indexmap"
version = "1.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "10a35a97730320ffe8e2d410b5d3b69279b98d2c14bdb8b70ea89ecf7888d41e"
dependencies = [
"autocfg",
"hashbrown",
]
[[package]]
name = "instant"
version = "0.1.12"
@@ -196,6 +353,16 @@ version = "2.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d"
[[package]]
name = "num_cpus"
version = "1.13.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "19e64526ebdee182341572e50e9ad03965aa510cd94427a4549448f285e957a1"
dependencies = [
"hermit-abi",
"libc",
]
[[package]]
name = "once_cell"
version = "1.15.0"
@@ -208,6 +375,18 @@ version = "6.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9ff7415e9ae3fff1225851df9e0d9e4e5479f947619774677a63572e55e80eff"
[[package]]
name = "pin-project-lite"
version = "0.2.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e0a7ae3ac2f1173085d398531c705756c94a4c56843785df85a60c1a0afac116"
[[package]]
name = "pin-utils"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "ppv-lite86"
version = "0.2.16"
@@ -321,6 +500,15 @@ dependencies = [
"winapi",
]
[[package]]
name = "slab"
version = "0.4.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4614a76b2a8be0058caa9dbbaf66d988527d86d003c11a94fbd335d7661edcef"
dependencies = [
"autocfg",
]
[[package]]
name = "strsim"
version = "0.10.0"
@@ -361,6 +549,12 @@ dependencies = [
"winapi-util",
]
[[package]]
name = "textwrap"
version = "0.15.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "949517c0cf1bf4ee812e2e07e08ab448e3ae0d23472aee8a06c985f0c8815b16"
[[package]]
name = "thiserror"
version = "1.0.37"
@@ -409,7 +603,7 @@ dependencies = [
name = "vhost-device-gpio"
version = "0.1.0"
dependencies = [
"clap",
"clap 4.0.11",
"env_logger",
"libc",
"libgpiod",
@@ -427,7 +621,7 @@ dependencies = [
name = "vhost-device-i2c"
version = "0.1.0"
dependencies = [
"clap",
"clap 4.0.11",
"env_logger",
"libc",
"log",
@@ -444,7 +638,7 @@ dependencies = [
name = "vhost-device-rng"
version = "0.1.0"
dependencies = [
"clap",
"clap 4.0.11",
"env_logger",
"epoll",
"libc",
@@ -475,6 +669,26 @@ dependencies = [
"vmm-sys-util",
]
[[package]]
name = "vhost-user-vsock"
version = "0.1.0"
dependencies = [
"byteorder",
"clap 3.2.22",
"env_logger",
"epoll",
"futures",
"log",
"thiserror",
"vhost",
"vhost-user-backend",
"virtio-bindings",
"virtio-queue",
"virtio-vsock",
"vm-memory",
"vmm-sys-util",
]
[[package]]
name = "virtio-bindings"
version = "0.1.0"
@@ -493,6 +707,16 @@ dependencies = [
"vmm-sys-util",
]
[[package]]
name = "virtio-vsock"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "876299fc0f59e07aadc77541ce49dd75f7548f4d095eac6f7104b805394029e8"
dependencies = [
"virtio-queue",
"vm-memory",
]
[[package]]
name = "vm-memory"
version = "0.9.0"

Cargo.toml

@@ -4,4 +4,5 @@ members = [
"gpio",
"i2c",
"rng",
"vsock",
]

README.md

@@ -11,6 +11,7 @@ Here is the list of device backends that we support:
- [GPIO](https://github.com/rust-vmm/vhost-device/blob/main/gpio/README.md)
- [I2C](https://github.com/rust-vmm/vhost-device/blob/main/i2c/README.md)
- [RNG](https://github.com/rust-vmm/vhost-device/blob/main/rng/README.md)
- [VSOCK](https://github.com/rust-vmm/vhost-device/blob/main/vsock/README.md)
## Testing and Code Coverage

coverage_config_x86_64.json

@@ -1,5 +1,5 @@
{
"coverage_score": 77.0,
"coverage_score": 69.6,
"exclude_path": "",
"crate_features": ""
}

vsock/Cargo.toml Normal file

@@ -0,0 +1,29 @@
[package]
name = "vhost-user-vsock"
version = "0.1.0"
authors = ["Harshavardhan Unnibhavi <harshanavkis@gmail.com>"]
description = "A virtio-vsock device using the vhost-user protocol."
repository = "https://github.com/rust-vmm/vhost-device"
readme = "README.md"
keywords = ["vhost", "vsock"]
license = "Apache-2.0 OR BSD-3-Clause"
edition = "2018"
[dependencies]
byteorder = "1"
clap = { version = ">=3.0", features = ["derive"] }
env_logger = ">=0.9"
epoll = "4.3.1"
futures = { version = "0.3", features = ["thread-pool"] }
log = ">=0.4.6"
thiserror = "1.0"
vhost = { version = "0.5", features = ["vhost-user-slave"] }
vhost-user-backend = "0.7.0"
virtio-bindings = ">=0.1"
virtio-queue = "0.6"
virtio-vsock = "0.1.0"
vm-memory = ">=0.8"
vmm-sys-util = "=0.10.0"
[dev-dependencies]
virtio-queue = { version = "0.6", features = ["test-utils"] }

vsock/README.md Normal file

@@ -0,0 +1,108 @@
# vhost-user-vsock
## Design
The crate introduces a vhost-user-vsock device that enables communication between an
application running in the guest, i.e. inside a VM, and an application running on the
host, i.e. outside the VM. The application running in the guest communicates over VM
sockets, i.e. AF_VSOCK sockets. The application running on the host connects to a
unix socket on the host, i.e. communicates over AF_UNIX sockets. The main components of
the crate are split into various files, as described below:
- [packet.rs](src/packet.rs)
- Introduces the **VsockPacket** structure that represents a single vsock packet, along
with its processing methods.
- [rxops.rs](src/rxops.rs)
- Introduces various vsock operations that are enqueued into the rxqueue to be sent to the
guest. Exposes a **RxOps** structure.
- [rxqueue.rs](src/rxqueue.rs)
- Contains the pending rx operations corresponding to a connection. The queue is
represented as a bitmap, since we handle only connection-oriented connections. The module
contains various queue manipulation methods. Exposes a **RxQueue** structure.
- [thread_backend.rs](src/thread_backend.rs)
- Multiplexes connections between host and guest and calls into per-connection methods that
are responsible for processing data and packets corresponding to the connection. Exposes a
**VsockThreadBackend** structure.
- [txbuf.rs](src/txbuf.rs)
- Module to buffer data that is sent from the guest to the host. The module exposes a **LocalTxBuf**
structure.
- [vhu_vsock_thread.rs](src/vhu_vsock_thread.rs)
- Module exposes a **VhostUserVsockThread** structure. It also handles new host-initiated
connections and provides interfaces for registering host connections with the epoll fd. Also
provides interfaces for iterating through the rx and tx queues.
- [vsock_conn.rs](src/vsock_conn.rs)
- Module introduces a **VsockConnection** structure that represents a single vsock connection
between the guest and the host. It also processes packets according to their type.
- [vhu_vsock.rs](src/vhu_vsock.rs)
- Exposes the main vhost-user vsock backend interface.
## Usage
Run the vhost-user-vsock device:
```sh
vhost-user-vsock --guest-cid=4 --uds-path=/tmp/vm4.vsock --socket=/tmp/vhost4.socket
```
Run qemu:
```sh
qemu-system-x86_64 -drive file=/path/to/disk.qcow2 -enable-kvm -m 512M \
-smp 2 -vga virtio -chardev socket,id=char0,reconnect=0,path=/tmp/vhost4.socket \
-device vhost-user-vsock-pci,chardev=char0 \
-object memory-backend-file,share=on,id=mem,size="512M",mem-path="/dev/hugepages" \
-numa node,memdev=mem -mem-prealloc
```
### Guest listening
#### iperf
```sh
# https://github.com/stefano-garzarella/iperf-vsock
guest$ iperf3 --vsock -s
host$ iperf3 --vsock -c /tmp/vm4.vsock
```
#### netcat
```sh
guest$ nc --vsock -l 1234
host$ nc -U /tmp/vm4.vsock
CONNECT 1234
```
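The same host-initiated flow can be driven programmatically. Below is a minimal, illustrative sketch (not part of the crate) of a host application that connects to the device's `--uds-path` socket, sends the `CONNECT 1234` command that the device parses, and then exchanges raw bytes with the guest-side listener:
```rust
use std::io::{Read, Write};
use std::os::unix::net::UnixStream;

fn main() -> std::io::Result<()> {
    // Connect to the device's host-side Unix socket (--uds-path).
    let mut stream = UnixStream::connect("/tmp/vm4.vsock")?;
    // Ask the device to forward this connection to guest port 1234.
    stream.write_all(b"CONNECT 1234\n")?;
    // From here on, reads and writes are relayed to the guest peer.
    stream.write_all(b"hello from the host\n")?;
    let mut buf = [0u8; 256];
    let n = stream.read(&mut buf)?;
    println!("guest replied: {}", String::from_utf8_lossy(&buf[..n]));
    Ok(())
}
```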
### Host listening
#### iperf
```sh
# https://github.com/stefano-garzarella/iperf-vsock
host$ iperf3 --vsock -s -B /tmp/vm4.vsock
guest$ iperf3 --vsock -c 2
```
#### netcat
```sh
host$ nc -l -U /tmp/vm4.vsock_1234
guest$ nc --vsock 2 1234
```
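On the guest side there is no AF_VSOCK support in the Rust standard library, so a guest application typically goes through the `libc` crate (or a vsock wrapper crate). The following is a minimal sketch, assuming a Linux guest and the host CID 2 used above, of connecting to the host-listening socket from the previous example:
```rust
use std::io::{Read, Write};
use std::mem;
use std::os::unix::io::FromRawFd;
use std::os::unix::net::UnixStream;

fn main() -> std::io::Result<()> {
    // Create an AF_VSOCK stream socket.
    let fd = unsafe { libc::socket(libc::AF_VSOCK, libc::SOCK_STREAM, 0) };
    if fd < 0 {
        return Err(std::io::Error::last_os_error());
    }
    // Address the host (CID 2) on port 1234.
    let mut addr: libc::sockaddr_vm = unsafe { mem::zeroed() };
    addr.svm_family = libc::AF_VSOCK as libc::sa_family_t;
    addr.svm_cid = 2;
    addr.svm_port = 1234;
    let ret = unsafe {
        libc::connect(
            fd,
            &addr as *const _ as *const libc::sockaddr,
            mem::size_of::<libc::sockaddr_vm>() as libc::socklen_t,
        )
    };
    if ret < 0 {
        return Err(std::io::Error::last_os_error());
    }
    // Wrap the connected fd only to reuse the Read/Write trait impls.
    let mut stream = unsafe { UnixStream::from_raw_fd(fd) };
    stream.write_all(b"hello from the guest\n")?;
    let mut buf = [0u8; 256];
    let n = stream.read(&mut buf)?;
    println!("host replied: {}", String::from_utf8_lossy(&buf[..n]));
    Ok(())
}
```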
## License
**!!!NOTICE**: The BSD-3-Clause license is not included in this template.
The license needs to be manually added because the text of the license file
also includes the copyright. The copyright can be different for different
crates. If the crate contains code from CrosVM, the crate must add the
CrosVM copyright which can be found
[here](https://chromium.googlesource.com/chromiumos/platform/crosvm/+/master/LICENSE).
For crates developed from scratch, the copyright is different and depends on
the contributors.

vsock/src/main.rs Normal file

@@ -0,0 +1,158 @@
// SPDX-License-Identifier: Apache-2.0 or BSD-3-Clause
mod rxops;
mod rxqueue;
mod thread_backend;
mod txbuf;
mod vhu_vsock;
mod vhu_vsock_thread;
mod vsock_conn;
use std::{
convert::TryFrom,
sync::{Arc, RwLock},
};
use clap::Parser;
use log::{info, warn};
use vhost::{vhost_user, vhost_user::Listener};
use vhost_user_backend::VhostUserDaemon;
use vm_memory::{GuestMemoryAtomic, GuestMemoryMmap};
use crate::vhu_vsock::{Error, Result, VhostUserVsockBackend, VsockConfig};
#[derive(Parser, Debug)]
#[clap(version, about, long_about = None)]
struct VsockArgs {
/// Context identifier of the guest which uniquely identifies the device for its lifetime.
#[clap(long, default_value_t = 3)]
guest_cid: u64,
/// Unix socket to which a hypervisor connects and sets up the control path with the device.
#[clap(long)]
socket: String,
/// Unix socket to which a host-side application connects.
#[clap(long)]
uds_path: String,
}
impl TryFrom<VsockArgs> for VsockConfig {
type Error = Error;
fn try_from(cmd_args: VsockArgs) -> Result<Self> {
let socket = cmd_args.socket.trim().to_string();
let uds_path = cmd_args.uds_path.trim().to_string();
Ok(VsockConfig::new(cmd_args.guest_cid, socket, uds_path))
}
}
/// This is the public API through which an external program starts the
/// vhost-user-vsock backend server.
pub(crate) fn start_backend_server(config: VsockConfig) {
loop {
let backend = Arc::new(RwLock::new(
VhostUserVsockBackend::new(config.clone()).unwrap(),
));
let listener = Listener::new(config.get_socket_path(), true).unwrap();
let mut daemon = VhostUserDaemon::new(
String::from("vhost-user-vsock"),
backend.clone(),
GuestMemoryAtomic::new(GuestMemoryMmap::new()),
)
.unwrap();
let mut vring_workers = daemon.get_epoll_handlers();
for thread in backend.read().unwrap().threads.iter() {
thread
.lock()
.unwrap()
.set_vring_worker(Some(vring_workers.remove(0)));
}
daemon.start(listener).unwrap();
match daemon.wait() {
Ok(()) => {
info!("Stopping cleanly");
}
Err(vhost_user_backend::Error::HandleRequest(vhost_user::Error::PartialMessage)) => {
info!("vhost-user connection closed with partial message. If the VM is shutting down, this is expected behavior; otherwise, it might be a bug.");
}
Err(e) => {
warn!("Error running daemon: {:?}", e);
}
}
// No matter the result, we need to shut down the worker thread.
backend.read().unwrap().exit_event.write(1).unwrap();
}
}
fn main() {
env_logger::init();
let config = VsockConfig::try_from(VsockArgs::parse()).unwrap();
start_backend_server(config);
}
#[cfg(test)]
mod tests {
use super::*;
impl VsockArgs {
fn from_args(guest_cid: u64, socket: &str, uds_path: &str) -> Self {
VsockArgs {
guest_cid,
socket: socket.to_string(),
uds_path: uds_path.to_string(),
}
}
}
#[test]
fn test_vsock_config_setup() {
let args = VsockArgs::from_args(3, "/tmp/vhost4.socket", "/tmp/vm4.vsock");
let config = VsockConfig::try_from(args);
assert!(config.is_ok());
let config = config.unwrap();
assert_eq!(config.get_guest_cid(), 3);
assert_eq!(config.get_socket_path(), "/tmp/vhost4.socket");
assert_eq!(config.get_uds_path(), "/tmp/vm4.vsock");
}
#[test]
fn test_vsock_server() {
const CID: u64 = 3;
const VHOST_SOCKET_PATH: &str = "test_vsock_server.socket";
const VSOCK_SOCKET_PATH: &str = "test_vsock_server.vsock";
let config = VsockConfig::new(
CID,
VHOST_SOCKET_PATH.to_string(),
VSOCK_SOCKET_PATH.to_string(),
);
let backend = Arc::new(RwLock::new(VhostUserVsockBackend::new(config).unwrap()));
let daemon = VhostUserDaemon::new(
String::from("vhost-user-vsock"),
backend.clone(),
GuestMemoryAtomic::new(GuestMemoryMmap::new()),
)
.unwrap();
let vring_workers = daemon.get_epoll_handlers();
// VhostUserVsockBackend supports a single thread that handles the TX and RX queues
assert_eq!(backend.read().unwrap().threads.len(), 1);
assert_eq!(vring_workers.len(), backend.read().unwrap().threads.len());
}
}

vsock/src/rxops.rs Normal file

@@ -0,0 +1,36 @@
// SPDX-License-Identifier: Apache-2.0 or BSD-3-Clause
#[derive(Clone, Copy, Eq, PartialEq, Debug)]
pub(crate) enum RxOps {
/// VSOCK_OP_REQUEST
Request = 0,
/// VSOCK_OP_RW
Rw = 1,
/// VSOCK_OP_RESPONSE
Response = 2,
/// VSOCK_OP_CREDIT_UPDATE
CreditUpdate = 3,
/// VSOCK_OP_RST
Reset = 4,
}
impl RxOps {
/// Convert enum value into bitmask.
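/// For example, `RxOps::Rw` (value 1) maps to bitmask `0b0000_0010`.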
pub fn bitmask(self) -> u8 {
1u8 << (self as u8)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_bitmask() {
assert_eq!(1, RxOps::Request.bitmask());
assert_eq!(2, RxOps::Rw.bitmask());
assert_eq!(4, RxOps::Response.bitmask());
assert_eq!(8, RxOps::CreditUpdate.bitmask());
assert_eq!(16, RxOps::Reset.bitmask());
}
}

vsock/src/rxqueue.rs Normal file

@@ -0,0 +1,157 @@
// SPDX-License-Identifier: Apache-2.0 or BSD-3-Clause
use crate::rxops::RxOps;
#[derive(Debug, Eq, PartialEq)]
pub(crate) struct RxQueue {
/// Bitmap of rx operations.
queue: u8,
}
impl RxQueue {
/// New instance of RxQueue.
pub fn new() -> Self {
RxQueue { queue: 0_u8 }
}
/// Enqueue a new rx operation into the queue.
pub fn enqueue(&mut self, op: RxOps) {
self.queue |= op.bitmask();
}
/// Dequeue an rx operation from the queue.
pub fn dequeue(&mut self) -> Option<RxOps> {
match self.peek() {
Some(req) => {
self.queue &= !req.bitmask();
Some(req)
}
None => None,
}
}
/// Peek into the queue to check if it contains an rx operation.
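/// Operations are returned in a fixed priority order: Request, Rw, Response, CreditUpdate, then Reset.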
pub fn peek(&self) -> Option<RxOps> {
if self.contains(RxOps::Request.bitmask()) {
return Some(RxOps::Request);
}
if self.contains(RxOps::Rw.bitmask()) {
return Some(RxOps::Rw);
}
if self.contains(RxOps::Response.bitmask()) {
return Some(RxOps::Response);
}
if self.contains(RxOps::CreditUpdate.bitmask()) {
return Some(RxOps::CreditUpdate);
}
if self.contains(RxOps::Reset.bitmask()) {
Some(RxOps::Reset)
} else {
None
}
}
/// Check if the queue contains a particular rx operation.
pub fn contains(&self, op: u8) -> bool {
(self.queue & op) != 0
}
/// Check if there are any pending rx operations in the queue.
pub fn pending_rx(&self) -> bool {
self.queue != 0
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_contains() {
let mut rxqueue = RxQueue::new();
rxqueue.queue = 31;
assert!(rxqueue.contains(RxOps::Request.bitmask()));
assert!(rxqueue.contains(RxOps::Rw.bitmask()));
assert!(rxqueue.contains(RxOps::Response.bitmask()));
assert!(rxqueue.contains(RxOps::CreditUpdate.bitmask()));
assert!(rxqueue.contains(RxOps::Reset.bitmask()));
rxqueue.queue = 0;
assert!(!rxqueue.contains(RxOps::Request.bitmask()));
assert!(!rxqueue.contains(RxOps::Rw.bitmask()));
assert!(!rxqueue.contains(RxOps::Response.bitmask()));
assert!(!rxqueue.contains(RxOps::CreditUpdate.bitmask()));
assert!(!rxqueue.contains(RxOps::Reset.bitmask()));
}
#[test]
fn test_enqueue() {
let mut rxqueue = RxQueue::new();
rxqueue.enqueue(RxOps::Request);
assert!(rxqueue.contains(RxOps::Request.bitmask()));
rxqueue.enqueue(RxOps::Rw);
assert!(rxqueue.contains(RxOps::Rw.bitmask()));
rxqueue.enqueue(RxOps::Response);
assert!(rxqueue.contains(RxOps::Response.bitmask()));
rxqueue.enqueue(RxOps::CreditUpdate);
assert!(rxqueue.contains(RxOps::CreditUpdate.bitmask()));
rxqueue.enqueue(RxOps::Reset);
assert!(rxqueue.contains(RxOps::Reset.bitmask()));
}
#[test]
fn test_peek() {
let mut rxqueue = RxQueue::new();
rxqueue.queue = 31;
assert_eq!(rxqueue.peek(), Some(RxOps::Request));
rxqueue.queue = 30;
assert_eq!(rxqueue.peek(), Some(RxOps::Rw));
rxqueue.queue = 28;
assert_eq!(rxqueue.peek(), Some(RxOps::Response));
rxqueue.queue = 24;
assert_eq!(rxqueue.peek(), Some(RxOps::CreditUpdate));
rxqueue.queue = 16;
assert_eq!(rxqueue.peek(), Some(RxOps::Reset));
}
#[test]
fn test_dequeue() {
let mut rxqueue = RxQueue::new();
rxqueue.queue = 31;
assert_eq!(rxqueue.dequeue(), Some(RxOps::Request));
assert!(!rxqueue.contains(RxOps::Request.bitmask()));
assert_eq!(rxqueue.dequeue(), Some(RxOps::Rw));
assert!(!rxqueue.contains(RxOps::Rw.bitmask()));
assert_eq!(rxqueue.dequeue(), Some(RxOps::Response));
assert!(!rxqueue.contains(RxOps::Response.bitmask()));
assert_eq!(rxqueue.dequeue(), Some(RxOps::CreditUpdate));
assert!(!rxqueue.contains(RxOps::CreditUpdate.bitmask()));
assert_eq!(rxqueue.dequeue(), Some(RxOps::Reset));
assert!(!rxqueue.contains(RxOps::Reset.bitmask()));
}
#[test]
fn test_pending_rx() {
let mut rxqueue = RxQueue::new();
assert!(!rxqueue.pending_rx());
rxqueue.queue = 1;
assert!(rxqueue.pending_rx());
}
}

vsock/src/thread_backend.rs Normal file

@@ -0,0 +1,300 @@
// SPDX-License-Identifier: Apache-2.0 or BSD-3-Clause
use std::{
collections::{HashMap, HashSet, VecDeque},
os::unix::{
net::UnixStream,
prelude::{AsRawFd, FromRawFd, RawFd},
},
};
use log::{info, warn};
use virtio_vsock::packet::VsockPacket;
use vm_memory::bitmap::BitmapSlice;
use crate::{
rxops::*,
vhu_vsock::{
ConnMapKey, Error, Result, VSOCK_HOST_CID, VSOCK_OP_REQUEST, VSOCK_OP_RST,
VSOCK_TYPE_STREAM,
},
vhu_vsock_thread::VhostUserVsockThread,
vsock_conn::*,
};
pub(crate) struct VsockThreadBackend {
/// Map of ConnMapKey objects indexed by raw file descriptors.
pub listener_map: HashMap<RawFd, ConnMapKey>,
/// Map of vsock connection objects indexed by ConnMapKey objects.
pub conn_map: HashMap<ConnMapKey, VsockConnection<UnixStream>>,
/// Queue of ConnMapKey objects indicating pending rx operations.
pub backend_rxq: VecDeque<ConnMapKey>,
/// Map of host-side unix streams indexed by raw file descriptors.
pub stream_map: HashMap<i32, UnixStream>,
/// Host side socket for listening to new connections from the host.
host_socket_path: String,
/// epoll for registering new host-side connections.
epoll_fd: i32,
/// Set of allocated local ports.
pub local_port_set: HashSet<u32>,
}
impl VsockThreadBackend {
/// New instance of VsockThreadBackend.
pub fn new(host_socket_path: String, epoll_fd: i32) -> Self {
Self {
listener_map: HashMap::new(),
conn_map: HashMap::new(),
backend_rxq: VecDeque::new(),
// Need this map to prevent connected stream from closing
// TODO: think of a better solution
stream_map: HashMap::new(),
host_socket_path,
epoll_fd,
local_port_set: HashSet::new(),
}
}
/// Checks if there are pending rx requests in the backend rxq.
pub fn pending_rx(&self) -> bool {
!self.backend_rxq.is_empty()
}
/// Deliver a vsock packet to the guest vsock driver.
///
/// Returns:
/// - `Ok(())` if the packet was successfully filled in
/// - `Err(Error::EmptyBackendRxQ)` if there was no data available
pub fn recv_pkt<B: BitmapSlice>(&mut self, pkt: &mut VsockPacket<B>) -> Result<()> {
// Pop an event from the backend_rxq
let key = self.backend_rxq.pop_front().ok_or(Error::EmptyBackendRxQ)?;
let conn = match self.conn_map.get_mut(&key) {
Some(conn) => conn,
None => {
// assume that the connection does not exist
return Ok(());
}
};
if conn.rx_queue.peek() == Some(RxOps::Reset) {
// Handle RST events here
let conn = self.conn_map.remove(&key).unwrap();
self.listener_map.remove(&conn.stream.as_raw_fd());
self.stream_map.remove(&conn.stream.as_raw_fd());
self.local_port_set.remove(&conn.local_port);
VhostUserVsockThread::epoll_unregister(conn.epoll_fd, conn.stream.as_raw_fd())
.unwrap_or_else(|err| {
warn!(
"Could not remove epoll listener for fd {:?}: {:?}",
conn.stream.as_raw_fd(),
err
)
});
// Initialize the packet header to contain a VSOCK_OP_RST operation
pkt.set_op(VSOCK_OP_RST)
.set_src_cid(VSOCK_HOST_CID)
.set_dst_cid(conn.guest_cid)
.set_src_port(conn.local_port)
.set_dst_port(conn.peer_port)
.set_len(0)
.set_type(VSOCK_TYPE_STREAM)
.set_flags(0)
.set_buf_alloc(0)
.set_fwd_cnt(0);
return Ok(());
}
// Handle other packet types per connection
conn.recv_pkt(pkt)?;
Ok(())
}
/// Deliver a guest generated packet to its destination in the backend.
///
/// Absorbs unexpected packets and forwards the rest to the respective
/// connection object.
///
/// Returns:
/// - always `Ok(())` if packet has been consumed correctly
pub fn send_pkt<B: BitmapSlice>(&mut self, pkt: &VsockPacket<B>) -> Result<()> {
let key = ConnMapKey::new(pkt.dst_port(), pkt.src_port());
// TODO: Rst if packet has unsupported type
if pkt.type_() != VSOCK_TYPE_STREAM {
info!("vsock: dropping packet of unknown type");
return Ok(());
}
// TODO: Handle packets to other CIDs as well
if pkt.dst_cid() != VSOCK_HOST_CID {
info!(
"vsock: dropping packet for cid other than host: {:?}",
pkt.dst_cid()
);
return Ok(());
}
// TODO: Handle cases where connection does not exist and packet op
// is not VSOCK_OP_REQUEST
if !self.conn_map.contains_key(&key) {
// The packet contains a new connection request
if pkt.op() == VSOCK_OP_REQUEST {
self.handle_new_guest_conn(pkt);
} else {
// TODO: send back RST
}
return Ok(());
}
if pkt.op() == VSOCK_OP_RST {
// Handle an RST packet from the guest here
let conn = self.conn_map.get(&key).unwrap();
if conn.rx_queue.contains(RxOps::Reset.bitmask()) {
return Ok(());
}
let conn = self.conn_map.remove(&key).unwrap();
self.listener_map.remove(&conn.stream.as_raw_fd());
self.stream_map.remove(&conn.stream.as_raw_fd());
self.local_port_set.remove(&conn.local_port);
VhostUserVsockThread::epoll_unregister(conn.epoll_fd, conn.stream.as_raw_fd())
.unwrap_or_else(|err| {
warn!(
"Could not remove epoll listener for fd {:?}: {:?}",
conn.stream.as_raw_fd(),
err
)
});
return Ok(());
}
// Forward this packet to its listening connection
let conn = self.conn_map.get_mut(&key).unwrap();
conn.send_pkt(pkt)?;
if conn.rx_queue.pending_rx() {
// Required if the connection object adds new rx operations
self.backend_rxq.push_back(key);
}
Ok(())
}
/// Handle a new guest-initiated connection, i.e. one from the peer, the guest driver.
///
/// Attempts to connect to a host-side unix socket listening on a path
/// corresponding to the packet's destination port:
/// - "{self.host_socket_path}_{dst_port}"
fn handle_new_guest_conn<B: BitmapSlice>(&mut self, pkt: &VsockPacket<B>) {
let port_path = format!("{}_{}", self.host_socket_path, pkt.dst_port());
UnixStream::connect(port_path)
.and_then(|stream| stream.set_nonblocking(true).map(|_| stream))
.map_err(Error::UnixConnect)
.and_then(|stream| self.add_new_guest_conn(stream, pkt))
.unwrap_or_else(|_| self.enq_rst());
}
/// Wrapper to add new connection to relevant HashMaps.
fn add_new_guest_conn<B: BitmapSlice>(
&mut self,
stream: UnixStream,
pkt: &VsockPacket<B>,
) -> Result<()> {
let stream_fd = stream.as_raw_fd();
self.listener_map
.insert(stream_fd, ConnMapKey::new(pkt.dst_port(), pkt.src_port()));
let conn = VsockConnection::new_peer_init(
stream,
pkt.dst_cid(),
pkt.dst_port(),
pkt.src_cid(),
pkt.src_port(),
self.epoll_fd,
pkt.buf_alloc(),
);
self.conn_map
.insert(ConnMapKey::new(pkt.dst_port(), pkt.src_port()), conn);
self.backend_rxq
.push_back(ConnMapKey::new(pkt.dst_port(), pkt.src_port()));
self.stream_map
.insert(stream_fd, unsafe { UnixStream::from_raw_fd(stream_fd) });
self.local_port_set.insert(pkt.dst_port());
VhostUserVsockThread::epoll_register(
self.epoll_fd,
stream_fd,
epoll::Events::EPOLLIN | epoll::Events::EPOLLOUT,
)?;
Ok(())
}
/// Enqueue RST packets to be sent to guest.
fn enq_rst(&mut self) {
// TODO
dbg!("New guest conn error: Enqueue RST");
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::vhu_vsock::VSOCK_OP_RW;
use std::os::unix::net::UnixListener;
use virtio_vsock::packet::{VsockPacket, PKT_HEADER_SIZE};
const DATA_LEN: usize = 16;
#[test]
fn test_vsock_thread_backend() {
const VSOCK_SOCKET_PATH: &str = "test_vsock_thread_backend.vsock";
const VSOCK_PEER_PORT: u32 = 1234;
const VSOCK_PEER_PATH: &str = "test_vsock_thread_backend.vsock_1234";
let _ = std::fs::remove_file(VSOCK_PEER_PATH);
let _listener = UnixListener::bind(VSOCK_PEER_PATH).unwrap();
let epoll_fd = epoll::create(false).unwrap();
let mut vtp = VsockThreadBackend::new(VSOCK_SOCKET_PATH.to_string(), epoll_fd);
assert!(!vtp.pending_rx());
let mut pkt_raw = [0u8; PKT_HEADER_SIZE + DATA_LEN];
let (hdr_raw, data_raw) = pkt_raw.split_at_mut(PKT_HEADER_SIZE);
let mut packet = unsafe { VsockPacket::new(hdr_raw, Some(data_raw)).unwrap() };
assert_eq!(
vtp.recv_pkt(&mut packet).unwrap_err().to_string(),
Error::EmptyBackendRxQ.to_string()
);
assert!(vtp.send_pkt(&packet).is_ok());
packet.set_type(VSOCK_TYPE_STREAM);
assert!(vtp.send_pkt(&packet).is_ok());
packet.set_dst_cid(VSOCK_HOST_CID);
packet.set_dst_port(VSOCK_PEER_PORT);
assert!(vtp.send_pkt(&packet).is_ok());
packet.set_op(VSOCK_OP_REQUEST);
assert!(vtp.send_pkt(&packet).is_ok());
packet.set_op(VSOCK_OP_RW);
assert!(vtp.send_pkt(&packet).is_ok());
packet.set_op(VSOCK_OP_RST);
assert!(vtp.send_pkt(&packet).is_ok());
assert!(vtp.recv_pkt(&mut packet).is_ok());
// cleanup
let _ = std::fs::remove_file(VSOCK_PEER_PATH);
}
}

vsock/src/txbuf.rs Normal file

@@ -0,0 +1,222 @@
// SPDX-License-Identifier: Apache-2.0 or BSD-3-Clause
use std::{io::Write, num::Wrapping};
use vm_memory::{bitmap::BitmapSlice, VolatileSlice};
use crate::vhu_vsock::{Error, Result, CONN_TX_BUF_SIZE};
#[derive(Debug)]
pub(crate) struct LocalTxBuf {
/// Buffer holding data to be forwarded to a host-side application
buf: Vec<u8>,
/// Index into buffer from which data can be consumed from the buffer
head: Wrapping<u32>,
/// Index into buffer from which data can be added to the buffer
tail: Wrapping<u32>,
}
impl LocalTxBuf {
/// Create a new instance of LocalTxBuf.
pub fn new() -> Self {
Self {
buf: vec![0; CONN_TX_BUF_SIZE as usize],
head: Wrapping(0),
tail: Wrapping(0),
}
}
/// Check if the buf is empty.
pub fn is_empty(&self) -> bool {
self.len() == 0
}
/// Add new data to the tx buffer, push all or none.
/// Returns LocalTxBufFull error if space not sufficient.
pub fn push<B: BitmapSlice>(&mut self, data_buf: &VolatileSlice<B>) -> Result<()> {
if CONN_TX_BUF_SIZE as usize - self.len() < data_buf.len() {
// Tx buffer is full
return Err(Error::LocalTxBufFull);
}
// Get index into buffer at which data can be inserted
let tail_idx = self.tail.0 as usize % CONN_TX_BUF_SIZE as usize;
// Check if we can fit the data buffer between head and end of buffer
let len = std::cmp::min(CONN_TX_BUF_SIZE as usize - tail_idx, data_buf.len());
let txbuf = &mut self.buf[tail_idx..tail_idx + len];
data_buf.copy_to(txbuf);
// Check if there is more data to be wrapped around
if len < data_buf.len() {
let remain_txbuf = &mut self.buf[..(data_buf.len() - len)];
data_buf.copy_to(remain_txbuf);
}
// Increment tail by the amount of data that has been added to the buffer
self.tail += Wrapping(data_buf.len() as u32);
Ok(())
}
/// Flush buf data to stream.
pub fn flush_to<S: Write>(&mut self, stream: &mut S) -> Result<usize> {
if self.is_empty() {
// No data to be flushed
return Ok(0);
}
// Get index into buffer from which data can be read
let head_idx = self.head.0 as usize % CONN_TX_BUF_SIZE as usize;
// First write from head to end of buffer
let len = std::cmp::min(CONN_TX_BUF_SIZE as usize - head_idx, self.len());
let written = stream
.write(&self.buf[head_idx..(head_idx + len)])
.map_err(Error::LocalTxBufFlush)?;
// Increment head by amount of data that has been flushed to the stream
self.head += Wrapping(written as u32);
// If written length is less than the expected length we can try again in the future
if written < len {
return Ok(written);
}
// Either everything was flushed, or the write reached the end of the buffer
// and the remaining data wrapped to the start; recurse to flush it
Ok(written + self.flush_to(stream).unwrap_or(0))
}
/// Return amount of data in the buffer.
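/// Indices grow monotonically and wrap modulo 2^32; e.g. with head = Wrapping(u32::MAX - 5)
/// and tail = head + Wrapping(10), this returns 10, so the length stays correct across the wrap.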
fn len(&self) -> usize {
(self.tail - self.head).0 as usize
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_txbuf_len() {
let mut loc_tx_buf = LocalTxBuf::new();
// Zero length tx buf
assert_eq!(loc_tx_buf.len(), 0);
// finite length tx buf
loc_tx_buf.head = Wrapping(0);
loc_tx_buf.tail = Wrapping(CONN_TX_BUF_SIZE);
assert_eq!(loc_tx_buf.len(), CONN_TX_BUF_SIZE as usize);
loc_tx_buf.tail = Wrapping(CONN_TX_BUF_SIZE / 2);
assert_eq!(loc_tx_buf.len(), (CONN_TX_BUF_SIZE / 2) as usize);
loc_tx_buf.head = Wrapping(256);
assert_eq!(loc_tx_buf.len(), 32512);
}
#[test]
fn test_txbuf_is_empty() {
let mut loc_tx_buf = LocalTxBuf::new();
// empty tx buffer
assert!(loc_tx_buf.is_empty());
// non empty tx buffer
loc_tx_buf.tail = Wrapping(CONN_TX_BUF_SIZE);
assert!(!loc_tx_buf.is_empty());
}
#[test]
fn test_txbuf_push() {
let mut loc_tx_buf = LocalTxBuf::new();
let mut buf = [0; CONN_TX_BUF_SIZE as usize];
let data = unsafe { VolatileSlice::new(buf.as_mut_ptr(), buf.len()) };
// push data into empty tx buffer
let res_push = loc_tx_buf.push(&data);
assert!(res_push.is_ok());
assert_eq!(loc_tx_buf.head, Wrapping(0));
assert_eq!(loc_tx_buf.tail, Wrapping(CONN_TX_BUF_SIZE));
// push data into full tx buffer
let res_push = loc_tx_buf.push(&data);
assert!(res_push.is_err());
// head and tail wrap at full
loc_tx_buf.head = Wrapping(CONN_TX_BUF_SIZE);
let res_push = loc_tx_buf.push(&data);
assert!(res_push.is_ok());
assert_eq!(loc_tx_buf.tail, Wrapping(CONN_TX_BUF_SIZE * 2));
// only tail wraps at full
let mut buf = vec![1; 4];
let data = unsafe { VolatileSlice::new(buf.as_mut_ptr(), buf.len()) };
let mut cmp_data = vec![1; 4];
cmp_data.append(&mut vec![0; (CONN_TX_BUF_SIZE - 4) as usize]);
loc_tx_buf.head = Wrapping(4);
loc_tx_buf.tail = Wrapping(CONN_TX_BUF_SIZE);
let res_push = loc_tx_buf.push(&data);
assert!(res_push.is_ok());
assert_eq!(loc_tx_buf.head, Wrapping(4));
assert_eq!(loc_tx_buf.tail, Wrapping(CONN_TX_BUF_SIZE + 4));
assert_eq!(loc_tx_buf.buf, cmp_data);
}
#[test]
fn test_txbuf_flush_to() {
let mut loc_tx_buf = LocalTxBuf::new();
// data to be flushed
let mut buf = vec![1; CONN_TX_BUF_SIZE as usize];
let data = unsafe { VolatileSlice::new(buf.as_mut_ptr(), buf.len()) };
// target to which data is flushed
let mut cmp_vec = Vec::with_capacity(data.len());
// flush no data
let res_flush = loc_tx_buf.flush_to(&mut cmp_vec);
assert!(res_flush.is_ok());
assert_eq!(res_flush.unwrap(), 0);
// flush data of CONN_TX_BUF_SIZE amount
let res_push = loc_tx_buf.push(&data);
assert!(res_push.is_ok());
let res_flush = loc_tx_buf.flush_to(&mut cmp_vec);
if let Ok(n) = res_flush {
assert_eq!(loc_tx_buf.head, Wrapping(n as u32));
assert_eq!(loc_tx_buf.tail, Wrapping(CONN_TX_BUF_SIZE));
assert_eq!(n, cmp_vec.len());
assert_eq!(cmp_vec, buf[..n]);
}
// wrapping head flush
let mut buf = vec![0; (CONN_TX_BUF_SIZE / 2) as usize];
buf.append(&mut vec![1; (CONN_TX_BUF_SIZE / 2) as usize]);
let data = unsafe { VolatileSlice::new(buf.as_mut_ptr(), buf.len()) };
loc_tx_buf.head = Wrapping(0);
loc_tx_buf.tail = Wrapping(0);
let res_push = loc_tx_buf.push(&data);
assert!(res_push.is_ok());
cmp_vec.clear();
loc_tx_buf.head = Wrapping(CONN_TX_BUF_SIZE / 2);
loc_tx_buf.tail = Wrapping(CONN_TX_BUF_SIZE + (CONN_TX_BUF_SIZE / 2));
let res_flush = loc_tx_buf.flush_to(&mut cmp_vec);
if let Ok(n) = res_flush {
assert_eq!(
loc_tx_buf.head,
Wrapping(CONN_TX_BUF_SIZE + (CONN_TX_BUF_SIZE / 2))
);
assert_eq!(
loc_tx_buf.tail,
Wrapping(CONN_TX_BUF_SIZE + (CONN_TX_BUF_SIZE / 2))
);
assert_eq!(n, cmp_vec.len());
let mut data = vec![1; (CONN_TX_BUF_SIZE / 2) as usize];
data.append(&mut vec![0; (CONN_TX_BUF_SIZE / 2) as usize]);
assert_eq!(cmp_vec, data[..n]);
}
}
}

vsock/src/vhu_vsock.rs Normal file

@@ -0,0 +1,449 @@
// SPDX-License-Identifier: Apache-2.0 or BSD-3-Clause
use std::{io, result, sync::Mutex};
use thiserror::Error as ThisError;
use vhost::vhost_user::message::{VhostUserProtocolFeatures, VhostUserVirtioFeatures};
use vhost_user_backend::{VhostUserBackendMut, VringRwLock};
use virtio_bindings::bindings::{
virtio_net::VIRTIO_F_NOTIFY_ON_EMPTY, virtio_net::VIRTIO_F_VERSION_1,
virtio_ring::VIRTIO_RING_F_EVENT_IDX,
};
use vm_memory::{ByteValued, GuestMemoryAtomic, GuestMemoryMmap, Le64};
use vmm_sys_util::{
epoll::EventSet,
eventfd::{EventFd, EFD_NONBLOCK},
};
use crate::vhu_vsock_thread::*;
const NUM_QUEUES: usize = 2;
const QUEUE_SIZE: usize = 256;
// New descriptors are pending on the rx queue.
const RX_QUEUE_EVENT: u16 = 0;
// New descriptors are pending on the tx queue.
const TX_QUEUE_EVENT: u16 = 1;
// New descriptors are pending on the event queue.
const EVT_QUEUE_EVENT: u16 = 2;
/// Notification coming from the backend.
pub(crate) const BACKEND_EVENT: u16 = 3;
/// Vsock connection TX buffer capacity
/// TODO: Make this value configurable
pub(crate) const CONN_TX_BUF_SIZE: u32 = 64 * 1024;
/// CID of the host
pub(crate) const VSOCK_HOST_CID: u64 = 2;
/// Connection oriented packet
pub(crate) const VSOCK_TYPE_STREAM: u16 = 1;
// Vsock packet operation ID
/// Connection request
pub(crate) const VSOCK_OP_REQUEST: u16 = 1;
/// Connection response
pub(crate) const VSOCK_OP_RESPONSE: u16 = 2;
/// Connection reset
pub(crate) const VSOCK_OP_RST: u16 = 3;
/// Shutdown connection
pub(crate) const VSOCK_OP_SHUTDOWN: u16 = 4;
/// Data read/write
pub(crate) const VSOCK_OP_RW: u16 = 5;
/// Flow control credit update
pub(crate) const VSOCK_OP_CREDIT_UPDATE: u16 = 6;
/// Flow control credit request
pub(crate) const VSOCK_OP_CREDIT_REQUEST: u16 = 7;
// Vsock packet flags
/// VSOCK_OP_SHUTDOWN: Packet sender will receive no more data
pub(crate) const VSOCK_FLAGS_SHUTDOWN_RCV: u32 = 1;
/// VSOCK_OP_SHUTDOWN: Packet sender will send no more data
pub(crate) const VSOCK_FLAGS_SHUTDOWN_SEND: u32 = 2;
// Queue mask to select vrings; both of the device's queues (rx and tx) are handled by a single thread.
const QUEUE_MASK: u64 = 0b11;
pub(crate) type Result<T> = std::result::Result<T, Error>;
/// This enum defines the custom error types.
#[derive(Debug, ThisError)]
pub(crate) enum Error {
#[error("Failed to handle event other than EPOLLIN event")]
HandleEventNotEpollIn,
#[error("Failed to handle unknown event")]
HandleUnknownEvent,
#[error("Failed to accept new local socket connection")]
UnixAccept(std::io::Error),
#[error("Failed to bind a unix stream")]
UnixBind(std::io::Error),
#[error("Failed to create an epoll fd")]
EpollFdCreate(std::io::Error),
#[error("Failed to add to epoll")]
EpollAdd(std::io::Error),
#[error("Failed to modify evset associated with epoll")]
EpollModify(std::io::Error),
#[error("Failed to read from unix stream")]
UnixRead(std::io::Error),
#[error("Failed to convert byte array to string")]
ConvertFromUtf8(std::str::Utf8Error),
#[error("Invalid vsock connection request from host")]
InvalidPortRequest,
#[error("Unable to convert string to integer")]
ParseInteger(std::num::ParseIntError),
#[error("Error reading stream port")]
ReadStreamPort(Box<Error>),
#[error("Failed to de-register fd from epoll")]
EpollRemove(std::io::Error),
#[error("No memory configured")]
NoMemoryConfigured,
#[error("Unable to iterate queue")]
IterateQueue,
#[error("No rx request available")]
NoRequestRx,
#[error("Unable to create thread pool")]
CreateThreadPool(std::io::Error),
#[error("Packet missing data buffer")]
PktBufMissing,
#[error("Failed to connect to unix socket")]
UnixConnect(std::io::Error),
#[error("Unable to write to unix stream")]
UnixWrite,
#[error("Unable to push data to local tx buffer")]
LocalTxBufFull,
#[error("Unable to flush data from local tx buffer")]
LocalTxBufFlush(std::io::Error),
#[error("No free local port available for new host inititated connection")]
NoFreeLocalPort,
#[error("Backend rx queue is empty")]
EmptyBackendRxQ,
#[error("Failed to create an EventFd")]
EventFdCreate(std::io::Error),
}
impl std::convert::From<Error> for std::io::Error {
fn from(e: Error) -> Self {
std::io::Error::new(io::ErrorKind::Other, e)
}
}
#[derive(Debug, Clone)]
/// This structure is the public API through which an external program
/// is allowed to configure the backend.
pub(crate) struct VsockConfig {
guest_cid: u64,
socket: String,
uds_path: String,
}
impl VsockConfig {
/// Create a new instance of the VsockConfig struct, containing the
/// parameters to be fed into the vsock-backend server.
pub fn new(guest_cid: u64, socket: String, uds_path: String) -> Self {
Self {
guest_cid,
socket,
uds_path,
}
}
/// Return the guest's current CID.
pub fn get_guest_cid(&self) -> u64 {
self.guest_cid
}
/// Return the path of the unix domain socket which is listening to
/// requests from the host side application.
pub fn get_uds_path(&self) -> String {
String::from(&self.uds_path)
}
/// Return the path of the unix domain socket which is listening to
/// requests from the guest.
pub fn get_socket_path(&self) -> String {
String::from(&self.socket)
}
}
/// A local port and peer port pair used to retrieve
/// the corresponding connection.
#[derive(Hash, PartialEq, Eq, Debug, Clone)]
pub(crate) struct ConnMapKey {
local_port: u32,
peer_port: u32,
}
impl ConnMapKey {
pub fn new(local_port: u32, peer_port: u32) -> Self {
Self {
local_port,
peer_port,
}
}
}
/// Virtio Vsock Configuration
#[derive(Copy, Clone, Debug, Default, PartialEq)]
#[repr(C)]
struct VirtioVsockConfig {
pub guest_cid: Le64,
}
unsafe impl ByteValued for VirtioVsockConfig {}
pub(crate) struct VhostUserVsockBackend {
config: VirtioVsockConfig,
pub threads: Vec<Mutex<VhostUserVsockThread>>,
queues_per_thread: Vec<u64>,
pub exit_event: EventFd,
}
impl VhostUserVsockBackend {
pub fn new(config: VsockConfig) -> Result<Self> {
let thread = Mutex::new(VhostUserVsockThread::new(
config.get_uds_path(),
config.get_guest_cid(),
)?);
let queues_per_thread = vec![QUEUE_MASK];
Ok(Self {
config: VirtioVsockConfig {
guest_cid: From::from(config.get_guest_cid()),
},
threads: vec![thread],
queues_per_thread,
exit_event: EventFd::new(EFD_NONBLOCK).map_err(Error::EventFdCreate)?,
})
}
}
impl VhostUserBackendMut<VringRwLock, ()> for VhostUserVsockBackend {
fn num_queues(&self) -> usize {
NUM_QUEUES
}
fn max_queue_size(&self) -> usize {
QUEUE_SIZE
}
fn features(&self) -> u64 {
1 << VIRTIO_F_VERSION_1
| 1 << VIRTIO_F_NOTIFY_ON_EMPTY
| 1 << VIRTIO_RING_F_EVENT_IDX
| VhostUserVirtioFeatures::PROTOCOL_FEATURES.bits()
}
fn protocol_features(&self) -> VhostUserProtocolFeatures {
VhostUserProtocolFeatures::CONFIG
}
fn set_event_idx(&mut self, enabled: bool) {
for thread in self.threads.iter() {
thread.lock().unwrap().event_idx = enabled;
}
}
fn update_memory(
&mut self,
atomic_mem: GuestMemoryAtomic<GuestMemoryMmap>,
) -> result::Result<(), io::Error> {
for thread in self.threads.iter() {
thread.lock().unwrap().mem = Some(atomic_mem.clone());
}
Ok(())
}
fn handle_event(
&mut self,
device_event: u16,
evset: EventSet,
vrings: &[VringRwLock],
thread_id: usize,
) -> result::Result<bool, io::Error> {
let vring_rx = &vrings[0];
let vring_tx = &vrings[1];
if evset != EventSet::IN {
return Err(Error::HandleEventNotEpollIn.into());
}
let mut thread = self.threads[thread_id].lock().unwrap();
let evt_idx = thread.event_idx;
match device_event {
RX_QUEUE_EVENT => {}
TX_QUEUE_EVENT => {
thread.process_tx(vring_tx, evt_idx)?;
}
EVT_QUEUE_EVENT => {}
BACKEND_EVENT => {
thread.process_backend_evt(evset);
thread.process_tx(vring_tx, evt_idx)?;
}
_ => {
return Err(Error::HandleUnknownEvent.into());
}
}
if device_event != EVT_QUEUE_EVENT && thread.thread_backend.pending_rx() {
thread.process_rx(vring_rx, evt_idx)?;
}
Ok(false)
}
fn get_config(&self, offset: u32, size: u32) -> Vec<u8> {
let offset = offset as usize;
let size = size as usize;
let buf = self.config.as_slice();
if offset + size > buf.len() {
return Vec::new();
}
buf[offset..offset + size].to_vec()
}
fn queues_per_thread(&self) -> Vec<u64> {
self.queues_per_thread.clone()
}
fn exit_event(&self, _thread_index: usize) -> Option<EventFd> {
self.exit_event.try_clone().ok()
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::convert::TryInto;
use vhost_user_backend::VringT;
use vm_memory::GuestAddress;
#[test]
fn test_vsock_backend() {
const CID: u64 = 3;
const VHOST_SOCKET_PATH: &str = "test_vsock_backend.socket";
const VSOCK_SOCKET_PATH: &str = "test_vsock_backend.vsock";
let config = VsockConfig::new(
CID,
VHOST_SOCKET_PATH.to_string(),
VSOCK_SOCKET_PATH.to_string(),
);
let backend = VhostUserVsockBackend::new(config);
assert!(backend.is_ok());
let mut backend = backend.unwrap();
assert_eq!(backend.num_queues(), NUM_QUEUES);
assert_eq!(backend.max_queue_size(), QUEUE_SIZE);
assert_ne!(backend.features(), 0);
assert!(!backend.protocol_features().is_empty());
backend.set_event_idx(false);
let mem = GuestMemoryAtomic::new(
GuestMemoryMmap::<()>::from_ranges(&[(GuestAddress(0), 0x10000)]).unwrap(),
);
let vrings = [
VringRwLock::new(mem.clone(), 0x1000).unwrap(),
VringRwLock::new(mem.clone(), 0x2000).unwrap(),
];
assert!(backend.update_memory(mem).is_ok());
let queues_per_thread = backend.queues_per_thread();
assert_eq!(queues_per_thread.len(), 1);
assert_eq!(queues_per_thread[0], 0b11);
let config = backend.get_config(0, 8);
assert_eq!(config.len(), 8);
let cid = u64::from_le_bytes(config.try_into().unwrap());
assert_eq!(cid, CID);
let exit = backend.exit_event(0);
assert!(exit.is_some());
exit.unwrap().write(1).unwrap();
let ret = backend.handle_event(RX_QUEUE_EVENT, EventSet::IN, &vrings, 0);
assert!(ret.is_ok());
assert!(!ret.unwrap());
let ret = backend.handle_event(TX_QUEUE_EVENT, EventSet::IN, &vrings, 0);
assert!(ret.is_ok());
assert!(!ret.unwrap());
let ret = backend.handle_event(EVT_QUEUE_EVENT, EventSet::IN, &vrings, 0);
assert!(ret.is_ok());
assert!(!ret.unwrap());
let ret = backend.handle_event(BACKEND_EVENT, EventSet::IN, &vrings, 0);
assert!(ret.is_ok());
assert!(!ret.unwrap());
// cleanup
let _ = std::fs::remove_file(VHOST_SOCKET_PATH);
let _ = std::fs::remove_file(VSOCK_SOCKET_PATH);
}
#[test]
fn test_vsock_backend_failures() {
const CID: u64 = 3;
const VHOST_SOCKET_PATH: &str = "test_vsock_backend_failures.socket";
const VSOCK_SOCKET_PATH: &str = "test_vsock_backend_failures.vsock";
let config = VsockConfig::new(
CID,
"/sys/not_allowed.socket".to_string(),
"/sys/not_allowed.vsock".to_string(),
);
let backend = VhostUserVsockBackend::new(config);
assert!(backend.is_err());
let config = VsockConfig::new(
CID,
VHOST_SOCKET_PATH.to_string(),
VSOCK_SOCKET_PATH.to_string(),
);
let mut backend = VhostUserVsockBackend::new(config).unwrap();
let mem = GuestMemoryAtomic::new(
GuestMemoryMmap::<()>::from_ranges(&[(GuestAddress(0), 0x10000)]).unwrap(),
);
let vrings = [
VringRwLock::new(mem.clone(), 0x1000).unwrap(),
VringRwLock::new(mem.clone(), 0x2000).unwrap(),
];
backend.update_memory(mem).unwrap();
// reading out of the config space, expecting empty config
let config = backend.get_config(2, 8);
assert_eq!(config.len(), 0);
assert_eq!(
backend
.handle_event(RX_QUEUE_EVENT, EventSet::OUT, &vrings, 0)
.unwrap_err()
.to_string(),
Error::HandleEventNotEpollIn.to_string()
);
assert_eq!(
backend
.handle_event(BACKEND_EVENT + 1, EventSet::IN, &vrings, 0)
.unwrap_err()
.to_string(),
Error::HandleUnknownEvent.to_string()
);
// cleanup
let _ = std::fs::remove_file(VHOST_SOCKET_PATH);
let _ = std::fs::remove_file(VSOCK_SOCKET_PATH);
}
}

vsock/src/vhu_vsock_thread.rs Normal file

@@ -0,0 +1,675 @@
// SPDX-License-Identifier: Apache-2.0 or BSD-3-Clause
use std::{
fs::File,
io,
io::Read,
num::Wrapping,
ops::Deref,
os::unix::{
net::{UnixListener, UnixStream},
prelude::{AsRawFd, FromRawFd, RawFd},
},
sync::{Arc, RwLock},
};
use futures::executor::{ThreadPool, ThreadPoolBuilder};
use log::warn;
use vhost_user_backend::{VringEpollHandler, VringRwLock, VringT};
use virtio_queue::QueueOwnedT;
use virtio_vsock::packet::{VsockPacket, PKT_HEADER_SIZE};
use vm_memory::{GuestAddressSpace, GuestMemoryAtomic, GuestMemoryMmap};
use vmm_sys_util::epoll::EventSet;
use crate::{
rxops::*,
thread_backend::*,
vhu_vsock::{
ConnMapKey, Error, Result, VhostUserVsockBackend, BACKEND_EVENT, CONN_TX_BUF_SIZE,
VSOCK_HOST_CID,
},
vsock_conn::*,
};
type ArcVhostBknd = Arc<RwLock<VhostUserVsockBackend>>;
pub(crate) struct VhostUserVsockThread {
/// Guest memory map.
pub mem: Option<GuestMemoryAtomic<GuestMemoryMmap>>,
/// VIRTIO_RING_F_EVENT_IDX.
pub event_idx: bool,
/// Host socket raw file descriptor.
host_sock: RawFd,
/// Host socket path
host_sock_path: String,
/// Listener listening for new connections on the host.
host_listener: UnixListener,
/// Instance of VringWorker.
vring_worker: Option<Arc<VringEpollHandler<ArcVhostBknd, VringRwLock, ()>>>,
/// epoll fd to which new host connections are added.
epoll_file: File,
/// VsockThreadBackend instance.
pub thread_backend: VsockThreadBackend,
/// CID of the guest.
guest_cid: u64,
/// Thread pool to handle event idx.
pool: ThreadPool,
/// Next local port candidate to allocate for host-initiated connections.
local_port: Wrapping<u32>,
}
impl VhostUserVsockThread {
/// Create a new instance of VhostUserVsockThread.
pub fn new(uds_path: String, guest_cid: u64) -> Result<Self> {
// TODO: better error handling, maybe add a param to force the unlink
let _ = std::fs::remove_file(uds_path.clone());
let host_sock = UnixListener::bind(&uds_path)
.and_then(|sock| sock.set_nonblocking(true).map(|_| sock))
.map_err(Error::UnixBind)?;
let epoll_fd = epoll::create(true).map_err(Error::EpollFdCreate)?;
let epoll_file = unsafe { File::from_raw_fd(epoll_fd) };
let host_raw_fd = host_sock.as_raw_fd();
let thread = VhostUserVsockThread {
mem: None,
event_idx: false,
host_sock: host_sock.as_raw_fd(),
host_sock_path: uds_path.clone(),
host_listener: host_sock,
vring_worker: None,
epoll_file,
thread_backend: VsockThreadBackend::new(uds_path, epoll_fd),
guest_cid,
pool: ThreadPoolBuilder::new()
.pool_size(1)
.create()
.map_err(Error::CreateThreadPool)?,
local_port: Wrapping(0),
};
VhostUserVsockThread::epoll_register(epoll_fd, host_raw_fd, epoll::Events::EPOLLIN)?;
Ok(thread)
}
/// Register a file with an epoll to listen for events in evset.
pub fn epoll_register(epoll_fd: RawFd, fd: RawFd, evset: epoll::Events) -> Result<()> {
epoll::ctl(
epoll_fd,
epoll::ControlOptions::EPOLL_CTL_ADD,
fd,
epoll::Event::new(evset, fd as u64),
)
.map_err(Error::EpollAdd)?;
Ok(())
}
/// Remove a file from the epoll.
pub fn epoll_unregister(epoll_fd: RawFd, fd: RawFd) -> Result<()> {
epoll::ctl(
epoll_fd,
epoll::ControlOptions::EPOLL_CTL_DEL,
fd,
epoll::Event::new(epoll::Events::empty(), 0),
)
.map_err(Error::EpollRemove)?;
Ok(())
}
/// Modify the events we listen to for the fd in the epoll.
pub fn epoll_modify(epoll_fd: RawFd, fd: RawFd, evset: epoll::Events) -> Result<()> {
epoll::ctl(
epoll_fd,
epoll::ControlOptions::EPOLL_CTL_MOD,
fd,
epoll::Event::new(evset, fd as u64),
)
.map_err(Error::EpollModify)?;
Ok(())
}
/// Return raw file descriptor of the epoll file.
fn get_epoll_fd(&self) -> RawFd {
self.epoll_file.as_raw_fd()
}
/// Set self's VringWorker.
pub fn set_vring_worker(
&mut self,
vring_worker: Option<Arc<VringEpollHandler<ArcVhostBknd, VringRwLock, ()>>>,
) {
self.vring_worker = vring_worker;
self.vring_worker
.as_ref()
.unwrap()
.register_listener(self.get_epoll_fd(), EventSet::IN, u64::from(BACKEND_EVENT))
.unwrap();
}
/// Process a BACKEND_EVENT received by VhostUserVsockBackend.
pub fn process_backend_evt(&mut self, _evset: EventSet) {
let mut epoll_events = vec![epoll::Event::new(epoll::Events::empty(), 0); 32];
'epoll: loop {
match epoll::wait(self.epoll_file.as_raw_fd(), 0, epoll_events.as_mut_slice()) {
Ok(ev_cnt) => {
for evt in epoll_events.iter().take(ev_cnt) {
self.handle_event(
evt.data as RawFd,
epoll::Events::from_bits(evt.events).unwrap(),
);
}
}
Err(e) => {
if e.kind() == io::ErrorKind::Interrupted {
continue;
}
warn!("failed to consume new epoll event");
}
}
break 'epoll;
}
}
/// Handle a BACKEND_EVENT by either accepting a new connection or
/// forwarding a request to the appropriate connection object.
fn handle_event(&mut self, fd: RawFd, evset: epoll::Events) {
if fd == self.host_sock {
// This is a new connection initiated by an application running on the host
self.host_listener
.accept()
.map_err(Error::UnixAccept)
.and_then(|(stream, _)| {
stream
.set_nonblocking(true)
.map(|_| stream)
.map_err(Error::UnixAccept)
})
.and_then(|stream| self.add_stream_listener(stream))
.unwrap_or_else(|err| {
warn!("Unable to accept new local connection: {:?}", err);
});
} else {
// Check if the stream represented by fd has already established a
// connection with the application running in the guest
if let std::collections::hash_map::Entry::Vacant(_) =
self.thread_backend.listener_map.entry(fd)
{
// New connection from the host
if evset != epoll::Events::EPOLLIN {
// Has to be EPOLLIN as it was not connected previously
return;
}
let mut unix_stream = match self.thread_backend.stream_map.remove(&fd) {
Some(uds) => uds,
None => {
warn!("Error while searching fd in the stream map");
return;
}
};
// Local peer is sending a "connect PORT\n" command
let peer_port = match Self::read_local_stream_port(&mut unix_stream) {
Ok(port) => port,
Err(err) => {
warn!("Error while parsing \"connect PORT\n\" command: {:?}", err);
return;
}
};
// Allocate a local port number
let local_port = match self.allocate_local_port() {
Ok(lp) => lp,
Err(err) => {
warn!("Error while allocating local port: {:?}", err);
return;
}
};
// Insert the fd into the backend's maps
self.thread_backend
.listener_map
.insert(fd, ConnMapKey::new(local_port, peer_port));
// Create a new connection object and enqueue a connection request
// packet to be sent to the guest
let conn_map_key = ConnMapKey::new(local_port, peer_port);
let mut new_conn = VsockConnection::new_local_init(
unix_stream,
VSOCK_HOST_CID,
local_port,
self.guest_cid,
peer_port,
self.get_epoll_fd(),
);
new_conn.rx_queue.enqueue(RxOps::Request);
new_conn.set_peer_port(peer_port);
// Add connection object into the backend's maps
self.thread_backend.conn_map.insert(conn_map_key, new_conn);
self.thread_backend
.backend_rxq
.push_back(ConnMapKey::new(local_port, peer_port));
// Re-register the fd to listen for EPOLLIN and EPOLLOUT events
Self::epoll_modify(
self.get_epoll_fd(),
fd,
epoll::Events::EPOLLIN | epoll::Events::EPOLLOUT,
)
.unwrap();
} else {
// Previously connected connection
let key = self.thread_backend.listener_map.get(&fd).unwrap();
let conn = self.thread_backend.conn_map.get_mut(key).unwrap();
if evset == epoll::Events::EPOLLOUT {
// Flush any remaining data from the tx buffer
match conn.tx_buf.flush_to(&mut conn.stream) {
Ok(cnt) => {
if cnt > 0 {
conn.fwd_cnt += Wrapping(cnt as u32);
conn.rx_queue.enqueue(RxOps::CreditUpdate);
}
self.thread_backend
.backend_rxq
.push_back(ConnMapKey::new(conn.local_port, conn.peer_port));
}
Err(e) => {
dbg!("Error: {:?}", e);
}
}
return;
}
// Unregister stream from the epoll, register when connection is
// established with the guest
Self::epoll_unregister(self.epoll_file.as_raw_fd(), fd).unwrap();
// Enqueue a read request
conn.rx_queue.enqueue(RxOps::Rw);
self.thread_backend
.backend_rxq
.push_back(ConnMapKey::new(conn.local_port, conn.peer_port));
}
}
}
/// Allocate a new local port number.
fn allocate_local_port(&mut self) -> Result<u32> {
// TODO: Improve space efficiency of this operation
// TODO: Reuse the conn_map HashMap
// TODO: Test this.
let start_local_port = self.local_port.0;
let mut alloc_local_port = start_local_port;
loop {
if !self
.thread_backend
.local_port_set
.contains(&alloc_local_port)
{
// The port set doesn't contain the newly allocated port number.
self.local_port = Wrapping(alloc_local_port) + Wrapping(1);
self.thread_backend.local_port_set.insert(alloc_local_port);
return Ok(alloc_local_port);
}
// Check for wrap-around only after advancing, so a taken starting
// port doesn't immediately report exhaustion.
alloc_local_port = alloc_local_port.wrapping_add(1);
if alloc_local_port == start_local_port {
// We have exhausted our search and wrapped back to the starting port number
return Err(Error::NoFreeLocalPort);
}
}
}
/// Read `CONNECT PORT_NUM\n` from the connected stream.
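/// For example, receiving `CONNECT 1234\n` yields `Ok(1234)`.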
fn read_local_stream_port(stream: &mut UnixStream) -> Result<u32> {
let mut buf = [0u8; 32];
// Minimum number of bytes we should be able to read
// Corresponds to 'CONNECT 0\n'
const MIN_READ_LEN: usize = 10;
// Read in the minimum number of bytes we can read
stream
.read_exact(&mut buf[..MIN_READ_LEN])
.map_err(Error::UnixRead)?;
let mut read_len = MIN_READ_LEN;
while buf[read_len - 1] != b'\n' && read_len < buf.len() {
stream
.read_exact(&mut buf[read_len..read_len + 1])
.map_err(Error::UnixRead)?;
read_len += 1;
}
let mut word_iter = std::str::from_utf8(&buf[..read_len])
.map_err(Error::ConvertFromUtf8)?
.split_whitespace();
word_iter
.next()
.ok_or(Error::InvalidPortRequest)
.and_then(|word| {
if word.to_lowercase() == "connect" {
Ok(())
} else {
Err(Error::InvalidPortRequest)
}
})
.and_then(|_| word_iter.next().ok_or(Error::InvalidPortRequest))
.and_then(|word| word.parse::<u32>().map_err(Error::ParseInteger))
.map_err(|e| Error::ReadStreamPort(Box::new(e)))
}
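// For illustration, a minimal host-side client for this handshake could look
// like the sketch below (the socket path and guest port are hypothetical,
// error handling elided). The backend parses the "CONNECT PORT_NUM\n" line
// and, once the guest accepts the connection, replies with an "OK <port>\n"
// line on the same stream:
//
//     use std::io::{Read, Write};
//     use std::os::unix::net::UnixStream;
//
//     let mut stream = UnixStream::connect("/tmp/vm.vsock")?;
//     stream.write_all(b"CONNECT 1234\n")?;
//     let mut resp = [0u8; 32];
//     let _n = stream.read(&mut resp)?; // expect b"OK <port>\n"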
/// Add a stream to epoll to listen for EPOLLIN events.
fn add_stream_listener(&mut self, stream: UnixStream) -> Result<()> {
let stream_fd = stream.as_raw_fd();
self.thread_backend.stream_map.insert(stream_fd, stream);
VhostUserVsockThread::epoll_register(
self.get_epoll_fd(),
stream_fd,
epoll::Events::EPOLLIN,
)?;
Ok(())
}
/// Iterate over the rx queue and process rx requests.
fn process_rx_queue(&mut self, vring: &VringRwLock) -> Result<bool> {
let mut used_any = false;
let atomic_mem = match &self.mem {
Some(m) => m,
None => return Err(Error::NoMemoryConfigured),
};
let mut vring_mut = vring.get_mut();
let queue = vring_mut.get_queue_mut();
while let Some(mut avail_desc) = queue
.iter(atomic_mem.memory())
.map_err(|_| Error::IterateQueue)?
.next()
{
used_any = true;
let mem = atomic_mem.clone().memory();
let head_idx = avail_desc.head_index();
let used_len = match VsockPacket::from_rx_virtq_chain(
mem.deref(),
&mut avail_desc,
CONN_TX_BUF_SIZE,
) {
Ok(mut pkt) => {
if self.thread_backend.recv_pkt(&mut pkt).is_ok() {
PKT_HEADER_SIZE + pkt.len() as usize
} else {
queue.iter(mem).unwrap().go_to_previous_position();
break;
}
}
Err(e) => {
warn!("vsock: RX queue error: {:?}", e);
0
}
};
let vring = vring.clone();
let event_idx = self.event_idx;
self.pool.spawn_ok(async move {
// TODO: Understand why doing the following in the pool works
if event_idx {
if vring.add_used(head_idx, used_len as u32).is_err() {
warn!("Could not return used descriptors to ring");
}
match vring.needs_notification() {
Err(_) => {
warn!("Could not check if queue needs to be notified");
vring.signal_used_queue().unwrap();
}
Ok(needs_notification) => {
if needs_notification {
vring.signal_used_queue().unwrap();
}
}
}
} else {
if vring.add_used(head_idx, used_len as u32).is_err() {
warn!("Could not return used descriptors to ring");
}
vring.signal_used_queue().unwrap();
}
});
if !self.thread_backend.pending_rx() {
break;
}
}
Ok(used_any)
}
/// Wrapper to process rx queue based on whether event idx is enabled or not.
pub fn process_rx(&mut self, vring: &VringRwLock, event_idx: bool) -> Result<bool> {
if event_idx {
// To properly handle EVENT_IDX we need to keep calling
// process_rx_queue until it stops finding new requests
// on the queue, as vm-virtio's Queue implementation
// only checks avail_index once
loop {
if !self.thread_backend.pending_rx() {
break;
}
vring.disable_notification().unwrap();
self.process_rx_queue(vring)?;
if !vring.enable_notification().unwrap() {
break;
}
}
} else {
self.process_rx_queue(vring)?;
}
Ok(false)
}
/// Process tx queue and send requests to the backend for processing.
fn process_tx_queue(&mut self, vring: &VringRwLock) -> Result<bool> {
let mut used_any = false;
let atomic_mem = match &self.mem {
Some(m) => m,
None => return Err(Error::NoMemoryConfigured),
};
while let Some(mut avail_desc) = vring
.get_mut()
.get_queue_mut()
.iter(atomic_mem.memory())
.map_err(|_| Error::IterateQueue)?
.next()
{
used_any = true;
let mem = atomic_mem.clone().memory();
let head_idx = avail_desc.head_index();
let pkt = match VsockPacket::from_tx_virtq_chain(
mem.deref(),
&mut avail_desc,
CONN_TX_BUF_SIZE,
) {
Ok(pkt) => pkt,
Err(e) => {
dbg!("vsock: error reading TX packet: {:?}", e);
continue;
}
};
if self.thread_backend.send_pkt(&pkt).is_err() {
vring
.get_mut()
.get_queue_mut()
.iter(mem)
.unwrap()
.go_to_previous_position();
break;
}
// TODO: Check if the protocol requires read length to be correct
let used_len = 0;
let vring = vring.clone();
let event_idx = self.event_idx;
self.pool.spawn_ok(async move {
if event_idx {
if vring.add_used(head_idx, used_len as u32).is_err() {
warn!("Could not return used descriptors to ring");
}
match vring.needs_notification() {
Err(_) => {
warn!("Could not check if queue needs to be notified");
vring.signal_used_queue().unwrap();
}
Ok(needs_notification) => {
if needs_notification {
vring.signal_used_queue().unwrap();
}
}
}
} else {
if vring.add_used(head_idx, used_len as u32).is_err() {
warn!("Could not return used descriptors to ring");
}
vring.signal_used_queue().unwrap();
}
});
}
Ok(used_any)
}
/// Wrapper to process tx queue based on whether event idx is enabled or not.
pub fn process_tx(&mut self, vring_lock: &VringRwLock, event_idx: bool) -> Result<bool> {
if event_idx {
// To properly handle EVENT_IDX we need to keep calling
// process_tx_queue until it stops finding new requests
// on the queue, as vm-virtio's Queue implementation
// only checks avail_index once
loop {
vring_lock.disable_notification().unwrap();
self.process_tx_queue(vring_lock)?;
if !vring_lock.enable_notification().unwrap() {
break;
}
}
} else {
self.process_tx_queue(vring_lock)?;
}
Ok(false)
}
}
impl Drop for VhostUserVsockThread {
fn drop(&mut self) {
let _ = std::fs::remove_file(&self.host_sock_path);
}
}
#[cfg(test)]
mod tests {
use super::*;
use vm_memory::GuestAddress;
use vmm_sys_util::eventfd::EventFd;
impl VhostUserVsockThread {
fn get_epoll_file(&self) -> &File {
&self.epoll_file
}
}
#[test]
fn test_vsock_thread() {
let t = VhostUserVsockThread::new("test_vsock_thread.vsock".to_string(), 3);
assert!(t.is_ok());
let mut t = t.unwrap();
let epoll_fd = t.get_epoll_file().as_raw_fd();
let mem = GuestMemoryAtomic::new(
GuestMemoryMmap::<()>::from_ranges(&[(GuestAddress(0), 0x10000)]).unwrap(),
);
t.mem = Some(mem.clone());
let dummy_fd = EventFd::new(0).unwrap();
assert!(VhostUserVsockThread::epoll_register(
epoll_fd,
dummy_fd.as_raw_fd(),
epoll::Events::EPOLLOUT
)
.is_ok());
assert!(VhostUserVsockThread::epoll_modify(
epoll_fd,
dummy_fd.as_raw_fd(),
epoll::Events::EPOLLIN
)
.is_ok());
assert!(VhostUserVsockThread::epoll_unregister(epoll_fd, dummy_fd.as_raw_fd()).is_ok());
assert!(VhostUserVsockThread::epoll_register(
epoll_fd,
dummy_fd.as_raw_fd(),
epoll::Events::EPOLLIN
)
.is_ok());
let vring = VringRwLock::new(mem, 0x1000).unwrap();
assert!(t.process_tx(&vring, false).is_ok());
assert!(t.process_tx(&vring, true).is_ok());
// add an entry to backend_rxq so that RX processing is not skipped
t.thread_backend
.backend_rxq
.push_back(ConnMapKey::new(0, 0));
assert!(t.process_rx(&vring, false).is_ok());
assert!(t.process_rx(&vring, true).is_ok());
dummy_fd.write(1).unwrap();
t.process_backend_evt(EventSet::empty());
}
#[test]
fn test_vsock_thread_failures() {
let t = VhostUserVsockThread::new("/sys/not_allowed.vsock".to_string(), 3);
assert!(t.is_err());
let mut t =
VhostUserVsockThread::new("test_vsock_thread_failures.vsock".to_string(), 3).unwrap();
assert!(VhostUserVsockThread::epoll_register(-1, -1, epoll::Events::EPOLLIN).is_err());
assert!(VhostUserVsockThread::epoll_modify(-1, -1, epoll::Events::EPOLLIN).is_err());
assert!(VhostUserVsockThread::epoll_unregister(-1, -1).is_err());
let mem = GuestMemoryAtomic::new(
GuestMemoryMmap::<()>::from_ranges(&[(GuestAddress(0), 0x10000)]).unwrap(),
);
let vring = VringRwLock::new(mem, 0x1000).unwrap();
// memory is not configured, so processing TX should fail
assert!(t.process_tx(&vring, false).is_err());
assert!(t.process_tx(&vring, true).is_err());
// add an entry to backend_rxq so that RX processing is not skipped
t.thread_backend
.backend_rxq
.push_back(ConnMapKey::new(0, 0));
assert!(t.process_rx(&vring, false).is_err());
assert!(t.process_rx(&vring, true).is_err());
}
}

719
vsock/src/vsock_conn.rs Normal file
View File

@ -0,0 +1,719 @@
// SPDX-License-Identifier: Apache-2.0 or BSD-3-Clause
use std::{
io::{ErrorKind, Read, Write},
num::Wrapping,
os::unix::prelude::{AsRawFd, RawFd},
};
use log::{info, warn};
use virtio_vsock::packet::{VsockPacket, PKT_HEADER_SIZE};
use vm_memory::{bitmap::BitmapSlice, Bytes, VolatileSlice};
use crate::{
rxops::*,
rxqueue::*,
txbuf::*,
vhu_vsock::{
Error, Result, CONN_TX_BUF_SIZE, VSOCK_FLAGS_SHUTDOWN_RCV, VSOCK_FLAGS_SHUTDOWN_SEND,
VSOCK_OP_CREDIT_REQUEST, VSOCK_OP_CREDIT_UPDATE, VSOCK_OP_REQUEST, VSOCK_OP_RESPONSE,
VSOCK_OP_RST, VSOCK_OP_RW, VSOCK_OP_SHUTDOWN, VSOCK_TYPE_STREAM,
},
vhu_vsock_thread::VhostUserVsockThread,
};
#[derive(Debug)]
pub(crate) struct VsockConnection<S> {
/// Host-side stream corresponding to this vsock connection.
pub stream: S,
/// Specifies if the stream is connected to a listener on the host.
pub connect: bool,
/// Port on which the guest-side application is listening.
pub peer_port: u32,
/// Queue holding pending rx operations per connection.
pub rx_queue: RxQueue,
/// CID of the host.
local_cid: u64,
/// Port on the host on which the host-side application is listening.
pub local_port: u32,
/// CID of the guest.
pub guest_cid: u64,
/// Total number of bytes written to stream from tx buffer.
pub fwd_cnt: Wrapping<u32>,
/// Value of `fwd_cnt` the last time credit information was sent to the peer.
last_fwd_cnt: Wrapping<u32>,
/// Size of buffer the guest has allocated for this connection.
peer_buf_alloc: u32,
/// Number of bytes the peer has forwarded to a connection.
peer_fwd_cnt: Wrapping<u32>,
/// The total number of bytes sent to the guest vsock driver.
rx_cnt: Wrapping<u32>,
/// epoll fd to which this connection's stream has to be added.
pub epoll_fd: RawFd,
/// Local tx buffer.
pub tx_buf: LocalTxBuf,
}
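// Credit accounting at a glance: `fwd_cnt` counts bytes written to the
// host-side stream and is advertised back to the guest in every packet
// header, `rx_cnt` counts bytes sent to the guest, while `peer_buf_alloc`
// and `peer_fwd_cnt` mirror the guest's advertised buffer size and
// consumption. Together they bound how much more data may be sent to the
// guest (see `peer_avail_credit` below).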
impl<S: AsRawFd + Read + Write> VsockConnection<S> {
/// Create a new vsock connection object for locally (i.e., host-side)
/// initiated connections.
pub fn new_local_init(
stream: S,
local_cid: u64,
local_port: u32,
guest_cid: u64,
guest_port: u32,
epoll_fd: RawFd,
) -> Self {
Self {
stream,
connect: false,
peer_port: guest_port,
rx_queue: RxQueue::new(),
local_cid,
local_port,
guest_cid,
fwd_cnt: Wrapping(0),
last_fwd_cnt: Wrapping(0),
peer_buf_alloc: 0,
peer_fwd_cnt: Wrapping(0),
rx_cnt: Wrapping(0),
epoll_fd,
tx_buf: LocalTxBuf::new(),
}
}
/// Create a new vsock connection object for connections initiated by
/// an application running in the guest.
pub fn new_peer_init(
stream: S,
local_cid: u64,
local_port: u32,
guest_cid: u64,
guest_port: u32,
epoll_fd: RawFd,
peer_buf_alloc: u32,
) -> Self {
let mut rx_queue = RxQueue::new();
rx_queue.enqueue(RxOps::Response);
Self {
stream,
connect: false,
peer_port: guest_port,
rx_queue,
local_cid,
local_port,
guest_cid,
fwd_cnt: Wrapping(0),
last_fwd_cnt: Wrapping(0),
peer_buf_alloc,
peer_fwd_cnt: Wrapping(0),
rx_cnt: Wrapping(0),
epoll_fd,
tx_buf: LocalTxBuf::new(),
}
}
/// Set the peer port to the guest side application's port.
pub fn set_peer_port(&mut self, peer_port: u32) {
self.peer_port = peer_port;
}
/// Process a vsock packet that is meant for this connection.
/// Forward data to the host-side application if the vsock packet
/// contains a RW operation.
pub fn recv_pkt<B: BitmapSlice>(&mut self, pkt: &mut VsockPacket<B>) -> Result<()> {
// Initialize all fields in the packet header
self.init_pkt(pkt);
match self.rx_queue.dequeue() {
Some(RxOps::Request) => {
// Send a connection request to the guest-side application
pkt.set_op(VSOCK_OP_REQUEST);
Ok(())
}
Some(RxOps::Rw) => {
if !self.connect {
// There is no host-side application listening for this
// packet, hence send back an RST.
pkt.set_op(VSOCK_OP_RST);
return Ok(());
}
// Check if peer has space for receiving data
if self.need_credit_update_from_peer() {
self.last_fwd_cnt = self.fwd_cnt;
pkt.set_op(VSOCK_OP_CREDIT_REQUEST);
return Ok(());
}
let buf = pkt.data_slice().ok_or(Error::PktBufMissing)?;
// Perform a credit check to find the maximum read size. The read
// data must fit inside a packet buffer and be within peer's
// available buffer space
let max_read_len = std::cmp::min(buf.len(), self.peer_avail_credit());
// Read data from the stream directly into the buffer
if let Ok(read_cnt) = buf.read_from(0, &mut self.stream, max_read_len) {
if read_cnt == 0 {
// If no data was read then the stream was closed down unexpectedly.
// Send a shutdown packet to the guest-side application.
pkt.set_op(VSOCK_OP_SHUTDOWN)
.set_flag(VSOCK_FLAGS_SHUTDOWN_RCV)
.set_flag(VSOCK_FLAGS_SHUTDOWN_SEND);
} else {
// If data was read, then set the length field in the packet header
// to the amount of data that was read.
pkt.set_op(VSOCK_OP_RW).set_len(read_cnt as u32);
// Re-register the stream file descriptor for read and write events
VhostUserVsockThread::epoll_register(
self.epoll_fd,
self.stream.as_raw_fd(),
epoll::Events::EPOLLIN | epoll::Events::EPOLLOUT,
)?;
}
// Update the rx_cnt with the amount of data in the vsock packet.
self.rx_cnt += Wrapping(pkt.len());
self.last_fwd_cnt = self.fwd_cnt;
}
Ok(())
}
Some(RxOps::Response) => {
// A response has been received to a newly initiated host-side connection
self.connect = true;
pkt.set_op(VSOCK_OP_RESPONSE);
Ok(())
}
Some(RxOps::CreditUpdate) => {
// Request credit update from the guest.
if !self.rx_queue.pending_rx() {
// Only consume an rx buffer for a standalone credit update when no
// other rx operation is pending (every packet header carries the
// credit information anyway)
pkt.set_op(VSOCK_OP_CREDIT_UPDATE);
self.last_fwd_cnt = self.fwd_cnt;
}
Ok(())
}
_ => Err(Error::NoRequestRx),
}
}
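// Summary of the rx-op to packet-op mapping implemented above:
//   RxOps::Request      -> VSOCK_OP_REQUEST
//   RxOps::Rw           -> VSOCK_OP_RW (or VSOCK_OP_RST if not yet
//                          connected, VSOCK_OP_CREDIT_REQUEST when out of
//                          credit, VSOCK_OP_SHUTDOWN on a zero-length read)
//   RxOps::Response     -> VSOCK_OP_RESPONSE
//   RxOps::CreditUpdate -> VSOCK_OP_CREDIT_UPDATE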
/// Deliver a guest generated packet to this connection.
///
/// Returns:
/// - always `Ok(())` to indicate that the packet has been consumed
pub fn send_pkt<B: BitmapSlice>(&mut self, pkt: &VsockPacket<B>) -> Result<()> {
// Update peer credit information
self.peer_buf_alloc = pkt.buf_alloc();
self.peer_fwd_cnt = Wrapping(pkt.fwd_cnt());
match pkt.op() {
VSOCK_OP_RESPONSE => {
// Confirmation for a host initiated connection
// TODO: Handle stream write error in a better manner
let response = format!("OK {}\n", self.peer_port);
self.stream.write_all(response.as_bytes()).unwrap();
self.connect = true;
}
VSOCK_OP_RW => {
// Data has to be written to the host-side stream
match pkt.data_slice() {
None => {
info!(
"Dropping empty packet from guest (lp={}, pp={})",
self.local_port, self.peer_port
);
return Ok(());
}
Some(buf) => {
if let Err(err) = self.send_bytes(buf) {
// TODO: Terminate this connection
dbg!("err:{:?}", err);
return Ok(());
}
}
}
}
VSOCK_OP_CREDIT_UPDATE => {
// Already updated the credit
// Re-register the stream file descriptor for read and write events
if VhostUserVsockThread::epoll_modify(
self.epoll_fd,
self.stream.as_raw_fd(),
epoll::Events::EPOLLIN | epoll::Events::EPOLLOUT,
)
.is_err()
{
VhostUserVsockThread::epoll_register(
self.epoll_fd,
self.stream.as_raw_fd(),
epoll::Events::EPOLLIN | epoll::Events::EPOLLOUT,
)
.unwrap();
};
}
VSOCK_OP_CREDIT_REQUEST => {
// Send back this connection's credit information
self.rx_queue.enqueue(RxOps::CreditUpdate);
}
VSOCK_OP_SHUTDOWN => {
// Shutdown this connection
let recv_off = pkt.flags() & VSOCK_FLAGS_SHUTDOWN_RCV != 0;
let send_off = pkt.flags() & VSOCK_FLAGS_SHUTDOWN_SEND != 0;
if recv_off && send_off && self.tx_buf.is_empty() {
self.rx_queue.enqueue(RxOps::Reset);
}
}
_ => {}
}
Ok(())
}
/// Write data to the host-side stream.
///
/// Returns:
/// - Ok(()) if the data was written to the stream, or buffered in the
///   local tx buffer to be flushed on a later EPOLLOUT event
/// - Err(Error::UnixWrite) if there was an error writing to the stream
fn send_bytes<B: BitmapSlice>(&mut self, buf: &VolatileSlice<B>) -> Result<()> {
if !self.tx_buf.is_empty() {
// Data is already present in the buffer and the backend
// is waiting for a EPOLLOUT event to flush it
return self.tx_buf.push(buf);
}
// Write data to the stream
let written_count = match buf.write_to(0, &mut self.stream, buf.len()) {
Ok(cnt) => cnt,
Err(vm_memory::VolatileMemoryError::IOError(e)) => {
if e.kind() == ErrorKind::WouldBlock {
0
} else {
dbg!("send_bytes error: {:?}", e);
return Err(Error::UnixWrite);
}
}
Err(e) => {
dbg!("send_bytes error: {:?}", e);
return Err(Error::UnixWrite);
}
};
if written_count > 0 {
// Increment forwarded count by number of bytes written to the stream
self.fwd_cnt += Wrapping(written_count as u32);
// TODO: https://github.com/torvalds/linux/commit/c69e6eafff5f725bc29dcb8b52b6782dca8ea8a2
self.rx_queue.enqueue(RxOps::CreditUpdate);
}
if written_count != buf.len() {
return self.tx_buf.push(&buf.offset(written_count).unwrap());
}
Ok(())
}
/// Initialize all header fields in the vsock packet.
fn init_pkt<'a, 'b, B: BitmapSlice>(
&self,
pkt: &'a mut VsockPacket<'b, B>,
) -> &'a mut VsockPacket<'b, B> {
// Zero out the packet header
pkt.set_header_from_raw(&[0u8; PKT_HEADER_SIZE]).unwrap();
pkt.set_src_cid(self.local_cid)
.set_dst_cid(self.guest_cid)
.set_src_port(self.local_port)
.set_dst_port(self.peer_port)
.set_type(VSOCK_TYPE_STREAM)
.set_buf_alloc(CONN_TX_BUF_SIZE)
.set_fwd_cnt(self.fwd_cnt.0)
}
/// Get max number of bytes we can send to peer without overflowing
/// the peer's buffer.
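///
/// For example, with `peer_buf_alloc = 65536`, `rx_cnt = 65536` and
/// `peer_fwd_cnt = 1024`, the guest still holds 64512 unread bytes, so
/// only 1024 bytes of credit remain.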
fn peer_avail_credit(&self) -> usize {
(Wrapping(self.peer_buf_alloc) - (self.rx_cnt - self.peer_fwd_cnt)).0 as usize
}
/// Check if we need a credit update from the peer before sending
/// more data to it.
fn need_credit_update_from_peer(&self) -> bool {
self.peer_avail_credit() == 0
}
}
#[cfg(test)]
mod tests {
use byteorder::{ByteOrder, LittleEndian};
use super::*;
use crate::vhu_vsock::{VSOCK_HOST_CID, VSOCK_OP_RW, VSOCK_TYPE_STREAM};
use std::io::Result as IoResult;
use std::ops::Deref;
use virtio_bindings::bindings::virtio_ring::{VRING_DESC_F_NEXT, VRING_DESC_F_WRITE};
use virtio_queue::{mock::MockSplitQueue, Descriptor, DescriptorChain, Queue, QueueOwnedT};
use vm_memory::{
Address, Bytes, GuestAddress, GuestAddressSpace, GuestMemoryAtomic, GuestMemoryLoadGuard,
GuestMemoryMmap,
};
struct HeadParams {
head_len: usize,
data_len: u32,
}
impl HeadParams {
fn new(head_len: usize, data_len: u32) -> Self {
Self { head_len, data_len }
}
fn construct_head(&self) -> Vec<u8> {
let mut header = vec![0_u8; self.head_len];
if self.head_len == PKT_HEADER_SIZE {
// Offset into the header for data length
const HDROFF_LEN: usize = 24;
LittleEndian::write_u32(&mut header[HDROFF_LEN..], self.data_len);
}
header
}
}
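// Build a guest memory region and a descriptor chain consisting of one
// header descriptor of `head_params.head_len` bytes followed by
// `data_chain_len` data descriptors of `head_data_len` bytes each; all
// descriptors are marked device-writable when `write_only` is set.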
fn prepare_desc_chain_vsock(
write_only: bool,
head_params: &HeadParams,
data_chain_len: u16,
head_data_len: u32,
) -> (
GuestMemoryAtomic<GuestMemoryMmap>,
DescriptorChain<GuestMemoryLoadGuard<GuestMemoryMmap>>,
) {
let mem = GuestMemoryMmap::<()>::from_ranges(&[(GuestAddress(0), 0x1000)]).unwrap();
let virt_queue = MockSplitQueue::new(&mem, 16);
let mut next_addr = virt_queue.desc_table().total_size() + 0x100;
let mut flags = 0;
if write_only {
flags |= VRING_DESC_F_WRITE;
}
let mut head_flags = if data_chain_len > 0 {
flags | VRING_DESC_F_NEXT
} else {
flags
};
// vsock packet header
let header = head_params.construct_head();
let head_desc =
Descriptor::new(next_addr, head_params.head_len as u32, head_flags as u16, 1);
mem.write(&header, head_desc.addr()).unwrap();
assert!(virt_queue.desc_table().store(0, head_desc).is_ok());
next_addr += head_params.head_len as u64;
// Put the descriptor index 0 in the first available ring position.
mem.write_obj(0u16, virt_queue.avail_addr().unchecked_add(4))
.unwrap();
// Set `avail_idx` to 1.
mem.write_obj(1u16, virt_queue.avail_addr().unchecked_add(2))
.unwrap();
// chain len excludes the head
for i in 0..(data_chain_len) {
// last descr in chain
if i == data_chain_len - 1 {
head_flags &= !VRING_DESC_F_NEXT;
}
// vsock data
let data = vec![0_u8; head_data_len as usize];
let data_desc = Descriptor::new(next_addr, data.len() as u32, head_flags as u16, i + 2);
mem.write(&data, data_desc.addr()).unwrap();
assert!(virt_queue.desc_table().store(i + 1, data_desc).is_ok());
next_addr += head_data_len as u64;
}
// Create descriptor chain from pre-filled memory
(
GuestMemoryAtomic::new(mem.clone()),
virt_queue
.create_queue::<Queue>()
.unwrap()
.iter(GuestMemoryAtomic::new(mem.clone()).memory())
.unwrap()
.next()
.unwrap(),
)
}
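// In-memory stand-in for a unix stream: each write replaces the internal
// buffer, reads copy its contents back, and `as_raw_fd` returns -1 so that
// epoll registration deliberately fails where the tests expect an error.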
struct VsockDummySocket {
data: Vec<u8>,
}
impl VsockDummySocket {
fn new() -> Self {
Self { data: Vec::new() }
}
}
impl Write for VsockDummySocket {
fn write(&mut self, buf: &[u8]) -> std::result::Result<usize, std::io::Error> {
self.data.clear();
self.data.extend_from_slice(buf);
Ok(buf.len())
}
fn flush(&mut self) -> IoResult<()> {
Ok(())
}
}
impl Read for VsockDummySocket {
fn read(&mut self, buf: &mut [u8]) -> IoResult<usize> {
buf[..self.data.len()].copy_from_slice(&self.data);
Ok(self.data.len())
}
}
impl AsRawFd for VsockDummySocket {
fn as_raw_fd(&self) -> RawFd {
-1
}
}
#[test]
fn test_vsock_conn_init() {
// new locally initiated connection
let dummy_file = VsockDummySocket::new();
let mut conn_local =
VsockConnection::new_local_init(dummy_file, VSOCK_HOST_CID, 5000, 3, 5001, -1);
assert!(!conn_local.connect);
assert_eq!(conn_local.peer_port, 5001);
assert_eq!(conn_local.rx_queue, RxQueue::new());
assert_eq!(conn_local.local_cid, VSOCK_HOST_CID);
assert_eq!(conn_local.local_port, 5000);
assert_eq!(conn_local.guest_cid, 3);
// set peer port
conn_local.set_peer_port(5002);
assert_eq!(conn_local.peer_port, 5002);
// New connection initiated by the peer/guest
let dummy_file = VsockDummySocket::new();
let mut conn_peer =
VsockConnection::new_peer_init(dummy_file, VSOCK_HOST_CID, 5000, 3, 5001, -1, 65536);
assert!(!conn_peer.connect);
assert_eq!(conn_peer.peer_port, 5001);
assert_eq!(conn_peer.rx_queue.dequeue().unwrap(), RxOps::Response);
assert!(!conn_peer.rx_queue.pending_rx());
assert_eq!(conn_peer.local_cid, VSOCK_HOST_CID);
assert_eq!(conn_peer.local_port, 5000);
assert_eq!(conn_peer.guest_cid, 3);
assert_eq!(conn_peer.peer_buf_alloc, 65536);
}
#[test]
fn test_vsock_conn_credit() {
// new locally initiated connection
let dummy_file = VsockDummySocket::new();
let mut conn_local =
VsockConnection::new_local_init(dummy_file, VSOCK_HOST_CID, 5000, 3, 5001, -1);
assert_eq!(conn_local.peer_avail_credit(), 0);
assert!(conn_local.need_credit_update_from_peer());
conn_local.peer_buf_alloc = 65536;
assert_eq!(conn_local.peer_avail_credit(), 65536);
assert!(!conn_local.need_credit_update_from_peer());
conn_local.rx_cnt = Wrapping(32768);
assert_eq!(conn_local.peer_avail_credit(), 32768);
assert!(!conn_local.need_credit_update_from_peer());
conn_local.rx_cnt = Wrapping(65536);
assert_eq!(conn_local.peer_avail_credit(), 0);
assert!(conn_local.need_credit_update_from_peer());
}
#[test]
fn test_vsock_conn_init_pkt() {
// parameters for packet head construction
let head_params = HeadParams::new(PKT_HEADER_SIZE, 10);
// new locally initiated connection
let dummy_file = VsockDummySocket::new();
let conn_local =
VsockConnection::new_local_init(dummy_file, VSOCK_HOST_CID, 5000, 3, 5001, -1);
// write only descriptor chain
let (mem, mut descr_chain) = prepare_desc_chain_vsock(true, &head_params, 2, 10);
let mem = mem.memory();
let mut pkt =
VsockPacket::from_rx_virtq_chain(mem.deref(), &mut descr_chain, CONN_TX_BUF_SIZE)
.unwrap();
// initialize a vsock packet for the guest
conn_local.init_pkt(&mut pkt);
assert_eq!(pkt.src_cid(), VSOCK_HOST_CID);
assert_eq!(pkt.dst_cid(), 3);
assert_eq!(pkt.src_port(), 5000);
assert_eq!(pkt.dst_port(), 5001);
assert_eq!(pkt.type_(), VSOCK_TYPE_STREAM);
assert_eq!(pkt.buf_alloc(), CONN_TX_BUF_SIZE);
assert_eq!(pkt.fwd_cnt(), 0);
}
#[test]
fn test_vsock_conn_recv_pkt() {
// parameters for packet head construction
let head_params = HeadParams::new(PKT_HEADER_SIZE, 5);
// new locally initiated connection
let dummy_file = VsockDummySocket::new();
let mut conn_local =
VsockConnection::new_local_init(dummy_file, VSOCK_HOST_CID, 5000, 3, 5001, -1);
// write only descriptor chain
let (mem, mut descr_chain) = prepare_desc_chain_vsock(true, &head_params, 1, 5);
let mem = mem.memory();
let mut pkt =
VsockPacket::from_rx_virtq_chain(mem.deref(), &mut descr_chain, CONN_TX_BUF_SIZE)
.unwrap();
// VSOCK_OP_REQUEST: new local conn request
conn_local.rx_queue.enqueue(RxOps::Request);
let op_req = conn_local.recv_pkt(&mut pkt);
assert!(op_req.is_ok());
assert!(!conn_local.rx_queue.pending_rx());
assert_eq!(pkt.op(), VSOCK_OP_REQUEST);
// VSOCK_OP_RST: reset if connection not established
conn_local.rx_queue.enqueue(RxOps::Rw);
let op_rst = conn_local.recv_pkt(&mut pkt);
assert!(op_rst.is_ok());
assert!(!conn_local.rx_queue.pending_rx());
assert_eq!(pkt.op(), VSOCK_OP_RST);
// VSOCK_OP_CREDIT_REQUEST: credit update needed from the peer/guest
conn_local.connect = true;
conn_local.rx_queue.enqueue(RxOps::Rw);
conn_local.fwd_cnt = Wrapping(1024);
let op_credit_update = conn_local.recv_pkt(&mut pkt);
assert!(op_credit_update.is_ok());
assert!(!conn_local.rx_queue.pending_rx());
assert_eq!(pkt.op(), VSOCK_OP_CREDIT_REQUEST);
assert_eq!(conn_local.last_fwd_cnt, Wrapping(1024));
// VSOCK_OP_SHUTDOWN: zero data read from stream/file
conn_local.peer_buf_alloc = 65536;
conn_local.rx_queue.enqueue(RxOps::Rw);
let op_zero_read_shutdown = conn_local.recv_pkt(&mut pkt);
assert!(op_zero_read_shutdown.is_ok());
assert!(!conn_local.rx_queue.pending_rx());
assert_eq!(conn_local.rx_cnt, Wrapping(0));
assert_eq!(conn_local.last_fwd_cnt, Wrapping(1024));
assert_eq!(pkt.op(), VSOCK_OP_SHUTDOWN);
assert_eq!(
pkt.flags(),
VSOCK_FLAGS_SHUTDOWN_RCV | VSOCK_FLAGS_SHUTDOWN_SEND
);
// VSOCK_OP_RW: finite data read from stream/file
conn_local.stream.write_all(b"hello").unwrap();
conn_local.rx_queue.enqueue(RxOps::Rw);
let op_rw_read = conn_local.recv_pkt(&mut pkt);
// recv_pkt fails here because the dummy fd (-1) cannot be registered with epoll
assert!(op_rw_read.is_err());
assert_eq!(pkt.op(), VSOCK_OP_RW);
assert!(!conn_local.rx_queue.pending_rx());
assert_eq!(pkt.len(), 5);
let buf = &mut [0u8; 5];
assert!(pkt.data_slice().unwrap().read_slice(buf, 0).is_ok());
assert_eq!(buf, b"hello");
// VSOCK_OP_RESPONSE: response from a locally initiated connection
conn_local.rx_queue.enqueue(RxOps::Response);
let op_response = conn_local.recv_pkt(&mut pkt);
assert!(op_response.is_ok());
assert!(!conn_local.rx_queue.pending_rx());
assert_eq!(pkt.op(), VSOCK_OP_RESPONSE);
assert!(conn_local.connect);
// VSOCK_OP_CREDIT_UPDATE: guest needs credit update
conn_local.rx_queue.enqueue(RxOps::CreditUpdate);
let op_credit_update = conn_local.recv_pkt(&mut pkt);
assert!(!conn_local.rx_queue.pending_rx());
assert!(op_credit_update.is_ok());
assert_eq!(pkt.op(), VSOCK_OP_CREDIT_UPDATE);
assert_eq!(conn_local.last_fwd_cnt, Wrapping(1024));
// rx queue is empty, so recv_pkt must return an error
let op_error = conn_local.recv_pkt(&mut pkt);
assert!(op_error.is_err());
}
#[test]
fn test_vsock_conn_send_pkt() {
// parameters for packet head construction
let head_params = HeadParams::new(PKT_HEADER_SIZE, 5);
// new locally initiated connection
let dummy_file = VsockDummySocket::new();
let mut conn_local =
VsockConnection::new_local_init(dummy_file, VSOCK_HOST_CID, 5000, 3, 5001, -1);
// write only descriptor chain
let (mem, mut descr_chain) = prepare_desc_chain_vsock(false, &head_params, 1, 5);
let mem = mem.memory();
let mut pkt =
VsockPacket::from_tx_virtq_chain(mem.deref(), &mut descr_chain, CONN_TX_BUF_SIZE)
.unwrap();
// peer credit information
pkt.set_buf_alloc(65536).set_fwd_cnt(1024);
// check that the peer credit information is updated correctly
let credit_check = conn_local.send_pkt(&pkt);
assert!(credit_check.is_ok());
assert_eq!(conn_local.peer_buf_alloc, 65536);
assert_eq!(conn_local.peer_fwd_cnt, Wrapping(1024));
// VSOCK_OP_RESPONSE
pkt.set_op(VSOCK_OP_RESPONSE);
let peer_response = conn_local.send_pkt(&pkt);
assert!(peer_response.is_ok());
assert!(conn_local.connect);
let mut resp_buf = vec![0; 8];
conn_local.stream.read_exact(&mut resp_buf).unwrap();
assert_eq!(resp_buf, b"OK 5001\n");
// VSOCK_OP_RW
pkt.set_op(VSOCK_OP_RW);
let buf = b"hello";
assert!(pkt.data_slice().unwrap().write_slice(buf, 0).is_ok());
let rw_response = conn_local.send_pkt(&pkt);
assert!(rw_response.is_ok());
let mut resp_buf = vec![0; 5];
conn_local.stream.read_exact(&mut resp_buf).unwrap();
assert_eq!(resp_buf, b"hello");
// VSOCK_OP_CREDIT_REQUEST
pkt.set_op(VSOCK_OP_CREDIT_REQUEST);
let credit_response = conn_local.send_pkt(&pkt);
assert!(credit_response.is_ok());
assert_eq!(conn_local.rx_queue.peek().unwrap(), RxOps::CreditUpdate);
// VSOCK_OP_SHUTDOWN
pkt.set_op(VSOCK_OP_SHUTDOWN);
pkt.set_flags(VSOCK_FLAGS_SHUTDOWN_RCV | VSOCK_FLAGS_SHUTDOWN_SEND);
let shutdown_response = conn_local.send_pkt(&pkt);
assert!(shutdown_response.is_ok());
assert!(conn_local.rx_queue.contains(RxOps::Reset.bitmask()));
}
}