sound: clean up and improve comments, definitions

- Add some documentation comments
- Rename TxState to IoState in preparation for RX implementation
- Fix inline rust code not being escaped properly in doc comments
- Rename some fields to make their function clearer
- Cleanup some error log messages
- Replace an old TODO comment about queue size with an explanation

Signed-off-by: Manos Pitsidianakis <manos.pitsidianakis@linaro.org>
This commit is contained in:
Manos Pitsidianakis 2023-10-06 15:41:06 +03:00 committed by Alex Bennée
parent 04f80fc8e9
commit 30d5cf4e10
3 changed files with 62 additions and 44 deletions

View File

@ -195,15 +195,14 @@ fn update_pcm(
Ok(())
}
// Returns `true` if the function should be called again, because there are are
// more data left to write.
// Returns `Ok(true)` if the function should be called again, because there are
// are more data left to write.
fn write_samples_direct(
pcm: &alsa::PCM,
stream: &mut Stream,
mmap: &mut alsa::direct::pcm::MmapPlayback<u8>,
) -> AResult<bool> {
while mmap.avail() > 0 {
// Write samples to DMA area from iterator
let Some(buffer) = stream.buffers.front_mut() else {
return Ok(false);
};
@ -223,6 +222,7 @@ fn write_samples_direct(
}
Ok(v) => v,
};
// Write samples to DMA area from iterator
let mut iter = buf[0..read_bytes as usize].iter().cloned();
let frames = mmap.write(&mut iter);
let written_bytes = pcm.frames_to_bytes(frames);
@ -240,6 +240,8 @@ fn write_samples_direct(
}
}
// Returns `Ok(true)` if the function should be called again, because there are
// are more data left to write.
fn write_samples_io(
p: &alsa::PCM,
streams: &Arc<RwLock<Vec<Stream>>>,
@ -385,6 +387,9 @@ impl AlsaBackend {
let mut senders = Vec::with_capacity(streams_no);
for i in 0..streams_no {
let (sender, receiver) = channel();
// Initialize with a dummy value, which will be updated every time we call
// `update_pcm`.
let pcm = Arc::new(Mutex::new(PCM::new("default", Direction::Playback, false)?));
let mtx = Arc::clone(&pcm);
@ -495,16 +500,18 @@ impl AlsaBackend {
msg.code = VIRTIO_SND_S_BAD_MSG;
continue;
};
// Stop the worker.
// Stop worker thread
senders[stream_id].send(false).unwrap();
let mut streams = streams.write().unwrap();
if let Err(err) = streams[stream_id].state.release() {
log::error!("Stream {}: {}", stream_id, err);
msg.code = VIRTIO_SND_S_BAD_MSG;
}
// Release buffers even if state transition is invalid. If it is invalid, we
// won't be in a valid device state anyway so better to get rid of them and
// free the virt queue.
// Drop pending stream buffers to complete pending I/O messages
//
// This will release buffers even if state transition is invalid. If it is
// invalid, we won't be in a valid device state anyway so better to get rid of
// them and free the virt queue.
std::mem::take(&mut streams[stream_id].buffers);
}
AlsaAction::SetParameters(stream_id, mut msg) => {
@ -545,9 +552,9 @@ impl AlsaBackend {
st.params.format = request.format;
st.params.rate = request.rate;
}
// Manually drop msg for faster response: the kernel has a timeout.
drop(msg);
}
// Manually drop msg for faster response: the kernel has a timeout.
drop(msg);
update_pcm(&pcms[stream_id], stream_id, &streams)?;
}
}

View File

