Merge pull request #31 from dorindabassey/safeq

pipewire.rs: use safe queue API
This commit is contained in:
dorindabassey 2023-09-18 12:40:24 +02:00 committed by GitHub
commit 35fa83aa85
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -24,7 +24,6 @@ use crate::virtio_sound::{
};
use crate::{Error, Stream};
use std::{
cmp,
collections::HashMap,
convert::TryInto,
mem::size_of,
@ -52,9 +51,9 @@ use spa::sys::{
};
use pw::sys::{
pw_buffer, pw_loop, pw_thread_loop, pw_thread_loop_get_loop, pw_thread_loop_lock,
pw_thread_loop_new, pw_thread_loop_signal, pw_thread_loop_start, pw_thread_loop_unlock,
pw_thread_loop_wait, PW_ID_CORE,
pw_loop, pw_thread_loop, pw_thread_loop_get_loop, pw_thread_loop_lock, pw_thread_loop_new,
pw_thread_loop_signal, pw_thread_loop_start, pw_thread_loop_unlock, pw_thread_loop_wait,
PW_ID_CORE,
};
use pw::{properties, spa, Context, Core, LoopRef};
@ -139,7 +138,7 @@ unsafe impl Sync for PwBackend {}
pub struct PwBackend {
pub stream_params: Arc<RwLock<Vec<Stream>>>,
thread_loop: Arc<PwThreadLoop>,
thread_loop: Rc<PwThreadLoop>,
pub core: Core,
#[allow(dead_code)]
context: Context<PwInnerLoop>,
@ -151,7 +150,7 @@ impl PwBackend {
pub fn new(stream_params: Arc<RwLock<Vec<Stream>>>) -> Self {
pw::init();
let thread_loop = Arc::new(PwThreadLoop::new(Some("Pipewire thread loop")).unwrap());
let thread_loop = Rc::new(PwThreadLoop::new(Some("Pipewire thread loop")).unwrap());
let get_loop = thread_loop.get_loop();
thread_loop.lock();
@ -386,64 +385,55 @@ impl AudioBackend for PwBackend {
.update_params(&mut param)
.expect("could not update params");
})
.process(move |stream, _data| {
//todo: use safe dequeue_buffer(), contribute queue_buffer()
unsafe {
let b: *mut pw_buffer = stream.dequeue_raw_buffer();
if b.is_null() {
return;
}
let buf = (*b).buffer;
let datas = (*buf).datas;
let p = (*datas).data as *mut u8;
if p.is_null() {
return;
}
// to calculate as sizeof(int16_t) * NR_CHANNELS
.process(move |stream, _data| match stream.dequeue_buffer() {
None => debug!("No buffer recieved"),
Some(mut buf) => {
let datas = buf.datas_mut();
let frame_size = info.channels as u32 * size_of::<i16>() as u32;
let req = (*b).requested * (frame_size as u64);
let mut n_bytes = cmp::min(req as u32, (*datas).maxsize);
let data = &mut datas[0];
let n_bytes = if let Some(slice) = data.data() {
let mut n_bytes = slice.len();
let mut streams = streams.write().unwrap();
let streams = streams
.get_mut(stream_id as usize)
.expect("Stream does not exist");
let Some(buffer) = streams.buffers.front_mut() else {
return;
};
let mut streams = streams.write().unwrap();
let streams = streams
.get_mut(stream_id as usize)
.expect("Stream does not exist");
let Some(buffer) = streams.buffers.front_mut() else {
return;
};
let mut start = buffer.pos;
let mut start = buffer.pos;
let avail = (buffer.bytes.len() - start) as i32;
let avail = (buffer.bytes.len() - start) as i32;
if avail <= 0 {
// pad with silence
ptr::write_bytes(p, 0, n_bytes as usize);
} else {
if avail < n_bytes as i32 {
n_bytes = avail.try_into().unwrap();
}
let p = &mut slice[buffer.pos..start + n_bytes];
if avail <= 0 {
// pad with silence
unsafe {
ptr::write_bytes(p.as_mut_ptr(), 0, n_bytes);
}
} else {
let slice = &buffer.bytes[buffer.pos..start + n_bytes];
p.copy_from_slice(slice);
let slice = &buffer.bytes[buffer.pos..];
p.copy_from(slice.as_ptr(), slice.len());
start += n_bytes;
start += n_bytes as usize;
buffer.pos = start;
buffer.pos = start;
if start >= buffer.bytes.len() {
streams.buffers.pop_front();
if start >= buffer.bytes.len() {
streams.buffers.pop_front();
}
}
}
(*(*datas).chunk).offset = 0;
(*(*datas).chunk).stride = frame_size as i32;
(*(*datas).chunk).size = n_bytes;
stream.queue_raw_buffer(b);
n_bytes
} else {
0
};
let chunk = data.chunk_mut();
*chunk.offset_mut() = 0;
*chunk.stride_mut() = frame_size as _;
*chunk.size_mut() = n_bytes as _;
}
})
.register()