From b7122e66af5ae3cc9ea4fa5369cc1abe54969b17 Mon Sep 17 00:00:00 2001 From: Manos Pitsidianakis Date: Tue, 4 Jul 2023 17:52:13 +0300 Subject: [PATCH] sound: Add TX queue handler Signed-off-by: Emmanouil Pitsidianakis --- crates/sound/src/audio_backends/null.rs | 1 + crates/sound/src/audio_backends/pipewire.rs | 1 + crates/sound/src/device.rs | 194 +++++++++++++++----- crates/sound/src/lib.rs | 42 ++++- crates/sound/src/stream.rs | 38 +++- 5 files changed, 223 insertions(+), 53 deletions(-) diff --git a/crates/sound/src/audio_backends/null.rs b/crates/sound/src/audio_backends/null.rs index f925cc7..66edb5e 100644 --- a/crates/sound/src/audio_backends/null.rs +++ b/crates/sound/src/audio_backends/null.rs @@ -18,6 +18,7 @@ impl NullBackend { impl AudioBackend for NullBackend { fn write(&self, stream_id: u32) -> Result<()> { log::trace!("NullBackend write stream_id {}", stream_id); + _ = std::mem::take(&mut self.streams.write().unwrap()[stream_id as usize].buffers); Ok(()) } diff --git a/crates/sound/src/audio_backends/pipewire.rs b/crates/sound/src/audio_backends/pipewire.rs index e98c1c7..c051c08 100644 --- a/crates/sound/src/audio_backends/pipewire.rs +++ b/crates/sound/src/audio_backends/pipewire.rs @@ -19,6 +19,7 @@ impl PwBackend { impl AudioBackend for PwBackend { fn write(&self, stream_id: u32) -> Result<()> { log::trace!("PipewireBackend write stream_id {}", stream_id); + _ = std::mem::take(&mut self.streams.write().unwrap()[stream_id as usize].buffers); Ok(()) } diff --git a/crates/sound/src/device.rs b/crates/sound/src/device.rs index 82a3d5e..c2f40c2 100644 --- a/crates/sound/src/device.rs +++ b/crates/sound/src/device.rs @@ -3,8 +3,10 @@ // SPDX-License-Identifier: Apache-2.0 or BSD-3-Clause use std::{ + collections::BTreeSet, convert::TryFrom, io::Result as IoResult, + mem::size_of, sync::{Arc, RwLock}, }; @@ -25,9 +27,9 @@ use vmm_sys_util::{ use crate::{ audio_backends::{alloc_audio_backend, AudioBackend}, - stream::{Error as StreamError, Stream}, + stream::{Buffer, Error as StreamError, Stream}, virtio_sound::{self, *}, - ControlMessageKind, Error, Result, SoundConfig, + ControlMessageKind, Error, IOMessage, Result, SoundConfig, }; struct VhostUserSoundThread { @@ -170,10 +172,9 @@ impl VhostUserSoundThread { let code = ControlMessageKind::try_from(request.code).map_err(Error::from)?; match code { - ControlMessageKind::JackInfo => { - resp.code = VIRTIO_SND_S_NOT_SUPP.into(); - } - ControlMessageKind::JackRemap => { + ControlMessageKind::ChmapInfo + | ControlMessageKind::JackInfo + | ControlMessageKind::JackRemap => { resp.code = VIRTIO_SND_S_NOT_SUPP.into(); } ControlMessageKind::PcmInfo => { @@ -237,18 +238,20 @@ impl VhostUserSoundThread { log::error!("{}", Error::from(StreamError::InvalidStreamId(stream_id))); resp.code = VIRTIO_SND_S_BAD_MSG.into(); } else { - let b = audio_backend.read().unwrap(); - b.set_parameters( - stream_id, - ControlMessage { - kind: code, - code: VIRTIO_SND_S_OK, - desc_chain, - descriptor: desc_hdr, - vring: vring.clone(), - }, - ) - .unwrap(); + audio_backend + .read() + .unwrap() + .set_parameters( + stream_id, + ControlMessage { + kind: code, + code: VIRTIO_SND_S_OK, + desc_chain, + descriptor: desc_hdr, + vring: vring.clone(), + }, + ) + .unwrap(); // PcmSetParams needs check valid formats/rates; the audio backend will // reply when it drops the ControlMessage. @@ -266,8 +269,7 @@ impl VhostUserSoundThread { log::error!("{}", Error::from(StreamError::InvalidStreamId(stream_id))); resp.code = VIRTIO_SND_S_BAD_MSG.into(); } else { - let b = audio_backend.write().unwrap(); - b.prepare(stream_id).unwrap(); + audio_backend.write().unwrap().prepare(stream_id).unwrap(); } } ControlMessageKind::PcmRelease => { @@ -281,18 +283,20 @@ impl VhostUserSoundThread { log::error!("{}", Error::from(StreamError::InvalidStreamId(stream_id))); resp.code = VIRTIO_SND_S_BAD_MSG.into(); } else { - let b = audio_backend.write().unwrap(); - b.release( - stream_id, - ControlMessage { - kind: code, - code: VIRTIO_SND_S_OK, - desc_chain, - descriptor: desc_hdr, - vring: vring.clone(), - }, - ) - .unwrap(); + audio_backend + .write() + .unwrap() + .release( + stream_id, + ControlMessage { + kind: code, + code: VIRTIO_SND_S_OK, + desc_chain, + descriptor: desc_hdr, + vring: vring.clone(), + }, + ) + .unwrap(); // PcmRelease needs to flush IO messages; the audio backend will reply when // it drops the ControlMessage. @@ -310,8 +314,7 @@ impl VhostUserSoundThread { log::error!("{}", Error::from(StreamError::InvalidStreamId(stream_id))); resp.code = VIRTIO_SND_S_BAD_MSG.into(); } else { - let b = audio_backend.write().unwrap(); - b.start(stream_id).unwrap(); + audio_backend.write().unwrap().start(stream_id).unwrap(); } } ControlMessageKind::PcmStop => { @@ -325,13 +328,9 @@ impl VhostUserSoundThread { log::error!("{}", Error::from(StreamError::InvalidStreamId(stream_id))); resp.code = VIRTIO_SND_S_BAD_MSG.into(); } else { - let b = audio_backend.write().unwrap(); - b.stop(stream_id).unwrap(); + audio_backend.write().unwrap().stop(stream_id).unwrap(); } } - ControlMessageKind::ChmapInfo => { - resp.code = VIRTIO_SND_S_NOT_SUPP.into(); - } } log::trace!( "returned {} for ctrl msg {:?}", @@ -375,10 +374,121 @@ impl VhostUserSoundThread { fn process_tx( &self, - _vring: &VringRwLock, - _audio_backend: &RwLock>, + vring: &VringRwLock, + audio_backend: &RwLock>, ) -> IoResult { - log::trace!("process_tx"); + let requests: Vec = vring + .get_mut() + .get_queue_mut() + .iter(self.mem.as_ref().unwrap().memory()) + .map_err(|_| Error::DescriptorNotFound)? + .collect(); + + if requests.is_empty() { + return Ok(true); + } + + #[derive(Copy, Clone, PartialEq, Debug)] + enum TxState { + Ready, + WaitingBufferForStreamId(u32), + Done, + } + + let mut stream_ids = BTreeSet::default(); + + for desc_chain in requests { + let mut state = TxState::Ready; + let mut buffers = vec![]; + let descriptors: Vec<_> = desc_chain.clone().collect(); + let message = Arc::new(IOMessage { + vring: vring.clone(), + status: VIRTIO_SND_S_OK.into(), + desc_chain: desc_chain.clone(), + descriptor: descriptors.last().cloned().unwrap(), + }); + for descriptor in &descriptors { + match state { + TxState::Done => { + return Err(Error::UnexpectedDescriptorCount(descriptors.len()).into()); + } + TxState::Ready if descriptor.is_write_only() => { + if descriptor.len() as usize != size_of::() { + return Err(Error::UnexpectedDescriptorSize( + size_of::(), + descriptor.len(), + ) + .into()); + } + state = TxState::Done; + } + TxState::WaitingBufferForStreamId(stream_id) if descriptor.is_write_only() => { + if descriptor.len() as usize != size_of::() { + return Err(Error::UnexpectedDescriptorSize( + size_of::(), + descriptor.len(), + ) + .into()); + } + let mut streams = self.streams.write().unwrap(); + for b in std::mem::take(&mut buffers) { + streams[stream_id as usize].buffers.push_back(b); + } + state = TxState::Done; + } + TxState::Ready + if descriptor.len() as usize != size_of::() => + { + return Err(Error::UnexpectedDescriptorSize( + size_of::(), + descriptor.len(), + ) + .into()); + } + TxState::Ready => { + let xfer = desc_chain + .memory() + .read_obj::(descriptor.addr()) + .map_err(|_| Error::DescriptorReadFailed)?; + let stream_id: u32 = xfer.stream_id.into(); + stream_ids.insert(stream_id); + + state = TxState::WaitingBufferForStreamId(stream_id); + } + TxState::WaitingBufferForStreamId(stream_id) + if descriptor.len() as usize == size_of::() => + { + return Err(Error::UnexpectedDescriptorSize( + u32::from( + self.streams.read().unwrap()[stream_id as usize] + .params + .buffer_bytes, + ) as usize, + descriptor.len(), + ) + .into()); + } + TxState::WaitingBufferForStreamId(_stream_id) => { + let mut buf = vec![0; descriptor.len() as usize]; + let bytes_read = desc_chain + .memory() + .read(&mut buf, descriptor.addr()) + .map_err(|_| Error::DescriptorReadFailed)?; + buf.truncate(bytes_read); + + buffers.push(Buffer::new(buf, Arc::clone(&message))); + } + } + } + } + + if !stream_ids.is_empty() { + let b = audio_backend.write().unwrap(); + for id in stream_ids { + b.write(id).unwrap(); + } + } + Ok(false) } diff --git a/crates/sound/src/lib.rs b/crates/sound/src/lib.rs index 82cbc86..474add1 100644 --- a/crates/sound/src/lib.rs +++ b/crates/sound/src/lib.rs @@ -20,7 +20,6 @@ use vhost_user_backend::{VhostUserDaemon, VringRwLock, VringT}; use virtio_sound::*; use vm_memory::{ ByteValued, Bytes, GuestMemoryAtomic, GuestMemoryLoadGuard, GuestMemoryMmap, Le32, - VolatileSlice, }; use crate::device::VhostUserSoundBackend; @@ -226,16 +225,39 @@ impl SoundConfig { } } -pub type SoundBitmap = (); - -#[derive(Debug)] -pub struct SoundRequest<'a> { - data_slice: Option>, +pub struct IOMessage { + status: std::sync::atomic::AtomicU32, + desc_chain: SoundDescriptorChain, + descriptor: virtio_queue::Descriptor, + vring: VringRwLock, } -impl<'a> SoundRequest<'a> { - pub fn data_slice(&self) -> Option<&VolatileSlice<'a, SoundBitmap>> { - self.data_slice.as_ref() +impl Drop for IOMessage { + fn drop(&mut self) { + log::trace!("dropping IOMessage"); + let resp = VirtioSoundPcmStatus { + status: self.status.load(std::sync::atomic::Ordering::SeqCst).into(), + latency_bytes: 0.into(), + }; + + if let Err(err) = self + .desc_chain + .memory() + .write_obj(resp, self.descriptor.addr()) + { + log::error!("Error::DescriptorWriteFailed: {}", err); + return; + } + if self + .vring + .add_used(self.desc_chain.head_index(), resp.as_slice().len() as u32) + .is_err() + { + log::error!("Couldn't add used"); + } + if self.vring.signal_used_queue().is_err() { + log::error!("Couldn't signal used queue"); + } } } @@ -249,7 +271,7 @@ pub fn start_backend_server(config: SoundConfig) { let mut daemon = VhostUserDaemon::new( String::from("vhost-user-sound"), backend.clone(), - GuestMemoryAtomic::new(GuestMemoryMmap::::new()), + GuestMemoryAtomic::new(GuestMemoryMmap::new()), ) .unwrap(); diff --git a/crates/sound/src/stream.rs b/crates/sound/src/stream.rs index ba7b796..35d5e3f 100644 --- a/crates/sound/src/stream.rs +++ b/crates/sound/src/stream.rs @@ -1,10 +1,12 @@ // Manos Pitsidianakis // SPDX-License-Identifier: Apache-2.0 or BSD-3-Clause +use std::{collections::VecDeque, sync::Arc}; + use thiserror::Error as ThisError; use vm_memory::{Le32, Le64}; -use crate::{virtio_sound::*, SUPPORTED_FORMATS, SUPPORTED_RATES}; +use crate::{virtio_sound::*, IOMessage, SUPPORTED_FORMATS, SUPPORTED_RATES}; /// Stream errors. #[derive(Debug, ThisError)] @@ -172,6 +174,7 @@ pub struct Stream { pub channels_min: u8, pub channels_max: u8, pub state: PCMState, + pub buffers: VecDeque, } impl Default for Stream { @@ -185,6 +188,7 @@ impl Default for Stream { channels_min: 1, channels_max: 6, state: Default::default(), + buffers: VecDeque::new(), } } } @@ -228,3 +232,35 @@ impl Default for PcmParams { } } } + +pub struct Buffer { + pub bytes: Vec, + pub pos: usize, + pub message: Arc, +} + +impl std::fmt::Debug for Buffer { + fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result { + fmt.debug_struct(stringify!(Buffer)) + .field("bytes", &self.bytes.len()) + .field("pos", &self.pos) + .field("message", &Arc::as_ptr(&self.message)) + .finish() + } +} + +impl Buffer { + pub fn new(bytes: Vec, message: Arc) -> Self { + Self { + bytes, + pos: 0, + message, + } + } +} + +impl Drop for Buffer { + fn drop(&mut self) { + log::trace!("dropping buffer {:?}", self); + } +}