sound/alsa: stop actual playback after PCM_STOP

Queued buffers were played even after the guest issued PCM_STOP.

This commit:

- makes the ALSA worker threads respect the stopped status instead
- send a start signal to worker threads on PCM_START to handle
  pre-buffering
- reduce the time of playback callbacks holding the stream lock

Signed-off-by: Manos Pitsidianakis <manos.pitsidianakis@linaro.org>
This commit is contained in:
Manos Pitsidianakis 2023-10-05 11:00:21 +03:00 committed by Alex Bennée
parent aa16ef0699
commit c10b8e18c4

View File

@ -34,6 +34,7 @@ type AResult<T> = std::result::Result<T, alsa::Error>;
#[derive(Clone, Debug)]
pub struct AlsaBackend {
sender: Arc<Mutex<Sender<AlsaAction>>>,
streams: Arc<RwLock<Vec<Stream>>>,
}
#[derive(Debug)]
@ -42,7 +43,6 @@ enum AlsaAction {
Prepare(usize),
Release(usize, ControlMessage),
Start(usize),
Stop(usize),
Write(usize),
Read(usize),
}
@ -145,10 +145,21 @@ fn write_samples_direct(
let Some(buffer) = stream.buffers.front_mut() else {
return Ok(false);
};
if !matches!(stream.state, PCMState::Start) {
return Ok(false);
}
let mut buf = vec![0; buffer.data_descriptor.len() as usize];
let read_bytes = buffer
.consume(&mut buf)
.expect("failed to read buffer from guest");
let read_bytes = match buffer.consume(&mut buf) {
Err(err) => {
log::error!(
"Could not read TX buffer from guest, dropping it immediately: {}",
err
);
stream.buffers.pop_front();
continue;
}
Ok(v) => v,
};
let mut iter = buf[0..read_bytes as usize].iter().cloned();
let frames = mmap.write(&mut iter);
let written_bytes = pcm.frames_to_bytes(frames);
@ -160,48 +171,56 @@ fn write_samples_direct(
}
}
match mmap.status().state() {
State::Running => {
return Ok(false);
}
State::Prepared => {}
State::XRun => {
log::trace!("Underrun in audio output stream!");
pcm.prepare()?
}
State::Suspended => {}
State::Suspended | State::Running | State::Prepared => Ok(false),
State::XRun => Ok(true), // Recover from this in next round
n => panic!("Unexpected pcm state {:?}", n),
}
Ok(true)
}
fn write_samples_io(
p: &alsa::PCM,
stream: &mut Stream,
streams: &Arc<RwLock<Vec<Stream>>>,
stream_id: usize,
io: &mut alsa::pcm::IO<u8>,
) -> AResult<bool> {
loop {
let avail = match p.avail_update() {
Ok(n) => n,
Err(err) => {
log::trace!("Recovering from {}", err);
p.recover(err.errno() as std::os::raw::c_int, true)?;
p.avail_update()?
let avail = match p.avail_update() {
Ok(n) => n,
Err(err) => {
log::trace!("Recovering from {}", err);
p.recover(err.errno() as std::os::raw::c_int, true)?;
if let Err(err) = p.start() {
log::error!(
"Could not restart stream {}; ALSA returned: {}",
stream_id,
err
);
return Err(err);
}
};
if avail == 0 {
break;
p.avail_update()?
}
let written = io.mmap(avail as usize, |buf| {
};
if avail != 0 {
io.mmap(avail as usize, |buf| {
let stream = &mut streams.write().unwrap()[stream_id];
let Some(buffer) = stream.buffers.front_mut() else {
return 0;
};
if !matches!(stream.state, PCMState::Start) {
stream.buffers.pop_front();
return 0;
}
let mut data = vec![0; buffer.data_descriptor.len() as usize];
// consume() always reads (buffer.data_descriptor.len() -
// buffer.pos) bytes
let read_bytes = buffer
.consume(&mut data)
.expect("failed to read buffer from guest");
let read_bytes = match buffer.consume(&mut data) {
Ok(v) => v,
Err(err) => {
log::error!("Could not read TX buffer, dropping it immediately: {}", err);
stream.buffers.pop_front();
return 0;
}
};
let mut iter = data[0..read_bytes as usize].iter().cloned();
let mut written_bytes = 0;
@ -217,14 +236,12 @@ fn write_samples_io(
.try_into()
.unwrap_or_default()
})?;
if written == 0 {
break;
};
} else {
return Ok(false);
}
match p.state() {
State::Suspended | State::Running => Ok(false),
State::Prepared => Ok(false),
State::Suspended | State::Running | State::Prepared => Ok(false),
State::XRun => Ok(true), // Recover from this in next round
n => panic!("Unexpected pcm state {:?}", n),
}
@ -237,47 +254,44 @@ fn alsa_worker(
stream_id: usize,
) -> AResult<()> {
loop {
// We get a `true` every time a new I/O message is received from the guest.
// If the recv() returns `Ok(false)` or an error, terminate this worker thread.
let Ok(do_write) = receiver.recv() else {
return Ok(());
};
if do_write {
loop {
if matches!(receiver.try_recv(), Ok(false)) {
break;
}
let has_buffers = || -> bool {
// Hold `streams` lock as short as possible.
let lck = streams.read().unwrap();
!lck[stream_id].buffers.is_empty()
&& matches!(lck[stream_id].state, PCMState::Start)
};
// Run this loop till the stream's buffer vector is empty:
'empty_buffers: while has_buffers() {
// When we return from a write attempt and there is still space in the
// stream's buffers, get the ALSA file descriptors and poll them till the host
// sound device tells us there is more available data.
let mut fds = {
let lck = pcm.lock().unwrap();
if matches!(lck.state(), State::Running | State::Prepared | State::XRun) {
let mut mmap = lck.direct_mmap_playback::<u8>().ok();
let mut mmap = lck.direct_mmap_playback::<u8>().ok();
if let Some(ref mut mmap) = mmap {
if write_samples_direct(
&lck,
&mut streams.write().unwrap()[stream_id],
mmap,
)? {
continue;
}
} else {
let mut io = lck.io_bytes();
// Direct mode unavailable, use alsa-lib's mmap emulation instead
if write_samples_io(
&lck,
&mut streams.write().unwrap()[stream_id],
&mut io,
)? {
continue;
}
if let Some(ref mut mmap) = mmap {
if write_samples_direct(
&lck,
&mut streams.write().unwrap()[stream_id],
mmap,
)? {
continue 'empty_buffers;
}
lck.get()?
} else {
drop(lck);
sleep(Duration::from_millis(500));
continue;
let mut io = lck.io_bytes();
// Direct mode unavailable, use alsa-lib's mmap emulation instead
if write_samples_io(&lck, &streams, stream_id, &mut io)? {
continue 'empty_buffers;
}
}
lck.get()?
};
// Nothing to do, sleep until woken up by the kernel.
alsa::poll::poll(&mut fds, 100)?;
}
}
@ -288,14 +302,15 @@ impl AlsaBackend {
pub fn new(streams: Arc<RwLock<Vec<Stream>>>) -> Self {
let (sender, receiver) = channel();
let sender = Arc::new(Mutex::new(sender));
let streams2 = Arc::clone(&streams);
thread::spawn(move || {
if let Err(err) = Self::run(streams, receiver) {
if let Err(err) = Self::run(streams2, receiver) {
log::error!("Main thread exited with error: {}", err);
}
});
Self { sender }
Self { sender, streams }
}
fn run(
@ -380,21 +395,7 @@ impl AlsaBackend {
);
}
}
}
AlsaAction::Stop(stream_id) => {
if stream_id >= streams_no {
log::error!(
"Received Stop action for stream id {} but there are only {} PCM \
streams.",
stream_id,
pcms.len()
);
continue;
};
let stop_result = streams.write().unwrap()[stream_id].state.stop();
if let Err(err) = stop_result {
log::error!("Stream {} stop {}", stream_id, err);
}
senders[stream_id].send(true).unwrap();
}
AlsaAction::Prepare(stream_id) => {
if stream_id >= streams_no {
@ -529,8 +530,21 @@ impl AudioBackend for AlsaBackend {
read Read,
prepare Prepare,
start Start,
stop Stop,
}
fn stop(&self, id: u32) -> CrateResult<()> {
if let Some(Err(err)) = self
.streams
.write()
.unwrap()
.get_mut(id as usize)
.map(|s| s.state.stop())
{
log::error!("Stream {} stop {}", id, err);
}
Ok(())
}
send_action! {
ctrl set_parameters SetParameters,
ctrl release Release,