From 5984715806815a8835ff63b98f14bc5404fa9009 Mon Sep 17 00:00:00 2001 From: Dorinda Bassey Date: Thu, 14 Sep 2023 14:52:09 +0200 Subject: [PATCH] audio_backends/pipewire.rs: use safe queue API use safe queue API Signed-off-by: Dorinda Bassey --- crates/sound/src/audio_backends/pipewire.rs | 96 +++++++++------------ 1 file changed, 43 insertions(+), 53 deletions(-) diff --git a/crates/sound/src/audio_backends/pipewire.rs b/crates/sound/src/audio_backends/pipewire.rs index 247eaf1..355dfa6 100644 --- a/crates/sound/src/audio_backends/pipewire.rs +++ b/crates/sound/src/audio_backends/pipewire.rs @@ -24,7 +24,6 @@ use crate::virtio_sound::{ }; use crate::{Error, Stream}; use std::{ - cmp, collections::HashMap, convert::TryInto, mem::size_of, @@ -52,9 +51,9 @@ use spa::sys::{ }; use pw::sys::{ - pw_buffer, pw_loop, pw_thread_loop, pw_thread_loop_get_loop, pw_thread_loop_lock, - pw_thread_loop_new, pw_thread_loop_signal, pw_thread_loop_start, pw_thread_loop_unlock, - pw_thread_loop_wait, PW_ID_CORE, + pw_loop, pw_thread_loop, pw_thread_loop_get_loop, pw_thread_loop_lock, pw_thread_loop_new, + pw_thread_loop_signal, pw_thread_loop_start, pw_thread_loop_unlock, pw_thread_loop_wait, + PW_ID_CORE, }; use pw::{properties, spa, Context, Core, LoopRef}; @@ -139,7 +138,7 @@ unsafe impl Sync for PwBackend {} pub struct PwBackend { pub stream_params: Arc>>, - thread_loop: Arc, + thread_loop: Rc, pub core: Core, #[allow(dead_code)] context: Context, @@ -151,7 +150,7 @@ impl PwBackend { pub fn new(stream_params: Arc>>) -> Self { pw::init(); - let thread_loop = Arc::new(PwThreadLoop::new(Some("Pipewire thread loop")).unwrap()); + let thread_loop = Rc::new(PwThreadLoop::new(Some("Pipewire thread loop")).unwrap()); let get_loop = thread_loop.get_loop(); thread_loop.lock(); @@ -386,64 +385,55 @@ impl AudioBackend for PwBackend { .update_params(&mut param) .expect("could not update params"); }) - .process(move |stream, _data| { - //todo: use safe dequeue_buffer(), contribute queue_buffer() - unsafe { - let b: *mut pw_buffer = stream.dequeue_raw_buffer(); - if b.is_null() { - return; - } - let buf = (*b).buffer; - let datas = (*buf).datas; - - let p = (*datas).data as *mut u8; - - if p.is_null() { - return; - } - - // to calculate as sizeof(int16_t) * NR_CHANNELS + .process(move |stream, _data| match stream.dequeue_buffer() { + None => debug!("No buffer recieved"), + Some(mut buf) => { + let datas = buf.datas_mut(); let frame_size = info.channels as u32 * size_of::() as u32; - let req = (*b).requested * (frame_size as u64); - let mut n_bytes = cmp::min(req as u32, (*datas).maxsize); + let data = &mut datas[0]; + let n_bytes = if let Some(slice) = data.data() { + let mut n_bytes = slice.len(); + let mut streams = streams.write().unwrap(); + let streams = streams + .get_mut(stream_id as usize) + .expect("Stream does not exist"); + let Some(buffer) = streams.buffers.front_mut() else { + return; + }; - let mut streams = streams.write().unwrap(); - let streams = streams - .get_mut(stream_id as usize) - .expect("Stream does not exist"); - let Some(buffer) = streams.buffers.front_mut() else { - return; - }; + let mut start = buffer.pos; - let mut start = buffer.pos; + let avail = (buffer.bytes.len() - start) as i32; - let avail = (buffer.bytes.len() - start) as i32; - - if avail <= 0 { - // pad with silence - ptr::write_bytes(p, 0, n_bytes as usize); - } else { if avail < n_bytes as i32 { n_bytes = avail.try_into().unwrap(); } + let p = &mut slice[buffer.pos..start + n_bytes]; + if avail <= 0 { + // pad with silence + unsafe { + ptr::write_bytes(p.as_mut_ptr(), 0, n_bytes); + } + } else { + let slice = &buffer.bytes[buffer.pos..start + n_bytes]; + p.copy_from_slice(slice); - let slice = &buffer.bytes[buffer.pos..]; - p.copy_from(slice.as_ptr(), slice.len()); + start += n_bytes; - start += n_bytes as usize; + buffer.pos = start; - buffer.pos = start; - - if start >= buffer.bytes.len() { - streams.buffers.pop_front(); + if start >= buffer.bytes.len() { + streams.buffers.pop_front(); + } } - } - - (*(*datas).chunk).offset = 0; - (*(*datas).chunk).stride = frame_size as i32; - (*(*datas).chunk).size = n_bytes; - - stream.queue_raw_buffer(b); + n_bytes + } else { + 0 + }; + let chunk = data.chunk_mut(); + *chunk.offset_mut() = 0; + *chunk.stride_mut() = frame_size as _; + *chunk.size_mut() = n_bytes as _; } }) .register()