diff --git a/staging/vhost-device-sound/src/audio_backends/alsa.rs b/staging/vhost-device-sound/src/audio_backends/alsa.rs index 4bedbc6..aacb878 100644 --- a/staging/vhost-device-sound/src/audio_backends/alsa.rs +++ b/staging/vhost-device-sound/src/audio_backends/alsa.rs @@ -34,6 +34,7 @@ type AResult = std::result::Result; #[derive(Clone, Debug)] pub struct AlsaBackend { sender: Arc>>, + streams: Arc>>, } #[derive(Debug)] @@ -42,7 +43,6 @@ enum AlsaAction { Prepare(usize), Release(usize, ControlMessage), Start(usize), - Stop(usize), Write(usize), Read(usize), } @@ -145,10 +145,21 @@ fn write_samples_direct( let Some(buffer) = stream.buffers.front_mut() else { return Ok(false); }; + if !matches!(stream.state, PCMState::Start) { + return Ok(false); + } let mut buf = vec![0; buffer.data_descriptor.len() as usize]; - let read_bytes = buffer - .consume(&mut buf) - .expect("failed to read buffer from guest"); + let read_bytes = match buffer.consume(&mut buf) { + Err(err) => { + log::error!( + "Could not read TX buffer from guest, dropping it immediately: {}", + err + ); + stream.buffers.pop_front(); + continue; + } + Ok(v) => v, + }; let mut iter = buf[0..read_bytes as usize].iter().cloned(); let frames = mmap.write(&mut iter); let written_bytes = pcm.frames_to_bytes(frames); @@ -160,48 +171,56 @@ fn write_samples_direct( } } match mmap.status().state() { - State::Running => { - return Ok(false); - } - State::Prepared => {} - State::XRun => { - log::trace!("Underrun in audio output stream!"); - pcm.prepare()? - } - State::Suspended => {} + State::Suspended | State::Running | State::Prepared => Ok(false), + State::XRun => Ok(true), // Recover from this in next round n => panic!("Unexpected pcm state {:?}", n), } - Ok(true) } fn write_samples_io( p: &alsa::PCM, - stream: &mut Stream, + streams: &Arc>>, + stream_id: usize, io: &mut alsa::pcm::IO, ) -> AResult { - loop { - let avail = match p.avail_update() { - Ok(n) => n, - Err(err) => { - log::trace!("Recovering from {}", err); - p.recover(err.errno() as std::os::raw::c_int, true)?; - p.avail_update()? + let avail = match p.avail_update() { + Ok(n) => n, + Err(err) => { + log::trace!("Recovering from {}", err); + p.recover(err.errno() as std::os::raw::c_int, true)?; + if let Err(err) = p.start() { + log::error!( + "Could not restart stream {}; ALSA returned: {}", + stream_id, + err + ); + return Err(err); } - }; - if avail == 0 { - break; + p.avail_update()? } - let written = io.mmap(avail as usize, |buf| { + }; + if avail != 0 { + io.mmap(avail as usize, |buf| { + let stream = &mut streams.write().unwrap()[stream_id]; let Some(buffer) = stream.buffers.front_mut() else { return 0; }; + if !matches!(stream.state, PCMState::Start) { + stream.buffers.pop_front(); + return 0; + } let mut data = vec![0; buffer.data_descriptor.len() as usize]; - // consume() always reads (buffer.data_descriptor.len() - // buffer.pos) bytes - let read_bytes = buffer - .consume(&mut data) - .expect("failed to read buffer from guest"); + let read_bytes = match buffer.consume(&mut data) { + Ok(v) => v, + Err(err) => { + log::error!("Could not read TX buffer, dropping it immediately: {}", err); + stream.buffers.pop_front(); + return 0; + } + }; + let mut iter = data[0..read_bytes as usize].iter().cloned(); let mut written_bytes = 0; @@ -217,14 +236,12 @@ fn write_samples_io( .try_into() .unwrap_or_default() })?; - if written == 0 { - break; - }; + } else { + return Ok(false); } match p.state() { - State::Suspended | State::Running => Ok(false), - State::Prepared => Ok(false), + State::Suspended | State::Running | State::Prepared => Ok(false), State::XRun => Ok(true), // Recover from this in next round n => panic!("Unexpected pcm state {:?}", n), } @@ -237,47 +254,44 @@ fn alsa_worker( stream_id: usize, ) -> AResult<()> { loop { + // We get a `true` every time a new I/O message is received from the guest. + // If the recv() returns `Ok(false)` or an error, terminate this worker thread. let Ok(do_write) = receiver.recv() else { return Ok(()); }; if do_write { - loop { - if matches!(receiver.try_recv(), Ok(false)) { - break; - } - + let has_buffers = || -> bool { + // Hold `streams` lock as short as possible. + let lck = streams.read().unwrap(); + !lck[stream_id].buffers.is_empty() + && matches!(lck[stream_id].state, PCMState::Start) + }; + // Run this loop till the stream's buffer vector is empty: + 'empty_buffers: while has_buffers() { + // When we return from a write attempt and there is still space in the + // stream's buffers, get the ALSA file descriptors and poll them till the host + // sound device tells us there is more available data. let mut fds = { let lck = pcm.lock().unwrap(); - if matches!(lck.state(), State::Running | State::Prepared | State::XRun) { - let mut mmap = lck.direct_mmap_playback::().ok(); + let mut mmap = lck.direct_mmap_playback::().ok(); - if let Some(ref mut mmap) = mmap { - if write_samples_direct( - &lck, - &mut streams.write().unwrap()[stream_id], - mmap, - )? { - continue; - } - } else { - let mut io = lck.io_bytes(); - // Direct mode unavailable, use alsa-lib's mmap emulation instead - if write_samples_io( - &lck, - &mut streams.write().unwrap()[stream_id], - &mut io, - )? { - continue; - } + if let Some(ref mut mmap) = mmap { + if write_samples_direct( + &lck, + &mut streams.write().unwrap()[stream_id], + mmap, + )? { + continue 'empty_buffers; } - lck.get()? } else { - drop(lck); - sleep(Duration::from_millis(500)); - continue; + let mut io = lck.io_bytes(); + // Direct mode unavailable, use alsa-lib's mmap emulation instead + if write_samples_io(&lck, &streams, stream_id, &mut io)? { + continue 'empty_buffers; + } } + lck.get()? }; - // Nothing to do, sleep until woken up by the kernel. alsa::poll::poll(&mut fds, 100)?; } } @@ -288,14 +302,15 @@ impl AlsaBackend { pub fn new(streams: Arc>>) -> Self { let (sender, receiver) = channel(); let sender = Arc::new(Mutex::new(sender)); + let streams2 = Arc::clone(&streams); thread::spawn(move || { - if let Err(err) = Self::run(streams, receiver) { + if let Err(err) = Self::run(streams2, receiver) { log::error!("Main thread exited with error: {}", err); } }); - Self { sender } + Self { sender, streams } } fn run( @@ -380,21 +395,7 @@ impl AlsaBackend { ); } } - } - AlsaAction::Stop(stream_id) => { - if stream_id >= streams_no { - log::error!( - "Received Stop action for stream id {} but there are only {} PCM \ - streams.", - stream_id, - pcms.len() - ); - continue; - }; - let stop_result = streams.write().unwrap()[stream_id].state.stop(); - if let Err(err) = stop_result { - log::error!("Stream {} stop {}", stream_id, err); - } + senders[stream_id].send(true).unwrap(); } AlsaAction::Prepare(stream_id) => { if stream_id >= streams_no { @@ -529,8 +530,21 @@ impl AudioBackend for AlsaBackend { read Read, prepare Prepare, start Start, - stop Stop, } + + fn stop(&self, id: u32) -> CrateResult<()> { + if let Some(Err(err)) = self + .streams + .write() + .unwrap() + .get_mut(id as usize) + .map(|s| s.state.stop()) + { + log::error!("Stream {} stop {}", id, err); + } + Ok(()) + } + send_action! { ctrl set_parameters SetParameters, ctrl release Release,