sound: Implement RX in ALSA backend

Add an RX code path that is similar to the existing TX code in the ALSA
backend.

Signed-off-by: Manos Pitsidianakis <manos.pitsidianakis@linaro.org>
This commit is contained in:
Manos Pitsidianakis 2023-07-24 13:53:16 +03:00 committed by Alex Bennée
parent d00014f5a6
commit 1921ec21d8
4 changed files with 254 additions and 65 deletions

View File

@ -49,8 +49,7 @@ enum AlsaAction {
Prepare(usize),
Release(usize, ControlMessage),
Start(usize),
Write(usize),
Read(usize),
DoWork(usize),
}
fn update_pcm(
@ -236,6 +235,52 @@ fn write_samples_direct(
}
}
// Returns `Ok(true)` if the function should be called again, because there are
// are more data left to read.
fn read_samples_direct(
_pcm: &alsa::PCM,
stream: &mut Stream,
mmap: &mut alsa::direct::pcm::MmapCapture<u8>,
) -> AResult<bool> {
while mmap.avail() > 0 {
let Some(buffer) = stream.buffers.front_mut() else {
return Ok(false);
};
// Read samples from DMA area with an iterator
let mut iter = mmap.iter();
let mut n_bytes = 0;
// We can't access the descriptor memory region as a slice (see
// [`vm_memory::volatile_memory::VolatileSlice`]) and we can't use alsa's readi
// without a slice: use an intermediate buffer and copy it to the
// descriptor.
let mut intermediate_buf = vec![0; buffer.desc_len() as usize - buffer.pos];
for (sample, byte) in intermediate_buf.iter_mut().zip(&mut iter) {
*sample = byte;
n_bytes += 1;
}
if buffer
.write_input(&intermediate_buf[0..n_bytes])
.expect("Could not write data to guest memory")
== 0
{
break;
}
drop(iter);
if buffer.pos as u32 >= buffer.desc_len() || mmap.avail() == 0 {
stream.buffers.pop_front();
}
}
match mmap.status().state() {
State::Suspended | State::Running | State::Prepared => Ok(false),
State::XRun => Ok(true), // Recover from this in next round
n => panic!("Unexpected pcm state {:?}", n),
}
}
// Returns `Ok(true)` if the function should be called again, because there are
// are more data left to write.
fn write_samples_io(
@ -283,7 +328,7 @@ fn write_samples_io(
};
buffer.pos += read_bytes as usize;
if buffer.pos >= buffer.desc_len() as usize {
if buffer.pos as u32 >= buffer.desc_len() {
stream.buffers.pop_front();
}
p.bytes_to_frames(read_bytes as isize)
@ -301,19 +346,90 @@ fn write_samples_io(
}
}
// Returns `Ok(true)` if the function should be called again, because there are
// are more data left to read.
fn read_samples_io(
p: &alsa::PCM,
streams: &Arc<RwLock<Vec<Stream>>>,
stream_id: usize,
io: &mut alsa::pcm::IO<u8>,
) -> AResult<bool> {
let avail = match p.avail_update() {
Ok(n) => n,
Err(err) => {
log::trace!("Recovering from {}", err);
p.recover(err.errno() as std::os::raw::c_int, true)?;
if let Err(err) = p.start() {
log::error!(
"Could not restart stream {}; ALSA returned: {}",
stream_id,
err
);
return Err(err);
}
p.avail_update()?
}
};
if avail == 0 {
return Ok(false);
}
let stream = &mut streams.write().unwrap()[stream_id];
let Some(buffer) = stream.buffers.front_mut() else {
return Ok(false);
};
if !matches!(stream.state, PCMState::Start) {
stream.buffers.pop_front();
return Ok(false);
}
let mut frames_read = 0;
// We can't access the descriptor memory region as a slice (see
// [`vm_memory::volatile_memory::VolatileSlice`]) and we can't use alsa's readi
// without a slice: use an intermediate buffer and copy it to the
// descriptor.
let mut intermediate_buf = vec![0; buffer.desc_len() as usize - buffer.pos];
while let Some(frames) = io
.readi(&mut intermediate_buf[0..(buffer.desc_len() as usize - buffer.pos)])
.map(std::num::NonZeroUsize::new)?
.map(std::num::NonZeroUsize::get)
{
frames_read += frames;
let n_bytes = usize::try_from(p.frames_to_bytes(frames as i64)).unwrap_or_default();
if buffer
.write_input(&intermediate_buf[0..n_bytes])
.expect("Could not write data to guest memory")
== 0
{
break;
}
}
let bytes_read = p.frames_to_bytes(frames_read as i64);
if buffer.pos as u32 >= buffer.desc_len() || bytes_read == 0 {
stream.buffers.pop_front();
}
match p.state() {
State::Suspended | State::Running | State::Prepared => Ok(false),
State::XRun => Ok(true), // Recover from this in next round
n => panic!("Unexpected pcm state {:?}", n),
}
}
fn alsa_worker(
pcm: Arc<Mutex<PCM>>,
streams: Arc<RwLock<Vec<Stream>>>,
receiver: &Receiver<bool>,
stream_id: usize,
) -> AResult<()> {
let direction = streams.write().unwrap()[stream_id].direction;
loop {
// We get a `true` every time a new I/O message is received from the guest.
// If the recv() returns `Ok(false)` or an error, terminate this worker thread.
let Ok(do_write) = receiver.recv() else {
let Ok(do_work) = receiver.recv() else {
return Ok(());
};
if do_write {
if do_work {
let has_buffers = || -> bool {
// Hold `streams` lock as short as possible.
let lck = streams.read().unwrap();
@ -322,26 +438,48 @@ fn alsa_worker(
};
// Run this loop till the stream's buffer vector is empty:
'empty_buffers: while has_buffers() {
// When we return from a write attempt and there is still space in the
// When we return from a read/write attempt and there is still space in the
// stream's buffers, get the ALSA file descriptors and poll them till the host
// sound device tells us there is more available data.
let mut fds = {
let lck = pcm.lock().unwrap();
let mut mmap = lck.direct_mmap_playback::<u8>().ok();
match direction {
Direction::Output => {
let mut mmap = lck.direct_mmap_playback::<u8>().ok();
if let Some(ref mut mmap) = mmap {
if write_samples_direct(
&lck,
&mut streams.write().unwrap()[stream_id],
mmap,
)? {
continue 'empty_buffers;
if let Some(ref mut mmap) = mmap {
if write_samples_direct(
&lck,
&mut streams.write().unwrap()[stream_id],
mmap,
)? {
continue 'empty_buffers;
}
} else {
let mut io = lck.io_bytes();
// Direct mode unavailable, use alsa-lib's mmap emulation instead
if write_samples_io(&lck, &streams, stream_id, &mut io)? {
continue 'empty_buffers;
}
}
}
} else {
let mut io = lck.io_bytes();
// Direct mode unavailable, use alsa-lib's mmap emulation instead
if write_samples_io(&lck, &streams, stream_id, &mut io)? {
continue 'empty_buffers;
Direction::Input => {
let mut mmap = lck.direct_mmap_capture::<u8>().ok();
if let Some(ref mut mmap) = mmap {
if read_samples_direct(
&lck,
&mut streams.write().unwrap()[stream_id],
mmap,
)? {
continue 'empty_buffers;
}
} else {
let mut io = lck.io_bytes();
if read_samples_io(&lck, &streams, stream_id, &mut io)? {
continue 'empty_buffers;
}
}
}
}
lck.get()?
@ -412,11 +550,10 @@ impl AlsaBackend {
while let Ok(action) = receiver.recv() {
match action {
AlsaAction::Read(_) => {}
AlsaAction::Write(stream_id) => {
AlsaAction::DoWork(stream_id) => {
if stream_id >= streams_no {
log::error!(
"Received Write action for stream id {} but there are only {} PCM \
"Received DoWork action for stream id {} but there are only {} PCM \
streams.",
stream_id,
pcms.len()
@ -589,8 +726,8 @@ macro_rules! send_action {
impl AudioBackend for AlsaBackend {
send_action! {
write Write,
read Read,
write DoWork,
read DoWork,
prepare Prepare,
start Start,
}

View File

@ -19,6 +19,7 @@ use virtio_bindings::{
use virtio_queue::{DescriptorChain, QueueOwnedT};
use vm_memory::{
ByteValued, Bytes, GuestAddressSpace, GuestMemoryAtomic, GuestMemoryLoadGuard, GuestMemoryMmap,
Le32,
};
use vmm_sys_util::{
epoll::EventSet,
@ -102,8 +103,8 @@ impl VhostUserSoundThread {
match queue_idx {
CONTROL_QUEUE_IDX => self.process_control(vring, audio_backend),
EVENT_QUEUE_IDX => self.process_event(vring),
TX_QUEUE_IDX => self.process_tx(vring, audio_backend),
RX_QUEUE_IDX => self.process_rx(vring, audio_backend),
TX_QUEUE_IDX => self.process_io(vring, audio_backend, Direction::Output),
RX_QUEUE_IDX => self.process_io(vring, audio_backend, Direction::Input),
_ => Err(Error::HandleUnknownEvent.into()),
}?;
if !vring.enable_notification().unwrap() {
@ -115,8 +116,8 @@ impl VhostUserSoundThread {
match queue_idx {
CONTROL_QUEUE_IDX => self.process_control(vring, audio_backend),
EVENT_QUEUE_IDX => self.process_event(vring),
TX_QUEUE_IDX => self.process_tx(vring, audio_backend),
RX_QUEUE_IDX => self.process_rx(vring, audio_backend),
TX_QUEUE_IDX => self.process_io(vring, audio_backend, Direction::Output),
RX_QUEUE_IDX => self.process_io(vring, audio_backend, Direction::Input),
_ => Err(Error::HandleUnknownEvent.into()),
}?;
}
@ -448,10 +449,11 @@ impl VhostUserSoundThread {
Ok(false)
}
fn process_tx(
fn process_io(
&self,
vring: &VringRwLock,
audio_backend: &RwLock<Box<dyn AudioBackend + Send + Sync>>,
direction: Direction,
) -> IoResult<bool> {
let Some(ref atomic_mem) = self.mem else {
return Err(Error::NoMemoryConfigured.into());
@ -489,6 +491,7 @@ impl VhostUserSoundThread {
let message = Arc::new(IOMessage {
vring: vring.clone(),
status: VIRTIO_SND_S_OK.into(),
used_len: 0.into(),
latency_bytes: 0.into(),
desc_chain: desc_chain.clone(),
response_descriptor: descriptors.last().cloned().ok_or_else(|| {
@ -501,7 +504,9 @@ impl VhostUserSoundThread {
IoState::Done => {
return Err(Error::UnexpectedDescriptorCount(descriptors.len()).into());
}
IoState::Ready if descriptor.is_write_only() => {
IoState::Ready
if matches!(direction, Direction::Output) && descriptor.is_write_only() =>
{
if descriptor.len() as usize != size_of::<VirtioSoundPcmStatus>() {
return Err(Error::UnexpectedDescriptorSize(
size_of::<VirtioSoundPcmStatus>(),
@ -511,14 +516,9 @@ impl VhostUserSoundThread {
}
state = IoState::Done;
}
IoState::WaitingBufferForStreamId(stream_id) if descriptor.is_write_only() => {
if descriptor.len() as usize != size_of::<VirtioSoundPcmStatus>() {
return Err(Error::UnexpectedDescriptorSize(
size_of::<VirtioSoundPcmStatus>(),
descriptor.len(),
)
.into());
}
IoState::WaitingBufferForStreamId(stream_id)
if descriptor.len() as usize == size_of::<VirtioSoundPcmStatus>() =>
{
let mut streams = self.streams.write().unwrap();
for b in std::mem::take(&mut buffers) {
streams[stream_id as usize].buffers.push_back(b);
@ -551,13 +551,13 @@ impl VhostUserSoundThread {
u32::from(
self.streams.read().unwrap()[stream_id as usize]
.params
.buffer_bytes,
.period_bytes,
) as usize,
descriptor.len(),
)
.into());
}
IoState::WaitingBufferForStreamId(_stream_id) => {
IoState::WaitingBufferForStreamId(_) => {
// In the case of TX/Playback:
//
// Rather than copying the content of a descriptor, buffer keeps a pointer
@ -567,7 +567,7 @@ impl VhostUserSoundThread {
// knowing whether the content has been updated. The device only reads the
// buffer from guest memory when the audio engine requires it, which is
// about after a period thus ensuring that the buffer is up-to-date.
buffers.push(Buffer::new(*descriptor, Arc::clone(&message)));
buffers.push(Buffer::new(*descriptor, Arc::clone(&message), direction));
}
}
}
@ -575,22 +575,22 @@ impl VhostUserSoundThread {
if !stream_ids.is_empty() {
let b = audio_backend.write().unwrap();
for id in stream_ids {
b.write(id).unwrap();
match direction {
Direction::Output => {
for id in stream_ids {
b.write(id).unwrap();
}
}
Direction::Input => {
for id in stream_ids {
b.read(id).unwrap();
}
}
}
}
Ok(false)
}
fn process_rx(
&self,
_vring: &VringRwLock,
_audio_backend: &RwLock<Box<dyn AudioBackend + Send + Sync>>,
) -> IoResult<bool> {
log::trace!("process_rx");
Ok(false)
}
}
pub struct VhostUserSoundBackend {
@ -681,7 +681,7 @@ impl VhostUserSoundBackend {
threads,
virtio_cfg: VirtioSoundConfig {
jacks: 0.into(),
streams: 1.into(),
streams: Le32::from(streams_no as u32),
chmaps: 1.into(),
},
exit_event: EventFd::new(EFD_NONBLOCK).map_err(Error::EventFdCreate)?,
@ -883,8 +883,12 @@ mod tests {
vring.set_queue_info(0x100, 0x200, 0x300).unwrap();
vring.set_queue_ready(true);
assert!(t.process_control(&vring, &audio_backend).is_ok());
assert!(t.process_tx(&vring, &audio_backend).is_ok());
assert!(t.process_rx(&vring, &audio_backend).is_ok());
assert!(t
.process_io(&vring, &audio_backend, Direction::Output)
.is_ok());
assert!(t
.process_io(&vring, &audio_backend, Direction::Input)
.is_ok());
}
#[test]
@ -911,7 +915,9 @@ mod tests {
vring.set_queue_info(0x100, 0x200, 0x300).unwrap();
vring.set_queue_ready(true);
assert!(t.process_control(&vring, &audio_backend).is_err());
assert!(t.process_tx(&vring, &audio_backend).is_err());
assert!(t
.process_io(&vring, &audio_backend, Direction::Output)
.is_err());
}
#[test]

View File

@ -274,6 +274,7 @@ impl SoundConfig {
pub struct IOMessage {
status: std::sync::atomic::AtomicU32,
pub used_len: std::sync::atomic::AtomicU32,
pub latency_bytes: std::sync::atomic::AtomicU32,
desc_chain: SoundDescriptorChain,
response_descriptor: virtio_queue::Descriptor,
@ -289,6 +290,7 @@ impl Drop for IOMessage {
.load(std::sync::atomic::Ordering::SeqCst)
.into(),
};
let used_len: u32 = self.used_len.load(std::sync::atomic::Ordering::SeqCst);
log::trace!("dropping IOMessage {:?}", resp);
if let Err(err) = self
@ -299,10 +301,10 @@ impl Drop for IOMessage {
log::error!("Error::DescriptorWriteFailed: {}", err);
return;
}
if let Err(err) = self
.vring
.add_used(self.desc_chain.head_index(), resp.as_slice().len() as u32)
{
if let Err(err) = self.vring.add_used(
self.desc_chain.head_index(),
resp.as_slice().len() as u32 + used_len,
) {
log::error!("Couldn't add used bytes count to vring: {}", err);
}
if let Err(err) = self.vring.signal_used_queue() {

View File

@ -17,6 +17,8 @@ pub enum Error {
InvalidStreamId(u32),
#[error("Descriptor read failed")]
DescriptorReadFailed,
#[error("Descriptor write failed")]
DescriptorWriteFailed,
}
type Result<T> = std::result::Result<T, Error>;
@ -239,23 +241,30 @@ pub struct Buffer {
data_descriptor: virtio_queue::Descriptor,
pub pos: usize,
pub message: Arc<IOMessage>,
direction: Direction,
}
impl std::fmt::Debug for Buffer {
fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
fmt.debug_struct(stringify!(Buffer))
.field("pos", &self.pos)
.field("direction", &self.direction)
.field("message", &Arc::as_ptr(&self.message))
.finish()
}
}
impl Buffer {
pub fn new(data_descriptor: virtio_queue::Descriptor, message: Arc<IOMessage>) -> Self {
pub fn new(
data_descriptor: virtio_queue::Descriptor,
message: Arc<IOMessage>,
direction: Direction,
) -> Self {
Self {
pos: 0,
data_descriptor,
message,
direction,
}
}
@ -275,6 +284,26 @@ impl Buffer {
Ok(len as u32)
}
pub fn write_input(&mut self, buf: &[u8]) -> Result<u32> {
if self.desc_len() <= self.pos as u32 {
return Ok(0);
}
let addr = self.data_descriptor.addr();
let offset = self.pos as u64;
let len = self
.message
.desc_chain
.memory()
.write(
buf,
addr.checked_add(offset)
.expect("invalid guest memory address"),
)
.map_err(|_| Error::DescriptorWriteFailed)?;
self.pos += len;
Ok(len as u32)
}
#[inline]
/// Returns the length of the sound data [`virtio_queue::Descriptor`].
pub fn desc_len(&self) -> u32 {
@ -284,10 +313,23 @@ impl Buffer {
impl Drop for Buffer {
fn drop(&mut self) {
self.message
.latency_bytes
.fetch_add(self.desc_len(), std::sync::atomic::Ordering::SeqCst);
log::trace!("dropping buffer {:?}", self);
match self.direction {
Direction::Input => {
let used_len = std::cmp::min(self.pos as u32, self.desc_len());
self.message
.used_len
.fetch_add(used_len, std::sync::atomic::Ordering::SeqCst);
self.message
.latency_bytes
.fetch_add(used_len, std::sync::atomic::Ordering::SeqCst);
}
Direction::Output => {
self.message
.latency_bytes
.fetch_add(self.desc_len(), std::sync::atomic::Ordering::SeqCst);
}
}
log::trace!("dropping {:?} buffer {:?}", self.direction, self);
}
}
@ -359,6 +401,7 @@ mod tests {
IOMessage {
status: VIRTIO_SND_S_OK.into(),
latency_bytes: 0.into(),
used_len: 0.into(),
desc_chain: prepare_desc_chain::<VirtioSndPcmSetParams>(GuestAddress(0), hdr, 1),
response_descriptor: Descriptor::new(next_addr, 0x200, VRING_DESC_F_NEXT as u16, 1),
vring,
@ -508,6 +551,7 @@ mod tests {
let buffer = Buffer::new(
desc_msg.desc_chain.clone().readable().next().unwrap(),
message,
Direction::Output,
);
let mut buf = vec![0; 5];