diff --git a/staging/vhost-device-sound/src/audio_backends/alsa.rs b/staging/vhost-device-sound/src/audio_backends/alsa.rs index ecbdac9..59b8c6e 100644 --- a/staging/vhost-device-sound/src/audio_backends/alsa.rs +++ b/staging/vhost-device-sound/src/audio_backends/alsa.rs @@ -195,15 +195,14 @@ fn update_pcm( Ok(()) } -// Returns `true` if the function should be called again, because there are are -// more data left to write. +// Returns `Ok(true)` if the function should be called again, because there are +// are more data left to write. fn write_samples_direct( pcm: &alsa::PCM, stream: &mut Stream, mmap: &mut alsa::direct::pcm::MmapPlayback, ) -> AResult { while mmap.avail() > 0 { - // Write samples to DMA area from iterator let Some(buffer) = stream.buffers.front_mut() else { return Ok(false); }; @@ -223,6 +222,7 @@ fn write_samples_direct( } Ok(v) => v, }; + // Write samples to DMA area from iterator let mut iter = buf[0..read_bytes as usize].iter().cloned(); let frames = mmap.write(&mut iter); let written_bytes = pcm.frames_to_bytes(frames); @@ -240,6 +240,8 @@ fn write_samples_direct( } } +// Returns `Ok(true)` if the function should be called again, because there are +// are more data left to write. fn write_samples_io( p: &alsa::PCM, streams: &Arc>>, @@ -385,6 +387,9 @@ impl AlsaBackend { let mut senders = Vec::with_capacity(streams_no); for i in 0..streams_no { let (sender, receiver) = channel(); + + // Initialize with a dummy value, which will be updated every time we call + // `update_pcm`. let pcm = Arc::new(Mutex::new(PCM::new("default", Direction::Playback, false)?)); let mtx = Arc::clone(&pcm); @@ -495,16 +500,18 @@ impl AlsaBackend { msg.code = VIRTIO_SND_S_BAD_MSG; continue; }; - // Stop the worker. + // Stop worker thread senders[stream_id].send(false).unwrap(); let mut streams = streams.write().unwrap(); if let Err(err) = streams[stream_id].state.release() { log::error!("Stream {}: {}", stream_id, err); msg.code = VIRTIO_SND_S_BAD_MSG; } - // Release buffers even if state transition is invalid. If it is invalid, we - // won't be in a valid device state anyway so better to get rid of them and - // free the virt queue. + // Drop pending stream buffers to complete pending I/O messages + // + // This will release buffers even if state transition is invalid. If it is + // invalid, we won't be in a valid device state anyway so better to get rid of + // them and free the virt queue. std::mem::take(&mut streams[stream_id].buffers); } AlsaAction::SetParameters(stream_id, mut msg) => { @@ -545,9 +552,9 @@ impl AlsaBackend { st.params.format = request.format; st.params.rate = request.rate; } + // Manually drop msg for faster response: the kernel has a timeout. + drop(msg); } - // Manually drop msg for faster response: the kernel has a timeout. - drop(msg); update_pcm(&pcms[stream_id], stream_id, &streams)?; } } diff --git a/staging/vhost-device-sound/src/device.rs b/staging/vhost-device-sound/src/device.rs index a7029ed..48040ce 100644 --- a/staging/vhost-device-sound/src/device.rs +++ b/staging/vhost-device-sound/src/device.rs @@ -467,17 +467,23 @@ impl VhostUserSoundThread { return Ok(true); } + // Instead of counting descriptor chain lengths, encode the "parsing" logic in + // an enumeration. Then, the compiler will complain about any unhandled + // match {} cases if any part of the code is changed. This makes invalid + // states unrepresentable in the source code. #[derive(Copy, Clone, PartialEq, Debug)] - enum TxState { + enum IoState { Ready, WaitingBufferForStreamId(u32), Done, } + // Keep log of stream IDs to wake up, in case the guest has queued more than + // one. let mut stream_ids = BTreeSet::default(); for desc_chain in requests { - let mut state = TxState::Ready; + let mut state = IoState::Ready; let mut buffers = vec![]; let descriptors: Vec<_> = desc_chain.clone().collect(); let message = Arc::new(IOMessage { @@ -485,14 +491,17 @@ impl VhostUserSoundThread { status: VIRTIO_SND_S_OK.into(), latency_bytes: 0.into(), desc_chain: desc_chain.clone(), - descriptor: descriptors.last().cloned().unwrap(), + response_descriptor: descriptors.last().cloned().ok_or_else(|| { + log::error!("Received IO request with an empty descriptor chain."); + Error::UnexpectedDescriptorCount(0) + })?, }); for descriptor in &descriptors { match state { - TxState::Done => { + IoState::Done => { return Err(Error::UnexpectedDescriptorCount(descriptors.len()).into()); } - TxState::Ready if descriptor.is_write_only() => { + IoState::Ready if descriptor.is_write_only() => { if descriptor.len() as usize != size_of::() { return Err(Error::UnexpectedDescriptorSize( size_of::(), @@ -500,9 +509,9 @@ impl VhostUserSoundThread { ) .into()); } - state = TxState::Done; + state = IoState::Done; } - TxState::WaitingBufferForStreamId(stream_id) if descriptor.is_write_only() => { + IoState::WaitingBufferForStreamId(stream_id) if descriptor.is_write_only() => { if descriptor.len() as usize != size_of::() { return Err(Error::UnexpectedDescriptorSize( size_of::(), @@ -514,9 +523,9 @@ impl VhostUserSoundThread { for b in std::mem::take(&mut buffers) { streams[stream_id as usize].buffers.push_back(b); } - state = TxState::Done; + state = IoState::Done; } - TxState::Ready + IoState::Ready if descriptor.len() as usize != size_of::() => { return Err(Error::UnexpectedDescriptorSize( @@ -525,7 +534,7 @@ impl VhostUserSoundThread { ) .into()); } - TxState::Ready => { + IoState::Ready => { let xfer = desc_chain .memory() .read_obj::(descriptor.addr()) @@ -533,9 +542,9 @@ impl VhostUserSoundThread { let stream_id: u32 = xfer.stream_id.into(); stream_ids.insert(stream_id); - state = TxState::WaitingBufferForStreamId(stream_id); + state = IoState::WaitingBufferForStreamId(stream_id); } - TxState::WaitingBufferForStreamId(stream_id) + IoState::WaitingBufferForStreamId(stream_id) if descriptor.len() as usize == size_of::() => { return Err(Error::UnexpectedDescriptorSize( @@ -548,16 +557,16 @@ impl VhostUserSoundThread { ) .into()); } - TxState::WaitingBufferForStreamId(_stream_id) => { - /* - Rather than copying the content of a descriptor, buffer keeps a pointer to it. - When we copy just after the request is enqueued, the guest's userspace may or - may not have updated the buffer contents. Guest driver simply moves buffers - from the used ring to the available ring without knowing whether the content - has been updated. The device only reads the buffer from guest memory when the - audio engine requires it, which is about after a period thus ensuring that the - buffer is up-to-date. - */ + IoState::WaitingBufferForStreamId(_stream_id) => { + // In the case of TX/Playback: + // + // Rather than copying the content of a descriptor, buffer keeps a pointer + // to it. When we copy just after the request is enqueued, the guest's + // userspace may or may not have updated the buffer contents. Guest driver + // simply moves buffers from the used ring to the available ring without + // knowing whether the content has been updated. The device only reads the + // buffer from guest memory when the audio engine requires it, which is + // about after a period thus ensuring that the buffer is up-to-date. buffers.push(Buffer::new(*descriptor, Arc::clone(&message))); } } @@ -626,7 +635,7 @@ impl VhostUserSoundBackend { }, ]; let chmaps: Arc>> = Arc::new(RwLock::new(chmaps_info)); - log::trace!("VhostUserSoundBackend::new config {:?}", &config); + log::trace!("VhostUserSoundBackend::new(config = {:?})", &config); let threads = if config.multi_thread { vec![ RwLock::new(VhostUserSoundThread::new( @@ -691,7 +700,10 @@ impl VhostUserBackend for VhostUserSoundBackend { } fn max_queue_size(&self) -> usize { - // TODO: Investigate if an alternative value makes any difference. + // The linux kernel driver does no checks for queue length and fails silently if + // a queue is filled up. In this case, adding an element to the queue + // returns ENOSPC and the element is not queued for a later attempt and + // is lost. `64` is a "good enough" value from our observations. 64 } diff --git a/staging/vhost-device-sound/src/lib.rs b/staging/vhost-device-sound/src/lib.rs index 4e5e1b1..e0649a5 100644 --- a/staging/vhost-device-sound/src/lib.rs +++ b/staging/vhost-device-sound/src/lib.rs @@ -250,13 +250,12 @@ pub struct IOMessage { status: std::sync::atomic::AtomicU32, pub latency_bytes: std::sync::atomic::AtomicU32, desc_chain: SoundDescriptorChain, - descriptor: virtio_queue::Descriptor, + response_descriptor: virtio_queue::Descriptor, vring: VringRwLock, } impl Drop for IOMessage { fn drop(&mut self) { - log::trace!("dropping IOMessage"); let resp = VirtioSoundPcmStatus { status: self.status.load(std::sync::atomic::Ordering::SeqCst).into(), latency_bytes: self @@ -264,24 +263,24 @@ impl Drop for IOMessage { .load(std::sync::atomic::Ordering::SeqCst) .into(), }; + log::trace!("dropping IOMessage {:?}", resp); if let Err(err) = self .desc_chain .memory() - .write_obj(resp, self.descriptor.addr()) + .write_obj(resp, self.response_descriptor.addr()) { log::error!("Error::DescriptorWriteFailed: {}", err); return; } - if self + if let Err(err) = self .vring .add_used(self.desc_chain.head_index(), resp.as_slice().len() as u32) - .is_err() { - log::error!("Couldn't add used"); + log::error!("Couldn't add used bytes count to vring: {}", err); } - if self.vring.signal_used_queue().is_err() { - log::error!("Couldn't signal used queue"); + if let Err(err) = self.vring.signal_used_queue() { + log::error!("Couldn't signal used queue: {}", err); } } } @@ -289,7 +288,7 @@ impl Drop for IOMessage { /// This is the public API through which an external program starts the /// vhost-device-sound backend server. pub fn start_backend_server(config: SoundConfig) { - log::trace!("Using config {:?}", &config); + log::trace!("Using config {:?}.", &config); let listener = Listener::new(config.get_socket_path(), true).unwrap(); let backend = Arc::new(VhostUserSoundBackend::new(config).unwrap()); @@ -300,12 +299,12 @@ pub fn start_backend_server(config: SoundConfig) { ) .unwrap(); - log::trace!("Starting daemon"); + log::trace!("Starting daemon."); daemon.start(listener).unwrap(); match daemon.wait() { Ok(()) => { - info!("Stopping cleanly"); + info!("Stopping cleanly."); } Err(vhost_user_backend::Error::HandleRequest(vhost_user::Error::PartialMessage)) => { info!(