pipewire.rs: Remove unsafe threadloop bindings

contributed these bindings upstream, use the bindings in pipewire rust

Signed-off-by: Dorinda Bassey <dbassey@redhat.com>
This commit is contained in:
Dorinda Bassey 2023-09-26 10:39:46 +02:00
parent 29a02a6899
commit a80d6ccf22
3 changed files with 21 additions and 105 deletions

8
Cargo.lock generated
View File

@ -589,7 +589,7 @@ dependencies = [
[[package]]
name = "libspa"
version = "0.7.2"
source = "git+https://gitlab.freedesktop.org/pipewire/pipewire-rs.git?rev=068f16e4bcc2a58657ceb53bd134acb5b00a5391#068f16e4bcc2a58657ceb53bd134acb5b00a5391"
source = "git+https://gitlab.freedesktop.org/pipewire/pipewire-rs.git?rev=5fe090b3ac8f6fed756c4871ac18f26edda3ac89#5fe090b3ac8f6fed756c4871ac18f26edda3ac89"
dependencies = [
"bitflags 2.3.3",
"cc",
@ -605,7 +605,7 @@ dependencies = [
[[package]]
name = "libspa-sys"
version = "0.7.2"
source = "git+https://gitlab.freedesktop.org/pipewire/pipewire-rs.git?rev=068f16e4bcc2a58657ceb53bd134acb5b00a5391#068f16e4bcc2a58657ceb53bd134acb5b00a5391"
source = "git+https://gitlab.freedesktop.org/pipewire/pipewire-rs.git?rev=5fe090b3ac8f6fed756c4871ac18f26edda3ac89#5fe090b3ac8f6fed756c4871ac18f26edda3ac89"
dependencies = [
"bindgen 0.66.1",
"cc",
@ -756,7 +756,7 @@ checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "pipewire"
version = "0.7.2"
source = "git+https://gitlab.freedesktop.org/pipewire/pipewire-rs.git?rev=068f16e4bcc2a58657ceb53bd134acb5b00a5391#068f16e4bcc2a58657ceb53bd134acb5b00a5391"
source = "git+https://gitlab.freedesktop.org/pipewire/pipewire-rs.git?rev=5fe090b3ac8f6fed756c4871ac18f26edda3ac89#5fe090b3ac8f6fed756c4871ac18f26edda3ac89"
dependencies = [
"anyhow",
"bitflags 2.3.3",
@ -772,7 +772,7 @@ dependencies = [
[[package]]
name = "pipewire-sys"
version = "0.7.2"
source = "git+https://gitlab.freedesktop.org/pipewire/pipewire-rs.git?rev=068f16e4bcc2a58657ceb53bd134acb5b00a5391#068f16e4bcc2a58657ceb53bd134acb5b00a5391"
source = "git+https://gitlab.freedesktop.org/pipewire/pipewire-rs.git?rev=5fe090b3ac8f6fed756c4871ac18f26edda3ac89#5fe090b3ac8f6fed756c4871ac18f26edda3ac89"
dependencies = [
"bindgen 0.66.1",
"libspa-sys",

View File

@ -19,7 +19,7 @@ alsa = { version = "0.7", optional = true }
clap = { version = "4.1", features = ["derive"] }
env_logger = "0.10"
log = "0.4"
pw = { package = "pipewire", git = "https://gitlab.freedesktop.org/pipewire/pipewire-rs.git", rev = "068f16e4bcc2a58657ceb53bd134acb5b00a5391", optional = true }
pw = { package = "pipewire", git = "https://gitlab.freedesktop.org/pipewire/pipewire-rs.git", rev = "5fe090b3ac8f6fed756c4871ac18f26edda3ac89", optional = true }
thiserror = "1.0"
vhost = { version = "0.6", features = ["vhost-user-slave"] }
vhost-user-backend = "0.8"

View File

@ -5,23 +5,12 @@ use std::{
collections::HashMap,
convert::TryInto,
mem::size_of,
ops::Deref,
ptr,
ptr::NonNull,
rc::Rc,
sync::{Arc, RwLock},
};
use log::debug;
use pw::{
properties, spa,
sys::{
pw_loop, pw_thread_loop, pw_thread_loop_get_loop, pw_thread_loop_lock, pw_thread_loop_new,
pw_thread_loop_signal, pw_thread_loop_start, pw_thread_loop_unlock, pw_thread_loop_wait,
PW_ID_CORE,
},
Context, Core, LoopRef,
};
use pw::{properties, spa, sys::PW_ID_CORE, Context, Core, ThreadLoop};
use spa::{
param::{
audio::{AudioFormat, AudioInfoRaw},
@ -65,78 +54,6 @@ use crate::{
Error, Result, Stream,
};
struct PwThreadLoop(NonNull<pw_thread_loop>);
impl PwThreadLoop {
pub fn new(name: Option<&str>) -> Option<Self> {
let inner = unsafe {
pw_thread_loop_new(
name.map_or(ptr::null(), |p| p.as_ptr() as *const _),
std::ptr::null_mut(),
)
};
if inner.is_null() {
None
} else {
Some(Self(
NonNull::new(inner).expect("pw_thread_loop can't be null"),
))
}
}
pub fn get_loop(&self) -> PwInnerLoop {
let inner = unsafe { pw_thread_loop_get_loop(self.0.as_ptr()) };
PwInnerLoop {
inner: Rc::new(NonNull::new(inner).unwrap()),
}
}
pub fn unlock(&self) {
unsafe { pw_thread_loop_unlock(self.0.as_ptr()) }
}
pub fn lock(&self) {
unsafe { pw_thread_loop_lock(self.0.as_ptr()) }
}
pub fn start(&self) {
unsafe {
pw_thread_loop_start(self.0.as_ptr());
}
}
pub fn signal(&self) {
unsafe {
pw_thread_loop_signal(self.0.as_ptr(), false);
}
}
pub fn wait(&self) {
unsafe {
pw_thread_loop_wait(self.0.as_ptr());
}
}
}
#[derive(Debug, Clone)]
struct PwInnerLoop {
inner: Rc<NonNull<pw_loop>>,
}
impl AsRef<LoopRef> for PwInnerLoop {
fn as_ref(&self) -> &LoopRef {
self.deref()
}
}
impl Deref for PwInnerLoop {
type Target = LoopRef;
fn deref(&self) -> &Self::Target {
unsafe { &*(self.inner.as_ptr() as *mut LoopRef) }
}
}
// SAFETY: Safe as the structure can be sent to another thread.
unsafe impl Send for PwBackend {}
@ -146,10 +63,10 @@ unsafe impl Sync for PwBackend {}
pub struct PwBackend {
pub stream_params: Arc<RwLock<Vec<Stream>>>,
thread_loop: Rc<PwThreadLoop>,
thread_loop: ThreadLoop,
pub core: Core,
#[allow(dead_code)]
context: Context<PwInnerLoop>,
context: Context,
pub stream_hash: RwLock<HashMap<u32, pw::stream::Stream>>,
pub stream_listener: RwLock<HashMap<u32, pw::stream::StreamListener<i32>>>,
}
@ -158,12 +75,11 @@ impl PwBackend {
pub fn new(stream_params: Arc<RwLock<Vec<Stream>>>) -> Self {
pw::init();
let thread_loop = Rc::new(PwThreadLoop::new(Some("Pipewire thread loop")).unwrap());
let get_loop = thread_loop.get_loop();
let thread_loop = unsafe { ThreadLoop::new(Some("Pipewire thread loop")).unwrap() };
thread_loop.lock();
let lock_guard = thread_loop.lock();
let context = pw::Context::new(&get_loop).expect("failed to create context");
let context = pw::Context::new(&thread_loop).expect("failed to create context");
thread_loop.start();
let core = context.connect(None).expect("Failed to connect to core");
@ -179,13 +95,13 @@ impl PwBackend {
.add_listener_local()
.done(move |id, seq| {
if id == PW_ID_CORE && seq == pending {
thread_clone.signal();
thread_clone.signal(false);
}
})
.register();
thread_loop.wait();
thread_loop.unlock();
lock_guard.unlock();
log::trace!("pipewire backend running");
@ -253,7 +169,7 @@ impl AudioBackend for PwBackend {
} else {
let mut stream_hash = self.stream_hash.write().unwrap();
let mut stream_listener = self.stream_listener.write().unwrap();
self.thread_loop.lock();
let lock_guard = self.thread_loop.lock();
let stream_params = self.stream_params.read().unwrap();
let params = &stream_params[stream_id as usize].params;
@ -469,7 +385,7 @@ impl AudioBackend for PwBackend {
)
.expect("could not connect to the stream");
self.thread_loop.unlock();
lock_guard.unlock();
// insert created stream in a hash table
stream_hash.insert(stream_id, stream);
@ -487,7 +403,7 @@ impl AudioBackend for PwBackend {
log::error!("Stream {} release {}", stream_id, err);
msg.code = VIRTIO_SND_S_BAD_MSG;
} else {
self.thread_loop.lock();
let lock_guard = self.thread_loop.lock();
let mut stream_hash = self.stream_hash.write().unwrap();
let mut stream_listener = self.stream_listener.write().unwrap();
let st_buffer = &mut self.stream_params.write().unwrap();
@ -500,7 +416,7 @@ impl AudioBackend for PwBackend {
stream_hash.remove(&stream_id);
stream_listener.remove(&stream_id);
self.thread_loop.unlock();
lock_guard.unlock();
}
Ok(())
@ -515,13 +431,13 @@ impl AudioBackend for PwBackend {
// log the error and continue
log::error!("Stream {} start {}", stream_id, err);
} else {
self.thread_loop.lock();
let lock_guard = self.thread_loop.lock();
let stream_hash = self.stream_hash.read().unwrap();
let Some(stream) = stream_hash.get(&stream_id) else {
return Err(Error::StreamWithIdNotFound(stream_id));
};
stream.set_active(true).expect("could not start stream");
self.thread_loop.unlock();
lock_guard.unlock();
}
Ok(())
}
@ -534,13 +450,13 @@ impl AudioBackend for PwBackend {
if let Err(err) = stop_result {
log::error!("Stream {} stop {}", stream_id, err);
} else {
self.thread_loop.lock();
let lock_guard = self.thread_loop.lock();
let stream_hash = self.stream_hash.read().unwrap();
let Some(stream) = stream_hash.get(&stream_id) else {
return Err(Error::StreamWithIdNotFound(stream_id));
};
stream.set_active(false).expect("could not stop stream");
self.thread_loop.unlock();
lock_guard.unlock();
}
Ok(())