crates/sound: Rebase pipewire backend

Rebase pipewire Backend

Co-authored-by: Matias Ezequiel Vara Larsen <mvaralar@redhat.com>
Co-authored-by: Dorinda Bassey <dbassey@redhat.com>

Signed-off-by: Dorinda Bassey <dbassey@redhat.com>
This commit is contained in:
Dorinda Bassey 2023-09-11 16:47:08 +02:00
parent 15516ccea4
commit 917d24c7fa
5 changed files with 573 additions and 222 deletions

73
Cargo.lock generated
View File

@ -124,24 +124,22 @@ dependencies = [
[[package]]
name = "bindgen"
version = "0.64.0"
version = "0.66.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c4243e6031260db77ede97ad86c27e501d646a27ab57b59a574f725d98ab1fb4"
checksum = "f2b84e06fc203107bfbad243f4aba2af864eb7db3b1cf46ea0a023b0b433d2a7"
dependencies = [
"bitflags 1.3.2",
"bitflags 2.3.3",
"cexpr",
"clang-sys",
"lazy_static",
"lazycell",
"log",
"peeking_take_while",
"proc-macro2",
"quote",
"regex",
"rustc-hash",
"shlex",
"syn 1.0.109",
"which",
"syn 2.0.31",
]
[[package]]
@ -236,7 +234,7 @@ dependencies = [
"heck",
"proc-macro2",
"quote",
"syn 2.0.26",
"syn 2.0.31",
]
[[package]]
@ -251,6 +249,15 @@ version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7"
[[package]]
name = "convert_case"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ec182b0ca2f35d8fc196cf3404988fd8b8c739a4d270ff118a398feb0cbec1ca"
dependencies = [
"unicode-segmentation",
]
[[package]]
name = "cookie-factory"
version = "0.3.2"
@ -397,7 +404,7 @@ checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.26",
"syn 2.0.31",
]
[[package]]
@ -581,27 +588,26 @@ dependencies = [
[[package]]
name = "libspa"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "667dfbb50c3d1f7ee1d33afdc04d1255923ece7642db3303046e7d63d997d77d"
version = "0.7.2"
source = "git+https://gitlab.freedesktop.org/pipewire/pipewire-rs.git?rev=068f16e4bcc2a58657ceb53bd134acb5b00a5391#068f16e4bcc2a58657ceb53bd134acb5b00a5391"
dependencies = [
"bitflags 1.3.2",
"bitflags 2.3.3",
"cc",
"convert_case",
"cookie-factory",
"errno 0.3.1",
"libc",
"libspa-sys",
"nix 0.26.2",
"nom",
"system-deps",
]
[[package]]
name = "libspa-sys"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "79cf5b88f52534df7ca88d451ae9628e22124e3cc5c60966465a7db479534c7a"
version = "0.7.2"
source = "git+https://gitlab.freedesktop.org/pipewire/pipewire-rs.git?rev=068f16e4bcc2a58657ceb53bd134acb5b00a5391#068f16e4bcc2a58657ceb53bd134acb5b00a5391"
dependencies = [
"bindgen 0.64.0",
"bindgen 0.66.1",
"cc",
"system-deps",
]
@ -749,13 +755,11 @@ checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "pipewire"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dc2180a4a84b855be86e6cd72fa6fd4318278871d2b1082e7cd05fe64b135ccb"
version = "0.7.2"
source = "git+https://gitlab.freedesktop.org/pipewire/pipewire-rs.git?rev=068f16e4bcc2a58657ceb53bd134acb5b00a5391#068f16e4bcc2a58657ceb53bd134acb5b00a5391"
dependencies = [
"anyhow",
"bitflags 1.3.2",
"errno 0.3.1",
"bitflags 2.3.3",
"libc",
"libspa",
"libspa-sys",
@ -767,11 +771,10 @@ dependencies = [
[[package]]
name = "pipewire-sys"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a95290eedb7fb6aa3922fdc0261cd0ddeb940abcdbdef28778928106554d2123"
version = "0.7.2"
source = "git+https://gitlab.freedesktop.org/pipewire/pipewire-rs.git?rev=068f16e4bcc2a58657ceb53bd134acb5b00a5391#068f16e4bcc2a58657ceb53bd134acb5b00a5391"
dependencies = [
"bindgen 0.64.0",
"bindgen 0.66.1",
"libspa-sys",
"system-deps",
]
@ -905,7 +908,7 @@ dependencies = [
"regex",
"relative-path",
"rustc_version",
"syn 2.0.26",
"syn 2.0.31",
"unicode-ident",
]
@ -1049,9 +1052,9 @@ dependencies = [
[[package]]
name = "syn"
version = "2.0.26"
version = "2.0.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "45c3457aacde3c65315de5031ec191ce46604304d2446e803d71ade03308d970"
checksum = "718fa2415bcb8d8bd775917a1bf12a7931b6dfa890753378538118181e0cb398"
dependencies = [
"proc-macro2",
"quote",
@ -1117,7 +1120,7 @@ checksum = "463fe12d7993d3b327787537ce8dd4dfa058de32fc2b195ef3cde03dc4771e8f"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.26",
"syn 2.0.31",
]
[[package]]
@ -1160,6 +1163,12 @@ version = "1.0.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "301abaae475aa91687eb82514b328ab47a211a533026cb25fc3e519b86adfc3c"
[[package]]
name = "unicode-segmentation"
version = "1.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1dd624098567895118886609431a7c3b8f516e41d30e0643f03d94592a147e36"
[[package]]
name = "utf8parse"
version = "0.2.1"
@ -1259,14 +1268,10 @@ name = "vhost-user-sound"
version = "0.1.0"
dependencies = [
"alsa",
"bindgen 0.64.0",
"clap",
"env_logger",
"libspa",
"libspa-sys",
"log",
"pipewire",
"pipewire-sys",
"rstest",
"serial_test",
"thiserror",

View File

@ -13,18 +13,14 @@ edition = "2018"
default = ["null-backend", "alsa-backend", "pw-backend"]
null-backend = []
alsa-backend = ["dep:alsa"]
pw-backend = ["pipewire", "libspa", "pipewire-sys", "libspa-sys", "bindgen"]
pw-backend = ["pw"]
[dependencies]
alsa = { version = "0.7", optional = true }
bindgen = { version = "0.64.0", optional = true }
clap = { version = "4.1", features = ["derive"] }
env_logger = "0.10"
libspa = { version = "0.6.0", optional = true }
libspa-sys = { version = "0.6.0", optional = true }
log = "0.4"
pipewire = { version = "0.6.0", optional = true }
pipewire-sys = { version = "0.6.0", optional = true }
pw = { package = "pipewire", git = "https://gitlab.freedesktop.org/pipewire/pipewire-rs.git", rev = "068f16e4bcc2a58657ceb53bd134acb5b00a5391", optional = true }
thiserror = "1.0"
vhost = { version = "0.6", features = ["vhost-user-slave"] }
vhost-user-backend = "0.8"

View File

@ -1,30 +1,548 @@
// Pipewire backend device
// SPDX-License-Identifier: Apache-2.0 or BSD-3-Clause
use std::sync::{Arc, RwLock};
use crate::device::ControlMessage;
use crate::Result;
use virtio_queue::Descriptor;
use vm_memory::Bytes;
use super::AudioBackend;
use crate::{Result, Stream};
use crate::virtio_sound::{
VirtioSndPcmSetParams, VIRTIO_SND_D_INPUT, VIRTIO_SND_D_OUTPUT, VIRTIO_SND_PCM_FMT_A_LAW,
VIRTIO_SND_PCM_FMT_FLOAT, VIRTIO_SND_PCM_FMT_FLOAT64, VIRTIO_SND_PCM_FMT_MU_LAW,
VIRTIO_SND_PCM_FMT_S16, VIRTIO_SND_PCM_FMT_S18_3, VIRTIO_SND_PCM_FMT_S20,
VIRTIO_SND_PCM_FMT_S20_3, VIRTIO_SND_PCM_FMT_S24, VIRTIO_SND_PCM_FMT_S24_3,
VIRTIO_SND_PCM_FMT_S32, VIRTIO_SND_PCM_FMT_S8, VIRTIO_SND_PCM_FMT_U16,
VIRTIO_SND_PCM_FMT_U18_3, VIRTIO_SND_PCM_FMT_U20, VIRTIO_SND_PCM_FMT_U20_3,
VIRTIO_SND_PCM_FMT_U24, VIRTIO_SND_PCM_FMT_U24_3, VIRTIO_SND_PCM_FMT_U32,
VIRTIO_SND_PCM_FMT_U8, VIRTIO_SND_PCM_RATE_11025, VIRTIO_SND_PCM_RATE_16000,
VIRTIO_SND_PCM_RATE_176400, VIRTIO_SND_PCM_RATE_192000, VIRTIO_SND_PCM_RATE_22050,
VIRTIO_SND_PCM_RATE_32000, VIRTIO_SND_PCM_RATE_384000, VIRTIO_SND_PCM_RATE_44100,
VIRTIO_SND_PCM_RATE_48000, VIRTIO_SND_PCM_RATE_5512, VIRTIO_SND_PCM_RATE_64000,
VIRTIO_SND_PCM_RATE_8000, VIRTIO_SND_PCM_RATE_88200, VIRTIO_SND_PCM_RATE_96000,
VIRTIO_SND_S_BAD_MSG, VIRTIO_SND_S_NOT_SUPP,
};
use crate::{Error, Stream};
use std::{
cmp,
collections::HashMap,
convert::TryInto,
mem::size_of,
ops::Deref,
ptr,
ptr::NonNull,
rc::Rc,
sync::{Arc, RwLock},
};
use log::debug;
use spa::param::{audio::AudioFormat, audio::AudioInfoRaw, ParamType};
use spa::pod::{serialize::PodSerializer, Object, Pod, Value};
use spa::sys::{
spa_audio_info_raw, SPA_PARAM_EnumFormat, SPA_TYPE_OBJECT_Format, SPA_AUDIO_CHANNEL_FC,
SPA_AUDIO_CHANNEL_FL, SPA_AUDIO_CHANNEL_FR, SPA_AUDIO_CHANNEL_LFE, SPA_AUDIO_CHANNEL_MONO,
SPA_AUDIO_CHANNEL_RC, SPA_AUDIO_CHANNEL_RL, SPA_AUDIO_CHANNEL_RR, SPA_AUDIO_CHANNEL_UNKNOWN,
SPA_AUDIO_FORMAT_ALAW, SPA_AUDIO_FORMAT_F32, SPA_AUDIO_FORMAT_F64, SPA_AUDIO_FORMAT_S16,
SPA_AUDIO_FORMAT_S18_LE, SPA_AUDIO_FORMAT_S20, SPA_AUDIO_FORMAT_S20_LE, SPA_AUDIO_FORMAT_S24,
SPA_AUDIO_FORMAT_S24_LE, SPA_AUDIO_FORMAT_S32, SPA_AUDIO_FORMAT_S8, SPA_AUDIO_FORMAT_U16,
SPA_AUDIO_FORMAT_U18_LE, SPA_AUDIO_FORMAT_U20, SPA_AUDIO_FORMAT_U20_LE, SPA_AUDIO_FORMAT_U24,
SPA_AUDIO_FORMAT_U24_LE, SPA_AUDIO_FORMAT_U32, SPA_AUDIO_FORMAT_U8, SPA_AUDIO_FORMAT_ULAW,
SPA_AUDIO_FORMAT_UNKNOWN,
};
use pw::sys::{
pw_buffer, pw_loop, pw_thread_loop, pw_thread_loop_get_loop, pw_thread_loop_lock,
pw_thread_loop_new, pw_thread_loop_signal, pw_thread_loop_start, pw_thread_loop_unlock,
pw_thread_loop_wait, PW_ID_CORE,
};
use pw::{properties, spa, Context, Core, LoopRef};
struct PwThreadLoop(NonNull<pw_thread_loop>);
impl PwThreadLoop {
pub fn new(name: Option<&str>) -> Option<Self> {
let inner = unsafe {
pw_thread_loop_new(
name.map_or(ptr::null(), |p| p.as_ptr() as *const _),
std::ptr::null_mut(),
)
};
if inner.is_null() {
None
} else {
Some(Self(
NonNull::new(inner).expect("pw_thread_loop can't be null"),
))
}
}
pub fn get_loop(&self) -> PwInnerLoop {
let inner = unsafe { pw_thread_loop_get_loop(self.0.as_ptr()) };
PwInnerLoop {
inner: Rc::new(NonNull::new(inner).unwrap()),
}
}
pub fn unlock(&self) {
unsafe { pw_thread_loop_unlock(self.0.as_ptr()) }
}
pub fn lock(&self) {
unsafe { pw_thread_loop_lock(self.0.as_ptr()) }
}
pub fn start(&self) {
unsafe {
pw_thread_loop_start(self.0.as_ptr());
}
}
pub fn signal(&self) {
unsafe {
pw_thread_loop_signal(self.0.as_ptr(), false);
}
}
pub fn wait(&self) {
unsafe {
pw_thread_loop_wait(self.0.as_ptr());
}
}
}
#[derive(Debug, Clone)]
struct PwInnerLoop {
inner: Rc<NonNull<pw_loop>>,
}
impl AsRef<LoopRef> for PwInnerLoop {
fn as_ref(&self) -> &LoopRef {
self.deref()
}
}
impl Deref for PwInnerLoop {
type Target = LoopRef;
fn deref(&self) -> &Self::Target {
unsafe { &*(self.inner.as_ptr() as *mut LoopRef) }
}
}
// SAFETY: Safe as the structure can be sent to another thread.
unsafe impl Send for PwBackend {}
// SAFETY: Safe as the structure can be shared with another thread as the state
// is protected with a lock.
unsafe impl Sync for PwBackend {}
pub struct PwBackend {
streams: Arc<RwLock<Vec<Stream>>>,
pub stream_params: Arc<RwLock<Vec<Stream>>>,
thread_loop: Arc<PwThreadLoop>,
pub core: Core,
#[allow(dead_code)]
context: Context<PwInnerLoop>,
pub stream_hash: RwLock<HashMap<u32, pw::stream::Stream>>,
pub stream_listener: RwLock<HashMap<u32, pw::stream::StreamListener<i32>>>,
}
impl PwBackend {
pub fn new(streams: Arc<RwLock<Vec<Stream>>>) -> Self {
Self { streams }
pub fn new(stream_params: Arc<RwLock<Vec<Stream>>>) -> Self {
pw::init();
let thread_loop = Arc::new(PwThreadLoop::new(Some("Pipewire thread loop")).unwrap());
let get_loop = thread_loop.get_loop();
thread_loop.lock();
let context = pw::Context::new(&get_loop).expect("failed to create context");
thread_loop.start();
let core = context.connect(None).expect("Failed to connect to core");
// Create new reference for the variable so that it can be moved into the closure.
let thread_clone = thread_loop.clone();
// Trigger the sync event. The server's answer won't be processed until we start the thread loop,
// so we can safely do this before setting up a callback. This lets us avoid using a Cell.
let pending = core.sync(0).expect("sync failed");
let _listener_core = core
.add_listener_local()
.done(move |id, seq| {
if id == PW_ID_CORE && seq == pending {
thread_clone.signal();
}
})
.register();
thread_loop.wait();
thread_loop.unlock();
log::trace!("pipewire backend running");
Self {
stream_params,
thread_loop,
core,
context,
stream_hash: RwLock::new(HashMap::new()),
stream_listener: RwLock::new(HashMap::new()),
}
}
}
impl AudioBackend for PwBackend {
fn write(&self, stream_id: u32) -> Result<()> {
log::trace!("PipewireBackend write stream_id {}", stream_id);
_ = std::mem::take(&mut self.streams.write().unwrap()[stream_id as usize].buffers);
fn write(&self, _stream_id: u32) -> Result<()> {
Ok(())
}
fn read(&self, _id: u32) -> Result<()> {
log::trace!("PipewireBackend read stream_id {}", _id);
fn read(&self, _stream_id: u32) -> Result<()> {
log::trace!("PipewireBackend read stream_id {}", _stream_id);
Ok(())
}
fn set_parameters(&self, stream_id: u32, mut msg: ControlMessage) -> Result<()> {
let descriptors: Vec<Descriptor> = msg.desc_chain.clone().collect();
let desc_request = &descriptors[0];
let request = msg
.desc_chain
.memory()
.read_obj::<VirtioSndPcmSetParams>(desc_request.addr())
.unwrap();
{
let stream_clone = self.stream_params.clone();
let mut stream_params = stream_clone.write().unwrap();
let st = stream_params
.get_mut(stream_id as usize)
.expect("Stream does not exist");
if let Err(err) = st.state.set_parameters() {
log::error!("Stream {} set_parameters {}", stream_id, err);
msg.code = VIRTIO_SND_S_BAD_MSG;
} else if !st.supports_format(request.format) || !st.supports_rate(request.rate) {
msg.code = VIRTIO_SND_S_NOT_SUPP;
} else {
st.params.features = request.features;
st.params.buffer_bytes = request.buffer_bytes;
st.params.period_bytes = request.period_bytes;
st.params.channels = request.channels;
st.params.format = request.format;
st.params.rate = request.rate;
}
}
drop(msg);
Ok(())
}
fn prepare(&self, stream_id: u32) -> Result<()> {
debug!("pipewire prepare");
let prepare_result = self.stream_params.write().unwrap()[stream_id as usize]
.state
.prepare();
if let Err(err) = prepare_result {
log::error!("Stream {} prepare {}", stream_id, err);
} else {
let mut stream_hash = self.stream_hash.write().unwrap();
let mut stream_listener = self.stream_listener.write().unwrap();
self.thread_loop.lock();
let stream_params = self.stream_params.read().unwrap();
let params = &stream_params[stream_id as usize].params;
let mut pos: [u32; 64] = [SPA_AUDIO_CHANNEL_UNKNOWN; 64];
match params.channels {
6 => {
pos[0] = SPA_AUDIO_CHANNEL_FL;
pos[1] = SPA_AUDIO_CHANNEL_FR;
pos[2] = SPA_AUDIO_CHANNEL_FC;
pos[3] = SPA_AUDIO_CHANNEL_LFE;
pos[4] = SPA_AUDIO_CHANNEL_RL;
pos[5] = SPA_AUDIO_CHANNEL_RR;
}
5 => {
pos[0] = SPA_AUDIO_CHANNEL_FL;
pos[1] = SPA_AUDIO_CHANNEL_FR;
pos[2] = SPA_AUDIO_CHANNEL_FC;
pos[3] = SPA_AUDIO_CHANNEL_LFE;
pos[4] = SPA_AUDIO_CHANNEL_RC;
}
4 => {
pos[0] = SPA_AUDIO_CHANNEL_FL;
pos[1] = SPA_AUDIO_CHANNEL_FR;
pos[2] = SPA_AUDIO_CHANNEL_FC;
pos[3] = SPA_AUDIO_CHANNEL_RC;
}
3 => {
pos[0] = SPA_AUDIO_CHANNEL_FL;
pos[1] = SPA_AUDIO_CHANNEL_FR;
pos[2] = SPA_AUDIO_CHANNEL_LFE;
}
2 => {
pos[0] = SPA_AUDIO_CHANNEL_FL;
pos[1] = SPA_AUDIO_CHANNEL_FR;
}
1 => {
pos[0] = SPA_AUDIO_CHANNEL_MONO;
}
_ => {
return Err(Error::ChannelNotSupported(params.channels));
}
}
let info = spa_audio_info_raw {
format: match params.format {
VIRTIO_SND_PCM_FMT_MU_LAW => SPA_AUDIO_FORMAT_ULAW,
VIRTIO_SND_PCM_FMT_A_LAW => SPA_AUDIO_FORMAT_ALAW,
VIRTIO_SND_PCM_FMT_S8 => SPA_AUDIO_FORMAT_S8,
VIRTIO_SND_PCM_FMT_U8 => SPA_AUDIO_FORMAT_U8,
VIRTIO_SND_PCM_FMT_S16 => SPA_AUDIO_FORMAT_S16,
VIRTIO_SND_PCM_FMT_U16 => SPA_AUDIO_FORMAT_U16,
VIRTIO_SND_PCM_FMT_S18_3 => SPA_AUDIO_FORMAT_S18_LE,
VIRTIO_SND_PCM_FMT_U18_3 => SPA_AUDIO_FORMAT_U18_LE,
VIRTIO_SND_PCM_FMT_S20_3 => SPA_AUDIO_FORMAT_S20_LE,
VIRTIO_SND_PCM_FMT_U20_3 => SPA_AUDIO_FORMAT_U20_LE,
VIRTIO_SND_PCM_FMT_S24_3 => SPA_AUDIO_FORMAT_S24_LE,
VIRTIO_SND_PCM_FMT_U24_3 => SPA_AUDIO_FORMAT_U24_LE,
VIRTIO_SND_PCM_FMT_S20 => SPA_AUDIO_FORMAT_S20,
VIRTIO_SND_PCM_FMT_U20 => SPA_AUDIO_FORMAT_U20,
VIRTIO_SND_PCM_FMT_S24 => SPA_AUDIO_FORMAT_S24,
VIRTIO_SND_PCM_FMT_U24 => SPA_AUDIO_FORMAT_U24,
VIRTIO_SND_PCM_FMT_S32 => SPA_AUDIO_FORMAT_S32,
VIRTIO_SND_PCM_FMT_U32 => SPA_AUDIO_FORMAT_U32,
VIRTIO_SND_PCM_FMT_FLOAT => SPA_AUDIO_FORMAT_F32,
VIRTIO_SND_PCM_FMT_FLOAT64 => SPA_AUDIO_FORMAT_F64,
_ => SPA_AUDIO_FORMAT_UNKNOWN,
},
rate: match params.rate {
VIRTIO_SND_PCM_RATE_5512 => 5512,
VIRTIO_SND_PCM_RATE_8000 => 8000,
VIRTIO_SND_PCM_RATE_11025 => 11025,
VIRTIO_SND_PCM_RATE_16000 => 16000,
VIRTIO_SND_PCM_RATE_22050 => 22050,
VIRTIO_SND_PCM_RATE_32000 => 32000,
VIRTIO_SND_PCM_RATE_44100 => 44100,
VIRTIO_SND_PCM_RATE_48000 => 48000,
VIRTIO_SND_PCM_RATE_64000 => 64000,
VIRTIO_SND_PCM_RATE_88200 => 88200,
VIRTIO_SND_PCM_RATE_96000 => 96000,
VIRTIO_SND_PCM_RATE_176400 => 176400,
VIRTIO_SND_PCM_RATE_192000 => 192000,
VIRTIO_SND_PCM_RATE_384000 => 384000,
_ => 44100,
},
flags: 0,
channels: params.channels as u32,
position: pos,
};
let mut audio_info = AudioInfoRaw::new();
audio_info.set_format(AudioFormat::S16LE);
audio_info.set_rate(info.rate);
audio_info.set_channels(info.channels);
let values: Vec<u8> = PodSerializer::serialize(
std::io::Cursor::new(Vec::new()),
&Value::Object(Object {
type_: SPA_TYPE_OBJECT_Format,
id: SPA_PARAM_EnumFormat,
properties: audio_info.into(),
}),
)
.unwrap()
.0
.into_inner();
let value_clone = values.clone();
let mut param = [Pod::from_bytes(&values).unwrap()];
let props = properties! {
*pw::keys::MEDIA_TYPE => "Audio",
*pw::keys::MEDIA_CATEGORY => "Playback",
};
let stream = pw::stream::Stream::new(&self.core, "audio-output", props)
.expect("could not create new stream");
let streams = self.stream_params.clone();
let listener_stream = stream
.add_local_listener()
.state_changed(|old, new| {
debug!("State changed: {:?} -> {:?}", old, new);
})
.param_changed(move |stream, id, _data, param| {
let Some(_param) = param else {
return;
};
if id != ParamType::Format.as_raw() {
return;
}
let mut param = [Pod::from_bytes(&value_clone).unwrap()];
//callback to negotiate new set of streams
stream
.update_params(&mut param)
.expect("could not update params");
})
.process(move |stream, _data| {
//todo: use safe dequeue_buffer(), contribute queue_buffer()
unsafe {
let b: *mut pw_buffer = stream.dequeue_raw_buffer();
if b.is_null() {
return;
}
let buf = (*b).buffer;
let datas = (*buf).datas;
let p = (*datas).data as *mut u8;
if p.is_null() {
return;
}
// to calculate as sizeof(int16_t) * NR_CHANNELS
let frame_size = info.channels as u32 * size_of::<i16>() as u32;
let req = (*b).requested * (frame_size as u64);
let mut n_bytes = cmp::min(req as u32, (*datas).maxsize);
let mut streams = streams.write().unwrap();
let streams = streams
.get_mut(stream_id as usize)
.expect("Stream does not exist");
let Some(buffer) = streams.buffers.front_mut() else {
return;
};
let mut start = buffer.pos;
let avail = (buffer.bytes.len() - start) as i32;
if avail <= 0 {
// pad with silence
ptr::write_bytes(p, 0, n_bytes as usize);
} else {
if avail < n_bytes as i32 {
n_bytes = avail.try_into().unwrap();
}
let slice = &buffer.bytes[buffer.pos..];
p.copy_from(slice.as_ptr(), slice.len());
start += n_bytes as usize;
buffer.pos = start;
if start >= buffer.bytes.len() {
streams.buffers.pop_front();
}
}
(*(*datas).chunk).offset = 0;
(*(*datas).chunk).stride = frame_size as i32;
(*(*datas).chunk).size = n_bytes;
stream.queue_raw_buffer(b);
}
})
.register()
.expect("failed to register stream listener");
stream_listener.insert(stream_id, listener_stream);
let direction = match stream_params[stream_id as usize].direction {
VIRTIO_SND_D_OUTPUT => spa::Direction::Output,
VIRTIO_SND_D_INPUT => spa::Direction::Input,
_ => panic!("Invalid direction"),
};
stream
.connect(
direction,
Some(pw::constants::ID_ANY),
pw::stream::StreamFlags::RT_PROCESS
| pw::stream::StreamFlags::AUTOCONNECT
| pw::stream::StreamFlags::INACTIVE
| pw::stream::StreamFlags::MAP_BUFFERS,
&mut param,
)
.expect("could not connect to the stream");
self.thread_loop.unlock();
// insert created stream in a hash table
stream_hash.insert(stream_id, stream);
}
Ok(())
}
fn release(&self, stream_id: u32, mut msg: ControlMessage) -> Result<()> {
debug!("pipewire backend, release function");
let release_result = self.stream_params.write().unwrap()[stream_id as usize]
.state
.release();
if let Err(err) = release_result {
log::error!("Stream {} release {}", stream_id, err);
msg.code = VIRTIO_SND_S_BAD_MSG;
} else {
self.thread_loop.lock();
let mut stream_hash = self.stream_hash.write().unwrap();
let mut stream_listener = self.stream_listener.write().unwrap();
let st_buffer = &mut self.stream_params.write().unwrap();
let Some(stream) = stream_hash.get(&stream_id) else {
return Err(Error::StreamWithIdNotFound(stream_id));
};
stream.disconnect().expect("could not disconnect stream");
std::mem::take(&mut st_buffer[stream_id as usize].buffers);
stream_hash.remove(&stream_id);
stream_listener.remove(&stream_id);
self.thread_loop.unlock();
}
Ok(())
}
fn start(&self, stream_id: u32) -> Result<()> {
debug!("pipewire start");
let start_result = self.stream_params.write().unwrap()[stream_id as usize]
.state
.start();
if let Err(err) = start_result {
// log the error and continue
log::error!("Stream {} start {}", stream_id, err);
} else {
self.thread_loop.lock();
let stream_hash = self.stream_hash.read().unwrap();
let Some(stream) = stream_hash.get(&stream_id) else {
return Err(Error::StreamWithIdNotFound(stream_id));
};
stream.set_active(true).expect("could not start stream");
self.thread_loop.unlock();
}
Ok(())
}
fn stop(&self, stream_id: u32) -> Result<()> {
debug!("pipewire stop");
let stop_result = self.stream_params.write().unwrap()[stream_id as usize]
.state
.stop();
if let Err(err) = stop_result {
log::error!("Stream {} stop {}", stream_id, err);
} else {
self.thread_loop.lock();
let stream_hash = self.stream_hash.read().unwrap();
let Some(stream) = stream_hash.get(&stream_id) else {
return Err(Error::StreamWithIdNotFound(stream_id));
};
stream.set_active(false).expect("could not stop stream");
self.thread_loop.unlock();
}
Ok(())
}
}

View File

@ -1,172 +0,0 @@
// Pipewire backend device
// SPDX-License-Identifier: Apache-2.0 or BSD-3-Clause
use super::AudioBackend;
use crate::vhu_sound::NR_STREAMS;
use crate::PCMParams;
use crate::Result;
use std::ops::Deref;
use std::ptr;
use std::ptr::NonNull;
use std::sync::Arc;
use std::sync::RwLock;
use pipewire as pw;
use pw::sys::{pw_loop, pw_thread_loop_new, pw_thread_loop_signal, PW_ID_CORE};
use pw::sys::{pw_thread_loop, pw_thread_loop_start, pw_thread_loop_wait};
use pw::sys::{pw_thread_loop_get_loop, pw_thread_loop_lock, pw_thread_loop_unlock};
use pw::Core;
use pw::LoopRef;
struct PwThreadLoop(NonNull<pw_thread_loop>);
impl PwThreadLoop {
pub fn new(name: Option<&str>) -> Option<Self> {
let inner = unsafe {
pw_thread_loop_new(
name.map_or(ptr::null(), |p| p.as_ptr() as *const _),
std::ptr::null_mut(),
)
};
if inner.is_null() {
None
} else {
Some(Self(
NonNull::new(inner).expect("pw_thread_loop can't be null"),
))
}
}
pub fn get_loop(&self) -> PwThreadLoopTheLoop {
let inner = unsafe { pw_thread_loop_get_loop(self.0.as_ptr()) };
PwThreadLoopTheLoop(NonNull::new(inner).unwrap())
}
pub fn unlock(&self) {
unsafe { pw_thread_loop_unlock(self.0.as_ptr()) }
}
pub fn lock(&self) {
unsafe { pw_thread_loop_lock(self.0.as_ptr()) }
}
pub fn start(&self) {
unsafe {
pw_thread_loop_start(self.0.as_ptr());
}
}
pub fn signal(&self) {
unsafe {
pw_thread_loop_signal(self.0.as_ptr(), false);
}
}
pub fn wait(&self) {
unsafe {
pw_thread_loop_wait(self.0.as_ptr());
}
}
}
#[derive(Debug, Clone)]
struct PwThreadLoopTheLoop(NonNull<pw_loop>);
impl AsRef<LoopRef> for PwThreadLoopTheLoop {
fn as_ref(&self) -> &LoopRef {
self.deref()
}
}
impl Deref for PwThreadLoopTheLoop {
type Target = LoopRef;
fn deref(&self) -> &Self::Target {
unsafe { &*(self.0.as_ptr() as *mut LoopRef) }
}
}
// SAFETY: Safe as the structure can be sent to another thread.
unsafe impl Send for PwBackend {}
// SAFETY: Safe as the structure can be shared with another thread as the state
// is protected with a lock.
unsafe impl Sync for PwBackend {}
pub struct PwBackend {
thread_loop: Arc<PwThreadLoop>,
pub core: Core,
pub stream_params: RwLock<Vec<PCMParams>>,
}
impl PwBackend {
pub fn new() -> Self {
pw::init();
let thread_loop = Arc::new(PwThreadLoop::new(Some("Pipewire thread loop")).unwrap());
let get_loop = thread_loop.get_loop();
thread_loop.lock();
let context = pw::Context::new(&get_loop).expect("failed to create context");
thread_loop.start();
let core = context.connect(None).expect("Failed to connect to core");
// Create new reference for the variable so that it can be moved into the closure.
let thread_clone = thread_loop.clone();
// Trigger the sync event. The server's answer won't be processed until we start the thread loop,
// so we can safely do this before setting up a callback. This lets us avoid using a Cell.
let pending = core.sync(0).expect("sync failed");
let _listener_core = core
.add_listener_local()
.done(move |id, seq| {
if id == PW_ID_CORE && seq == pending {
thread_clone.signal();
}
})
.register();
thread_loop.wait();
thread_loop.unlock();
println!("pipewire backend running");
let streams_param = vec![PCMParams::default(); NR_STREAMS];
Self {
thread_loop,
core,
stream_params: RwLock::new(streams_param),
}
}
}
impl AudioBackend for PwBackend {
fn write(&self, stream_id: u32) -> Result<()> {
println!("pipewire backend, writting to stream: {}", stream_id);
Ok(())
}
fn read(&self, _stream_id: u32) -> Result<()> {
/*
let buf = req.data_slice().ok_or(Error::SoundReqMissingData)?;
let zero_mem = vec![0u8; buf.len()];
buf.copy_from(&zero_mem);
*/
Ok(())
}
fn set_param(&self, stream_id: u32, params: PCMParams) -> Result<()> {
let mut stream_params = self.stream_params.write().unwrap();
stream_params[stream_id as usize] = params;
Ok(())
}
fn prepare(&self, _stream_id: u32) -> Result<()> {
self.thread_loop.lock();
self.thread_loop.unlock();
Ok(())
}
}

View File

@ -77,6 +77,10 @@ pub enum Error {
UnexpectedDescriptorSize(usize, u32),
#[error("Protocol or device error: {0}")]
Stream(stream::Error),
#[error("Stream with id {0} not found")]
StreamWithIdNotFound(u32),
#[error("Channel number not supported: {0}")]
ChannelNotSupported(u8),
}
impl From<Error> for IoError {