@ -467,17 +467,23 @@ impl VhostUserSoundThread {
return Ok(true);
}
// Instead of counting descriptor chain lengths, encode the "parsing" logic in
// an enumeration. Then, the compiler will complain about any unhandled
// match {} cases if any part of the code is changed. This makes invalid
// states unrepresentable in the source code.
#[derive(Copy, Clone, PartialEq, Debug)]
enum TxState {
enum IoState {
Ready,
WaitingBufferForStreamId(u32),
Done,
}
// Keep log of stream IDs to wake up, in case the guest has queued more than
// one.
let mut stream_ids = BTreeSet::default();
for desc_chain in requests {
let mut state = TxState::Ready;
let mut state = IoState::Ready;
let mut buffers = vec![];
let descriptors: Vec<_> = desc_chain.clone().collect();
let message = Arc::new(IOMessage {
@ -485,14 +491,17 @@ impl VhostUserSoundThread {
status: VIRTIO_SND_S_OK.into(),
latency_bytes: 0.into(),
desc_chain: desc_chain.clone(),
descriptor: descriptors.last().cloned().unwrap(),
response_descriptor: descriptors.last().cloned().ok_or_else(|| {
log::error!("Received IO request with an empty descriptor chain.");
Error::UnexpectedDescriptorCount(0)
})?,
});
for descriptor in &descriptors {
match state {
TxState::Done => {
IoState::Done => {
return Err(Error::UnexpectedDescriptorCount(descriptors.len()).into());
}
TxState::Ready if descriptor.is_write_only() => {
IoState::Ready if descriptor.is_write_only() => {
if descriptor.len() as usize != size_of::<VirtioSoundPcmStatus>() {
return Err(Error::UnexpectedDescriptorSize(
size_of::<VirtioSoundPcmStatus>(),
@ -500,9 +509,9 @@ impl VhostUserSoundThread {
)
.into());
}
state = TxState::Done;
state = IoState::Done;
}
TxState::WaitingBufferForStreamId(stream_id) if descriptor.is_write_only() => {
IoState::WaitingBufferForStreamId(stream_id) if descriptor.is_write_only() => {
if descriptor.len() as usize != size_of::<VirtioSoundPcmStatus>() {
return Err(Error::UnexpectedDescriptorSize(
size_of::<VirtioSoundPcmStatus>(),
@ -514,9 +523,9 @@ impl VhostUserSoundThread {
for b in std::mem::take(&mut buffers) {
streams[stream_id as usize].buffers.push_back(b);
}
state = TxState::Done;
state = IoState::Done;
}
TxState::Ready
IoState::Ready
if descriptor.len() as usize != size_of::<VirtioSoundPcmXfer>() =>
{
return Err(Error::UnexpectedDescriptorSize(
@ -525,7 +534,7 @@ impl VhostUserSoundThread {
)
.into());
}
TxState::Ready => {
IoState::Ready => {
let xfer = desc_chain
.memory()
.read_obj::<VirtioSoundPcmXfer>(descriptor.addr())
@ -533,9 +542,9 @@ impl VhostUserSoundThread {
let stream_id: u32 = xfer.stream_id.into();
stream_ids.insert(stream_id);
state = TxState::WaitingBufferForStreamId(stream_id);
state = IoState::WaitingBufferForStreamId(stream_id);
}
TxState::WaitingBufferForStreamId(stream_id)
IoState::WaitingBufferForStreamId(stream_id)
if descriptor.len() as usize == size_of::<VirtioSoundPcmXfer>() =>
{
return Err(Error::UnexpectedDescriptorSize(
@ -548,16 +557,16 @@ impl VhostUserSoundThread {
)
.into());
}
TxState::WaitingBufferForStreamId(_stream_id) => {
/*
Rather than copying the content of a descriptor, buffer keeps a pointer to it.
When we copy just after the request is enqueued, the guest's userspace may or
may not have updated the buffer contents. Guest driver simply moves buffers
from the used ring to the available ring without knowing whether the content
has been updated. The device only reads the buffer from guest memory when the
audio engine requires it, which is about after a period thus ensuring that the
buffer is up-to-date.
*/
IoState::WaitingBufferForStreamId(_stream_id) => {
// In the case of TX/Playback:
//
// Rather than copying the content of a descriptor, buffer keeps a pointer
// to it. When we copy just after the request is enqueued, the guest's
// userspace may or may not have updated the buffer contents. Guest driver
// simply moves buffers from the used ring to the available ring without
// knowing whether the content has been updated. The device only reads the
// buffer from guest memory when the audio engine requires it, which is
// about after a period thus ensuring that the buffer is up-to-date.
buffers.push(Buffer::new(*descriptor, Arc::clone(&message)));
}
}
@ -626,7 +635,7 @@ impl VhostUserSoundBackend {
},
];
let chmaps: Arc<RwLock<Vec<VirtioSoundChmapInfo>>> = Arc::new(RwLock::new(chmaps_info));
log::trace!("VhostUserSoundBackend::new config {:?}", &config);
log::trace!("VhostUserSoundBackend::new(config = {:?})", &config);
let threads = if config.multi_thread {
vec![
RwLock::new(VhostUserSoundThread::new(
@ -691,7 +700,10 @@ impl VhostUserBackend<VringRwLock, ()> for VhostUserSoundBackend {
}
fn max_queue_size(&self) -> usize {
// TODO: Investigate if an alternative value makes any difference.
// The linux kernel driver does no checks for queue length and fails silently if
// a queue is filled up. In this case, adding an element to the queue
// returns ENOSPC and the element is not queued for a later attempt and
// is lost. `64` is a "good enough" value from our observations.
64
}

View File

@ -250,13 +250,12 @@ pub struct IOMessage {
status: std::sync::atomic::AtomicU32,
pub latency_bytes: std::sync::atomic::AtomicU32,
desc_chain: SoundDescriptorChain,
descriptor: virtio_queue::Descriptor,
response_descriptor: virtio_queue::Descriptor,
vring: VringRwLock,
}
impl Drop for IOMessage {
fn drop(&mut self) {
log::trace!("dropping IOMessage");
let resp = VirtioSoundPcmStatus {
status: self.status.load(std::sync::atomic::Ordering::SeqCst).into(),
latency_bytes: self
@ -264,24 +263,24 @@ impl Drop for IOMessage {
.load(std::sync::atomic::Ordering::SeqCst)
.into(),
};
log::trace!("dropping IOMessage {:?}", resp);
if let Err(err) = self
.desc_chain
.memory()
.write_obj(resp, self.descriptor.addr())
.write_obj(resp, self.response_descriptor.addr())
{
log::error!("Error::DescriptorWriteFailed: {}", err);
return;
}
if self
if let Err(err) = self
.vring
.add_used(self.desc_chain.head_index(), resp.as_slice().len() as u32)
.is_err()
{
log::error!("Couldn't add used");
log::error!("Couldn't add used bytes count to vring: {}", err);
}
if self.vring.signal_used_queue().is_err() {
log::error!("Couldn't signal used queue");
if let Err(err) = self.vring.signal_used_queue() {
log::error!("Couldn't signal used queue: {}", err);
}
}
}
@ -289,7 +288,7 @@ impl Drop for IOMessage {
/// This is the public API through which an external program starts the
/// vhost-device-sound backend server.
pub fn start_backend_server(config: SoundConfig) {
log::trace!("Using config {:?}", &config);
log::trace!("Using config {:?}.", &config);
let listener = Listener::new(config.get_socket_path(), true).unwrap();
let backend = Arc::new(VhostUserSoundBackend::new(config).unwrap());
@ -300,12 +299,12 @@ pub fn start_backend_server(config: SoundConfig) {
)
.unwrap();
log::trace!("Starting daemon");
log::trace!("Starting daemon.");
daemon.start(listener).unwrap();
match daemon.wait() {
Ok(()) => {
info!("Stopping cleanly");
info!("Stopping cleanly.");
}
Err(vhost_user_backend::Error::HandleRequest(vhost_user::Error::PartialMessage)) => {
info!(