mirror of
https://github.com/rust-vmm/vhost-device.git
synced 2026-01-04 08:17:15 +00:00
[vsock] refactor VhostUserVsockThread worker
For now, VhostUserVsockThread uses thread pool executor from futures, but it doesn't need to use thread pool executor and futures because we just need background worker thread, and a way to let it work. So I removed unnecessary external dependency and made the logic simpler by using just thread and channel Signed-off-by: Jeongik Cha <jeongik@google.com>
This commit is contained in:
parent
ed5b597c70
commit
7b2632b509
128
Cargo.lock
generated
128
Cargo.lock
generated
@ -93,12 +93,6 @@ dependencies = [
|
||||
"syn 2.0.29",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "autocfg"
|
||||
version = "1.1.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
|
||||
|
||||
[[package]]
|
||||
name = "base64"
|
||||
version = "0.13.1"
|
||||
@ -362,96 +356,6 @@ version = "2.0.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6999dc1837253364c2ebb0704ba97994bd874e8f195d665c50b7548f6ea92764"
|
||||
|
||||
[[package]]
|
||||
name = "futures"
|
||||
version = "0.3.28"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "23342abe12aba583913b2e62f22225ff9c950774065e4bfb61a19cd9770fec40"
|
||||
dependencies = [
|
||||
"futures-channel",
|
||||
"futures-core",
|
||||
"futures-executor",
|
||||
"futures-io",
|
||||
"futures-sink",
|
||||
"futures-task",
|
||||
"futures-util",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "futures-channel"
|
||||
version = "0.3.28"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "955518d47e09b25bbebc7a18df10b81f0c766eaf4c4f1cccef2fca5f2a4fb5f2"
|
||||
dependencies = [
|
||||
"futures-core",
|
||||
"futures-sink",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "futures-core"
|
||||
version = "0.3.28"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "4bca583b7e26f571124fe5b7561d49cb2868d79116cfa0eefce955557c6fee8c"
|
||||
|
||||
[[package]]
|
||||
name = "futures-executor"
|
||||
version = "0.3.28"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ccecee823288125bd88b4d7f565c9e58e41858e47ab72e8ea2d64e93624386e0"
|
||||
dependencies = [
|
||||
"futures-core",
|
||||
"futures-task",
|
||||
"futures-util",
|
||||
"num_cpus",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "futures-io"
|
||||
version = "0.3.28"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "4fff74096e71ed47f8e023204cfd0aa1289cd54ae5430a9523be060cdb849964"
|
||||
|
||||
[[package]]
|
||||
name = "futures-macro"
|
||||
version = "0.3.28"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.29",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "futures-sink"
|
||||
version = "0.3.28"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f43be4fe21a13b9781a69afa4985b0f6ee0e1afab2c6f454a8cf30e2b2237b6e"
|
||||
|
||||
[[package]]
|
||||
name = "futures-task"
|
||||
version = "0.3.28"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "76d3d132be6c0e6aa1534069c705a74a5997a356c0dc2f86a47765e5617c5b65"
|
||||
|
||||
[[package]]
|
||||
name = "futures-util"
|
||||
version = "0.3.28"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "26b01e40b772d54cf6c6d721c1d1abd0647a0106a12ecaa1c186273392a69533"
|
||||
dependencies = [
|
||||
"futures-channel",
|
||||
"futures-core",
|
||||
"futures-io",
|
||||
"futures-macro",
|
||||
"futures-sink",
|
||||
"futures-task",
|
||||
"memchr",
|
||||
"pin-project-lite",
|
||||
"pin-utils",
|
||||
"slab",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "generic-array"
|
||||
version = "0.14.7"
|
||||
@ -665,16 +569,6 @@ dependencies = [
|
||||
"minimal-lexical",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "num_cpus"
|
||||
version = "1.16.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43"
|
||||
dependencies = [
|
||||
"hermit-abi",
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "num_enum"
|
||||
version = "0.7.0"
|
||||
@ -769,18 +663,6 @@ dependencies = [
|
||||
"sha2",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "pin-project-lite"
|
||||
version = "0.2.12"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "12cc1b0bf1727a77a54b6654e7b5f1af8604923edc8b81885f8ec92f9e3f0a05"
|
||||
|
||||
[[package]]
|
||||
name = "pin-utils"
|
||||
version = "0.1.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
|
||||
|
||||
[[package]]
|
||||
name = "pkg-config"
|
||||
version = "0.3.27"
|
||||
@ -996,15 +878,6 @@ version = "1.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a7cee0529a6d40f580e7a5e6c495c8fbfe21b7b52795ed4bb5e62cdf92bc6380"
|
||||
|
||||
[[package]]
|
||||
name = "slab"
|
||||
version = "0.4.9"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8f92a496fb766b417c996b9c5e57daf2f7ad3b0bebe1ccfca4856390e3d3bb67"
|
||||
dependencies = [
|
||||
"autocfg",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "strsim"
|
||||
version = "0.10.0"
|
||||
@ -1299,7 +1172,6 @@ dependencies = [
|
||||
"config",
|
||||
"env_logger",
|
||||
"epoll",
|
||||
"futures",
|
||||
"log",
|
||||
"serde",
|
||||
"serde_yaml",
|
||||
|
||||
@ -17,7 +17,6 @@ byteorder = "1"
|
||||
clap = { version = "4.4", features = ["derive"] }
|
||||
env_logger = "0.10"
|
||||
epoll = "4.3.2"
|
||||
futures = { version = "0.3", features = ["thread-pool"] }
|
||||
log = "0.4"
|
||||
thiserror = "1.0"
|
||||
vhost = { version = "0.8", features = ["vhost-user-slave"] }
|
||||
|
||||
@ -115,8 +115,6 @@ pub(crate) enum Error {
|
||||
IterateQueue,
|
||||
#[error("No rx request available")]
|
||||
NoRequestRx,
|
||||
#[error("Unable to create thread pool")]
|
||||
CreateThreadPool(std::io::Error),
|
||||
#[error("Packet missing data buffer")]
|
||||
PktBufMissing,
|
||||
#[error("Failed to connect to unix socket")]
|
||||
|
||||
@ -12,10 +12,11 @@ use std::{
|
||||
net::{UnixListener, UnixStream},
|
||||
prelude::{AsRawFd, FromRawFd, RawFd},
|
||||
},
|
||||
sync::{Arc, RwLock},
|
||||
sync::mpsc::Sender,
|
||||
sync::{mpsc, Arc, RwLock},
|
||||
thread,
|
||||
};
|
||||
|
||||
use futures::executor::{ThreadPool, ThreadPoolBuilder};
|
||||
use log::warn;
|
||||
use vhost_user_backend::{VringEpollHandler, VringRwLock, VringT};
|
||||
use virtio_queue::QueueOwnedT;
|
||||
@ -42,6 +43,15 @@ enum RxQueueType {
|
||||
Standard,
|
||||
RawPkts,
|
||||
}
|
||||
|
||||
// Data which is required by a worker handling event idx.
|
||||
struct EventData {
|
||||
vring: VringRwLock,
|
||||
event_idx: bool,
|
||||
head_idx: u16,
|
||||
used_len: usize,
|
||||
}
|
||||
|
||||
pub(crate) struct VhostUserVsockThread {
|
||||
/// Guest memory map.
|
||||
pub mem: Option<GuestMemoryAtomic<GuestMemoryMmap>>,
|
||||
@ -61,8 +71,8 @@ pub(crate) struct VhostUserVsockThread {
|
||||
pub thread_backend: VsockThreadBackend,
|
||||
/// CID of the guest.
|
||||
guest_cid: u64,
|
||||
/// Thread pool to handle event idx.
|
||||
pool: ThreadPool,
|
||||
/// Channel to a worker which handles event idx.
|
||||
sender: Sender<EventData>,
|
||||
/// host side port on which application listens.
|
||||
local_port: Wrapping<u32>,
|
||||
/// The tx buffer size
|
||||
@ -126,7 +136,15 @@ impl VhostUserVsockThread {
|
||||
),
|
||||
);
|
||||
}
|
||||
|
||||
let (sender, receiver) = mpsc::channel::<EventData>();
|
||||
thread::spawn(move || loop {
|
||||
// TODO: Understand why doing the following in the background thread works.
|
||||
// maybe we'd better have thread pool for the entire application if necessary.
|
||||
let Ok(event_data) = receiver.recv() else {
|
||||
break;
|
||||
};
|
||||
Self::vring_handle_event(event_data);
|
||||
});
|
||||
let thread = VhostUserVsockThread {
|
||||
mem: None,
|
||||
event_idx: false,
|
||||
@ -137,10 +155,7 @@ impl VhostUserVsockThread {
|
||||
epoll_file,
|
||||
thread_backend,
|
||||
guest_cid,
|
||||
pool: ThreadPoolBuilder::new()
|
||||
.pool_size(1)
|
||||
.create()
|
||||
.map_err(Error::CreateThreadPool)?,
|
||||
sender,
|
||||
local_port: Wrapping(0),
|
||||
tx_buffer_size,
|
||||
sibling_event_fd,
|
||||
@ -152,6 +167,37 @@ impl VhostUserVsockThread {
|
||||
Ok(thread)
|
||||
}
|
||||
|
||||
fn vring_handle_event(event_data: EventData) {
|
||||
if event_data.event_idx {
|
||||
if event_data
|
||||
.vring
|
||||
.add_used(event_data.head_idx, event_data.used_len as u32)
|
||||
.is_err()
|
||||
{
|
||||
warn!("Could not return used descriptors to ring");
|
||||
}
|
||||
match event_data.vring.needs_notification() {
|
||||
Err(_) => {
|
||||
warn!("Could not check if queue needs to be notified");
|
||||
event_data.vring.signal_used_queue().unwrap();
|
||||
}
|
||||
Ok(needs_notification) => {
|
||||
if needs_notification {
|
||||
event_data.vring.signal_used_queue().unwrap();
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
if event_data
|
||||
.vring
|
||||
.add_used(event_data.head_idx, event_data.used_len as u32)
|
||||
.is_err()
|
||||
{
|
||||
warn!("Could not return used descriptors to ring");
|
||||
}
|
||||
event_data.vring.signal_used_queue().unwrap();
|
||||
}
|
||||
}
|
||||
/// Register a file with an epoll to listen for events in evset.
|
||||
pub fn epoll_register(epoll_fd: RawFd, fd: RawFd, evset: epoll::Events) -> Result<()> {
|
||||
epoll::ctl(
|
||||
@ -504,31 +550,14 @@ impl VhostUserVsockThread {
|
||||
|
||||
let vring = vring.clone();
|
||||
let event_idx = self.event_idx;
|
||||
|
||||
self.pool.spawn_ok(async move {
|
||||
// TODO: Understand why doing the following in the pool works
|
||||
if event_idx {
|
||||
if vring.add_used(head_idx, used_len as u32).is_err() {
|
||||
warn!("Could not return used descriptors to ring");
|
||||
}
|
||||
match vring.needs_notification() {
|
||||
Err(_) => {
|
||||
warn!("Could not check if queue needs to be notified");
|
||||
vring.signal_used_queue().unwrap();
|
||||
}
|
||||
Ok(needs_notification) => {
|
||||
if needs_notification {
|
||||
vring.signal_used_queue().unwrap();
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
if vring.add_used(head_idx, used_len as u32).is_err() {
|
||||
warn!("Could not return used descriptors to ring");
|
||||
}
|
||||
vring.signal_used_queue().unwrap();
|
||||
}
|
||||
});
|
||||
self.sender
|
||||
.send(EventData {
|
||||
vring,
|
||||
event_idx,
|
||||
head_idx,
|
||||
used_len,
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
match rx_queue_type {
|
||||
RxQueueType::Standard => {
|
||||
@ -661,30 +690,14 @@ impl VhostUserVsockThread {
|
||||
|
||||
let vring = vring.clone();
|
||||
let event_idx = self.event_idx;
|
||||
|
||||
self.pool.spawn_ok(async move {
|
||||
if event_idx {
|
||||
if vring.add_used(head_idx, used_len as u32).is_err() {
|
||||
warn!("Could not return used descriptors to ring");
|
||||
}
|
||||
match vring.needs_notification() {
|
||||
Err(_) => {
|
||||
warn!("Could not check if queue needs to be notified");
|
||||
vring.signal_used_queue().unwrap();
|
||||
}
|
||||
Ok(needs_notification) => {
|
||||
if needs_notification {
|
||||
vring.signal_used_queue().unwrap();
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
if vring.add_used(head_idx, used_len as u32).is_err() {
|
||||
warn!("Could not return used descriptors to ring");
|
||||
}
|
||||
vring.signal_used_queue().unwrap();
|
||||
}
|
||||
});
|
||||
self.sender
|
||||
.send(EventData {
|
||||
vring,
|
||||
event_idx,
|
||||
head_idx,
|
||||
used_len,
|
||||
})
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
Ok(used_any)
|
||||
|
||||
Loading…
Reference in New Issue
Block a user