sound: Add TX queue handler

Signed-off-by: Emmanouil Pitsidianakis <manos.pitsidianakis@linaro.org>
This commit is contained in:
Manos Pitsidianakis 2023-07-04 17:52:13 +03:00
parent ecf88fb602
commit b7122e66af
No known key found for this signature in database
GPG Key ID: 7729C7707F7E09D0
5 changed files with 223 additions and 53 deletions

View File

@ -18,6 +18,7 @@ impl NullBackend {
impl AudioBackend for NullBackend {
fn write(&self, stream_id: u32) -> Result<()> {
log::trace!("NullBackend write stream_id {}", stream_id);
_ = std::mem::take(&mut self.streams.write().unwrap()[stream_id as usize].buffers);
Ok(())
}

View File

@ -19,6 +19,7 @@ impl PwBackend {
impl AudioBackend for PwBackend {
fn write(&self, stream_id: u32) -> Result<()> {
log::trace!("PipewireBackend write stream_id {}", stream_id);
_ = std::mem::take(&mut self.streams.write().unwrap()[stream_id as usize].buffers);
Ok(())
}

View File

@ -3,8 +3,10 @@
// SPDX-License-Identifier: Apache-2.0 or BSD-3-Clause
use std::{
collections::BTreeSet,
convert::TryFrom,
io::Result as IoResult,
mem::size_of,
sync::{Arc, RwLock},
};
@ -25,9 +27,9 @@ use vmm_sys_util::{
use crate::{
audio_backends::{alloc_audio_backend, AudioBackend},
stream::{Error as StreamError, Stream},
stream::{Buffer, Error as StreamError, Stream},
virtio_sound::{self, *},
ControlMessageKind, Error, Result, SoundConfig,
ControlMessageKind, Error, IOMessage, Result, SoundConfig,
};
struct VhostUserSoundThread {
@ -170,10 +172,9 @@ impl VhostUserSoundThread {
let code = ControlMessageKind::try_from(request.code).map_err(Error::from)?;
match code {
ControlMessageKind::JackInfo => {
resp.code = VIRTIO_SND_S_NOT_SUPP.into();
}
ControlMessageKind::JackRemap => {
ControlMessageKind::ChmapInfo
| ControlMessageKind::JackInfo
| ControlMessageKind::JackRemap => {
resp.code = VIRTIO_SND_S_NOT_SUPP.into();
}
ControlMessageKind::PcmInfo => {
@ -237,18 +238,20 @@ impl VhostUserSoundThread {
log::error!("{}", Error::from(StreamError::InvalidStreamId(stream_id)));
resp.code = VIRTIO_SND_S_BAD_MSG.into();
} else {
let b = audio_backend.read().unwrap();
b.set_parameters(
stream_id,
ControlMessage {
kind: code,
code: VIRTIO_SND_S_OK,
desc_chain,
descriptor: desc_hdr,
vring: vring.clone(),
},
)
.unwrap();
audio_backend
.read()
.unwrap()
.set_parameters(
stream_id,
ControlMessage {
kind: code,
code: VIRTIO_SND_S_OK,
desc_chain,
descriptor: desc_hdr,
vring: vring.clone(),
},
)
.unwrap();
// PcmSetParams needs check valid formats/rates; the audio backend will
// reply when it drops the ControlMessage.
@ -266,8 +269,7 @@ impl VhostUserSoundThread {
log::error!("{}", Error::from(StreamError::InvalidStreamId(stream_id)));
resp.code = VIRTIO_SND_S_BAD_MSG.into();
} else {
let b = audio_backend.write().unwrap();
b.prepare(stream_id).unwrap();
audio_backend.write().unwrap().prepare(stream_id).unwrap();
}
}
ControlMessageKind::PcmRelease => {
@ -281,18 +283,20 @@ impl VhostUserSoundThread {
log::error!("{}", Error::from(StreamError::InvalidStreamId(stream_id)));
resp.code = VIRTIO_SND_S_BAD_MSG.into();
} else {
let b = audio_backend.write().unwrap();
b.release(
stream_id,
ControlMessage {
kind: code,
code: VIRTIO_SND_S_OK,
desc_chain,
descriptor: desc_hdr,
vring: vring.clone(),
},
)
.unwrap();
audio_backend
.write()
.unwrap()
.release(
stream_id,
ControlMessage {
kind: code,
code: VIRTIO_SND_S_OK,
desc_chain,
descriptor: desc_hdr,
vring: vring.clone(),
},
)
.unwrap();
// PcmRelease needs to flush IO messages; the audio backend will reply when
// it drops the ControlMessage.
@ -310,8 +314,7 @@ impl VhostUserSoundThread {
log::error!("{}", Error::from(StreamError::InvalidStreamId(stream_id)));
resp.code = VIRTIO_SND_S_BAD_MSG.into();
} else {
let b = audio_backend.write().unwrap();
b.start(stream_id).unwrap();
audio_backend.write().unwrap().start(stream_id).unwrap();
}
}
ControlMessageKind::PcmStop => {
@ -325,13 +328,9 @@ impl VhostUserSoundThread {
log::error!("{}", Error::from(StreamError::InvalidStreamId(stream_id)));
resp.code = VIRTIO_SND_S_BAD_MSG.into();
} else {
let b = audio_backend.write().unwrap();
b.stop(stream_id).unwrap();
audio_backend.write().unwrap().stop(stream_id).unwrap();
}
}
ControlMessageKind::ChmapInfo => {
resp.code = VIRTIO_SND_S_NOT_SUPP.into();
}
}
log::trace!(
"returned {} for ctrl msg {:?}",
@ -375,10 +374,121 @@ impl VhostUserSoundThread {
fn process_tx(
&self,
_vring: &VringRwLock,
_audio_backend: &RwLock<Box<dyn AudioBackend + Send + Sync>>,
vring: &VringRwLock,
audio_backend: &RwLock<Box<dyn AudioBackend + Send + Sync>>,
) -> IoResult<bool> {
log::trace!("process_tx");
let requests: Vec<SoundDescriptorChain> = vring
.get_mut()
.get_queue_mut()
.iter(self.mem.as_ref().unwrap().memory())
.map_err(|_| Error::DescriptorNotFound)?
.collect();
if requests.is_empty() {
return Ok(true);
}
#[derive(Copy, Clone, PartialEq, Debug)]
enum TxState {
Ready,
WaitingBufferForStreamId(u32),
Done,
}
let mut stream_ids = BTreeSet::default();
for desc_chain in requests {
let mut state = TxState::Ready;
let mut buffers = vec![];
let descriptors: Vec<_> = desc_chain.clone().collect();
let message = Arc::new(IOMessage {
vring: vring.clone(),
status: VIRTIO_SND_S_OK.into(),
desc_chain: desc_chain.clone(),
descriptor: descriptors.last().cloned().unwrap(),
});
for descriptor in &descriptors {
match state {
TxState::Done => {
return Err(Error::UnexpectedDescriptorCount(descriptors.len()).into());
}
TxState::Ready if descriptor.is_write_only() => {
if descriptor.len() as usize != size_of::<VirtioSoundPcmStatus>() {
return Err(Error::UnexpectedDescriptorSize(
size_of::<VirtioSoundPcmStatus>(),
descriptor.len(),
)
.into());
}
state = TxState::Done;
}
TxState::WaitingBufferForStreamId(stream_id) if descriptor.is_write_only() => {
if descriptor.len() as usize != size_of::<VirtioSoundPcmStatus>() {
return Err(Error::UnexpectedDescriptorSize(
size_of::<VirtioSoundPcmStatus>(),
descriptor.len(),
)
.into());
}
let mut streams = self.streams.write().unwrap();
for b in std::mem::take(&mut buffers) {
streams[stream_id as usize].buffers.push_back(b);
}
state = TxState::Done;
}
TxState::Ready
if descriptor.len() as usize != size_of::<VirtioSoundPcmXfer>() =>
{
return Err(Error::UnexpectedDescriptorSize(
size_of::<VirtioSoundPcmXfer>(),
descriptor.len(),
)
.into());
}
TxState::Ready => {
let xfer = desc_chain
.memory()
.read_obj::<VirtioSoundPcmXfer>(descriptor.addr())
.map_err(|_| Error::DescriptorReadFailed)?;
let stream_id: u32 = xfer.stream_id.into();
stream_ids.insert(stream_id);
state = TxState::WaitingBufferForStreamId(stream_id);
}
TxState::WaitingBufferForStreamId(stream_id)
if descriptor.len() as usize == size_of::<VirtioSoundPcmXfer>() =>
{
return Err(Error::UnexpectedDescriptorSize(
u32::from(
self.streams.read().unwrap()[stream_id as usize]
.params
.buffer_bytes,
) as usize,
descriptor.len(),
)
.into());
}
TxState::WaitingBufferForStreamId(_stream_id) => {
let mut buf = vec![0; descriptor.len() as usize];
let bytes_read = desc_chain
.memory()
.read(&mut buf, descriptor.addr())
.map_err(|_| Error::DescriptorReadFailed)?;
buf.truncate(bytes_read);
buffers.push(Buffer::new(buf, Arc::clone(&message)));
}
}
}
}
if !stream_ids.is_empty() {
let b = audio_backend.write().unwrap();
for id in stream_ids {
b.write(id).unwrap();
}
}
Ok(false)
}

View File

@ -20,7 +20,6 @@ use vhost_user_backend::{VhostUserDaemon, VringRwLock, VringT};
use virtio_sound::*;
use vm_memory::{
ByteValued, Bytes, GuestMemoryAtomic, GuestMemoryLoadGuard, GuestMemoryMmap, Le32,
VolatileSlice,
};
use crate::device::VhostUserSoundBackend;
@ -226,16 +225,39 @@ impl SoundConfig {
}
}
pub type SoundBitmap = ();
#[derive(Debug)]
pub struct SoundRequest<'a> {
data_slice: Option<VolatileSlice<'a, SoundBitmap>>,
pub struct IOMessage {
status: std::sync::atomic::AtomicU32,
desc_chain: SoundDescriptorChain,
descriptor: virtio_queue::Descriptor,
vring: VringRwLock,
}
impl<'a> SoundRequest<'a> {
pub fn data_slice(&self) -> Option<&VolatileSlice<'a, SoundBitmap>> {
self.data_slice.as_ref()
impl Drop for IOMessage {
fn drop(&mut self) {
log::trace!("dropping IOMessage");
let resp = VirtioSoundPcmStatus {
status: self.status.load(std::sync::atomic::Ordering::SeqCst).into(),
latency_bytes: 0.into(),
};
if let Err(err) = self
.desc_chain
.memory()
.write_obj(resp, self.descriptor.addr())
{
log::error!("Error::DescriptorWriteFailed: {}", err);
return;
}
if self
.vring
.add_used(self.desc_chain.head_index(), resp.as_slice().len() as u32)
.is_err()
{
log::error!("Couldn't add used");
}
if self.vring.signal_used_queue().is_err() {
log::error!("Couldn't signal used queue");
}
}
}
@ -249,7 +271,7 @@ pub fn start_backend_server(config: SoundConfig) {
let mut daemon = VhostUserDaemon::new(
String::from("vhost-user-sound"),
backend.clone(),
GuestMemoryAtomic::new(GuestMemoryMmap::<SoundBitmap>::new()),
GuestMemoryAtomic::new(GuestMemoryMmap::new()),
)
.unwrap();

View File

@ -1,10 +1,12 @@
// Manos Pitsidianakis <manos.pitsidianakis@linaro.org>
// SPDX-License-Identifier: Apache-2.0 or BSD-3-Clause
use std::{collections::VecDeque, sync::Arc};
use thiserror::Error as ThisError;
use vm_memory::{Le32, Le64};
use crate::{virtio_sound::*, SUPPORTED_FORMATS, SUPPORTED_RATES};
use crate::{virtio_sound::*, IOMessage, SUPPORTED_FORMATS, SUPPORTED_RATES};
/// Stream errors.
#[derive(Debug, ThisError)]
@ -172,6 +174,7 @@ pub struct Stream {
pub channels_min: u8,
pub channels_max: u8,
pub state: PCMState,
pub buffers: VecDeque<Buffer>,
}
impl Default for Stream {
@ -185,6 +188,7 @@ impl Default for Stream {
channels_min: 1,
channels_max: 6,
state: Default::default(),
buffers: VecDeque::new(),
}
}
}
@ -228,3 +232,35 @@ impl Default for PcmParams {
}
}
}
pub struct Buffer {
pub bytes: Vec<u8>,
pub pos: usize,
pub message: Arc<IOMessage>,
}
impl std::fmt::Debug for Buffer {
fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
fmt.debug_struct(stringify!(Buffer))
.field("bytes", &self.bytes.len())
.field("pos", &self.pos)
.field("message", &Arc::as_ptr(&self.message))
.finish()
}
}
impl Buffer {
pub fn new(bytes: Vec<u8>, message: Arc<IOMessage>) -> Self {
Self {
bytes,
pos: 0,
message,
}
}
}
impl Drop for Buffer {
fn drop(&mut self) {
log::trace!("dropping buffer {:?}", self);
}
}