diff --git a/Cargo.lock b/Cargo.lock index 5007d0c..f59e7ff 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -124,24 +124,22 @@ dependencies = [ [[package]] name = "bindgen" -version = "0.64.0" +version = "0.66.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c4243e6031260db77ede97ad86c27e501d646a27ab57b59a574f725d98ab1fb4" +checksum = "f2b84e06fc203107bfbad243f4aba2af864eb7db3b1cf46ea0a023b0b433d2a7" dependencies = [ - "bitflags 1.3.2", + "bitflags 2.3.3", "cexpr", "clang-sys", "lazy_static", "lazycell", - "log", "peeking_take_while", "proc-macro2", "quote", "regex", "rustc-hash", "shlex", - "syn 1.0.109", - "which", + "syn 2.0.31", ] [[package]] @@ -236,7 +234,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn 2.0.26", + "syn 2.0.31", ] [[package]] @@ -251,6 +249,15 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7" +[[package]] +name = "convert_case" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec182b0ca2f35d8fc196cf3404988fd8b8c739a4d270ff118a398feb0cbec1ca" +dependencies = [ + "unicode-segmentation", +] + [[package]] name = "cookie-factory" version = "0.3.2" @@ -397,7 +404,7 @@ checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72" dependencies = [ "proc-macro2", "quote", - "syn 2.0.26", + "syn 2.0.31", ] [[package]] @@ -581,27 +588,26 @@ dependencies = [ [[package]] name = "libspa" -version = "0.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "667dfbb50c3d1f7ee1d33afdc04d1255923ece7642db3303046e7d63d997d77d" +version = "0.7.2" +source = "git+https://gitlab.freedesktop.org/pipewire/pipewire-rs.git?rev=068f16e4bcc2a58657ceb53bd134acb5b00a5391#068f16e4bcc2a58657ceb53bd134acb5b00a5391" dependencies = [ - "bitflags 1.3.2", + "bitflags 2.3.3", "cc", + "convert_case", "cookie-factory", - "errno 0.3.1", "libc", "libspa-sys", + "nix 0.26.2", "nom", "system-deps", ] [[package]] name = "libspa-sys" -version = "0.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "79cf5b88f52534df7ca88d451ae9628e22124e3cc5c60966465a7db479534c7a" +version = "0.7.2" +source = "git+https://gitlab.freedesktop.org/pipewire/pipewire-rs.git?rev=068f16e4bcc2a58657ceb53bd134acb5b00a5391#068f16e4bcc2a58657ceb53bd134acb5b00a5391" dependencies = [ - "bindgen 0.64.0", + "bindgen 0.66.1", "cc", "system-deps", ] @@ -749,13 +755,11 @@ checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" [[package]] name = "pipewire" -version = "0.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dc2180a4a84b855be86e6cd72fa6fd4318278871d2b1082e7cd05fe64b135ccb" +version = "0.7.2" +source = "git+https://gitlab.freedesktop.org/pipewire/pipewire-rs.git?rev=068f16e4bcc2a58657ceb53bd134acb5b00a5391#068f16e4bcc2a58657ceb53bd134acb5b00a5391" dependencies = [ "anyhow", - "bitflags 1.3.2", - "errno 0.3.1", + "bitflags 2.3.3", "libc", "libspa", "libspa-sys", @@ -767,11 +771,10 @@ dependencies = [ [[package]] name = "pipewire-sys" -version = "0.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a95290eedb7fb6aa3922fdc0261cd0ddeb940abcdbdef28778928106554d2123" +version = "0.7.2" +source = "git+https://gitlab.freedesktop.org/pipewire/pipewire-rs.git?rev=068f16e4bcc2a58657ceb53bd134acb5b00a5391#068f16e4bcc2a58657ceb53bd134acb5b00a5391" dependencies = [ - "bindgen 0.64.0", + "bindgen 0.66.1", "libspa-sys", "system-deps", ] @@ -905,7 +908,7 @@ dependencies = [ "regex", "relative-path", "rustc_version", - "syn 2.0.26", + "syn 2.0.31", "unicode-ident", ] @@ -1049,9 +1052,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.26" +version = "2.0.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "45c3457aacde3c65315de5031ec191ce46604304d2446e803d71ade03308d970" +checksum = "718fa2415bcb8d8bd775917a1bf12a7931b6dfa890753378538118181e0cb398" dependencies = [ "proc-macro2", "quote", @@ -1117,7 +1120,7 @@ checksum = "463fe12d7993d3b327787537ce8dd4dfa058de32fc2b195ef3cde03dc4771e8f" dependencies = [ "proc-macro2", "quote", - "syn 2.0.26", + "syn 2.0.31", ] [[package]] @@ -1160,6 +1163,12 @@ version = "1.0.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "301abaae475aa91687eb82514b328ab47a211a533026cb25fc3e519b86adfc3c" +[[package]] +name = "unicode-segmentation" +version = "1.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1dd624098567895118886609431a7c3b8f516e41d30e0643f03d94592a147e36" + [[package]] name = "utf8parse" version = "0.2.1" @@ -1259,14 +1268,10 @@ name = "vhost-user-sound" version = "0.1.0" dependencies = [ "alsa", - "bindgen 0.64.0", "clap", "env_logger", - "libspa", - "libspa-sys", "log", "pipewire", - "pipewire-sys", "rstest", "serial_test", "thiserror", diff --git a/crates/sound/Cargo.toml b/crates/sound/Cargo.toml index 55a6a6d..5d010d3 100644 --- a/crates/sound/Cargo.toml +++ b/crates/sound/Cargo.toml @@ -13,18 +13,14 @@ edition = "2018" default = ["null-backend", "alsa-backend", "pw-backend"] null-backend = [] alsa-backend = ["dep:alsa"] -pw-backend = ["pipewire", "libspa", "pipewire-sys", "libspa-sys", "bindgen"] +pw-backend = ["pw"] [dependencies] alsa = { version = "0.7", optional = true } -bindgen = { version = "0.64.0", optional = true } clap = { version = "4.1", features = ["derive"] } env_logger = "0.10" -libspa = { version = "0.6.0", optional = true } -libspa-sys = { version = "0.6.0", optional = true } log = "0.4" -pipewire = { version = "0.6.0", optional = true } -pipewire-sys = { version = "0.6.0", optional = true } +pw = { package = "pipewire", git = "https://gitlab.freedesktop.org/pipewire/pipewire-rs.git", rev = "068f16e4bcc2a58657ceb53bd134acb5b00a5391", optional = true } thiserror = "1.0" vhost = { version = "0.6", features = ["vhost-user-slave"] } vhost-user-backend = "0.8" diff --git a/crates/sound/src/audio_backends/pipewire.rs b/crates/sound/src/audio_backends/pipewire.rs index c051c08..247eaf1 100644 --- a/crates/sound/src/audio_backends/pipewire.rs +++ b/crates/sound/src/audio_backends/pipewire.rs @@ -1,30 +1,548 @@ // Pipewire backend device // SPDX-License-Identifier: Apache-2.0 or BSD-3-Clause -use std::sync::{Arc, RwLock}; +use crate::device::ControlMessage; +use crate::Result; +use virtio_queue::Descriptor; +use vm_memory::Bytes; use super::AudioBackend; -use crate::{Result, Stream}; +use crate::virtio_sound::{ + VirtioSndPcmSetParams, VIRTIO_SND_D_INPUT, VIRTIO_SND_D_OUTPUT, VIRTIO_SND_PCM_FMT_A_LAW, + VIRTIO_SND_PCM_FMT_FLOAT, VIRTIO_SND_PCM_FMT_FLOAT64, VIRTIO_SND_PCM_FMT_MU_LAW, + VIRTIO_SND_PCM_FMT_S16, VIRTIO_SND_PCM_FMT_S18_3, VIRTIO_SND_PCM_FMT_S20, + VIRTIO_SND_PCM_FMT_S20_3, VIRTIO_SND_PCM_FMT_S24, VIRTIO_SND_PCM_FMT_S24_3, + VIRTIO_SND_PCM_FMT_S32, VIRTIO_SND_PCM_FMT_S8, VIRTIO_SND_PCM_FMT_U16, + VIRTIO_SND_PCM_FMT_U18_3, VIRTIO_SND_PCM_FMT_U20, VIRTIO_SND_PCM_FMT_U20_3, + VIRTIO_SND_PCM_FMT_U24, VIRTIO_SND_PCM_FMT_U24_3, VIRTIO_SND_PCM_FMT_U32, + VIRTIO_SND_PCM_FMT_U8, VIRTIO_SND_PCM_RATE_11025, VIRTIO_SND_PCM_RATE_16000, + VIRTIO_SND_PCM_RATE_176400, VIRTIO_SND_PCM_RATE_192000, VIRTIO_SND_PCM_RATE_22050, + VIRTIO_SND_PCM_RATE_32000, VIRTIO_SND_PCM_RATE_384000, VIRTIO_SND_PCM_RATE_44100, + VIRTIO_SND_PCM_RATE_48000, VIRTIO_SND_PCM_RATE_5512, VIRTIO_SND_PCM_RATE_64000, + VIRTIO_SND_PCM_RATE_8000, VIRTIO_SND_PCM_RATE_88200, VIRTIO_SND_PCM_RATE_96000, + VIRTIO_SND_S_BAD_MSG, VIRTIO_SND_S_NOT_SUPP, +}; +use crate::{Error, Stream}; +use std::{ + cmp, + collections::HashMap, + convert::TryInto, + mem::size_of, + ops::Deref, + ptr, + ptr::NonNull, + rc::Rc, + sync::{Arc, RwLock}, +}; + +use log::debug; +use spa::param::{audio::AudioFormat, audio::AudioInfoRaw, ParamType}; +use spa::pod::{serialize::PodSerializer, Object, Pod, Value}; + +use spa::sys::{ + spa_audio_info_raw, SPA_PARAM_EnumFormat, SPA_TYPE_OBJECT_Format, SPA_AUDIO_CHANNEL_FC, + SPA_AUDIO_CHANNEL_FL, SPA_AUDIO_CHANNEL_FR, SPA_AUDIO_CHANNEL_LFE, SPA_AUDIO_CHANNEL_MONO, + SPA_AUDIO_CHANNEL_RC, SPA_AUDIO_CHANNEL_RL, SPA_AUDIO_CHANNEL_RR, SPA_AUDIO_CHANNEL_UNKNOWN, + SPA_AUDIO_FORMAT_ALAW, SPA_AUDIO_FORMAT_F32, SPA_AUDIO_FORMAT_F64, SPA_AUDIO_FORMAT_S16, + SPA_AUDIO_FORMAT_S18_LE, SPA_AUDIO_FORMAT_S20, SPA_AUDIO_FORMAT_S20_LE, SPA_AUDIO_FORMAT_S24, + SPA_AUDIO_FORMAT_S24_LE, SPA_AUDIO_FORMAT_S32, SPA_AUDIO_FORMAT_S8, SPA_AUDIO_FORMAT_U16, + SPA_AUDIO_FORMAT_U18_LE, SPA_AUDIO_FORMAT_U20, SPA_AUDIO_FORMAT_U20_LE, SPA_AUDIO_FORMAT_U24, + SPA_AUDIO_FORMAT_U24_LE, SPA_AUDIO_FORMAT_U32, SPA_AUDIO_FORMAT_U8, SPA_AUDIO_FORMAT_ULAW, + SPA_AUDIO_FORMAT_UNKNOWN, +}; + +use pw::sys::{ + pw_buffer, pw_loop, pw_thread_loop, pw_thread_loop_get_loop, pw_thread_loop_lock, + pw_thread_loop_new, pw_thread_loop_signal, pw_thread_loop_start, pw_thread_loop_unlock, + pw_thread_loop_wait, PW_ID_CORE, +}; +use pw::{properties, spa, Context, Core, LoopRef}; + +struct PwThreadLoop(NonNull); + +impl PwThreadLoop { + pub fn new(name: Option<&str>) -> Option { + let inner = unsafe { + pw_thread_loop_new( + name.map_or(ptr::null(), |p| p.as_ptr() as *const _), + std::ptr::null_mut(), + ) + }; + if inner.is_null() { + None + } else { + Some(Self( + NonNull::new(inner).expect("pw_thread_loop can't be null"), + )) + } + } + + pub fn get_loop(&self) -> PwInnerLoop { + let inner = unsafe { pw_thread_loop_get_loop(self.0.as_ptr()) }; + PwInnerLoop { + inner: Rc::new(NonNull::new(inner).unwrap()), + } + } + + pub fn unlock(&self) { + unsafe { pw_thread_loop_unlock(self.0.as_ptr()) } + } + + pub fn lock(&self) { + unsafe { pw_thread_loop_lock(self.0.as_ptr()) } + } + + pub fn start(&self) { + unsafe { + pw_thread_loop_start(self.0.as_ptr()); + } + } + + pub fn signal(&self) { + unsafe { + pw_thread_loop_signal(self.0.as_ptr(), false); + } + } + + pub fn wait(&self) { + unsafe { + pw_thread_loop_wait(self.0.as_ptr()); + } + } +} + +#[derive(Debug, Clone)] +struct PwInnerLoop { + inner: Rc>, +} + +impl AsRef for PwInnerLoop { + fn as_ref(&self) -> &LoopRef { + self.deref() + } +} + +impl Deref for PwInnerLoop { + type Target = LoopRef; + + fn deref(&self) -> &Self::Target { + unsafe { &*(self.inner.as_ptr() as *mut LoopRef) } + } +} + +// SAFETY: Safe as the structure can be sent to another thread. +unsafe impl Send for PwBackend {} + +// SAFETY: Safe as the structure can be shared with another thread as the state +// is protected with a lock. +unsafe impl Sync for PwBackend {} pub struct PwBackend { - streams: Arc>>, + pub stream_params: Arc>>, + thread_loop: Arc, + pub core: Core, + #[allow(dead_code)] + context: Context, + pub stream_hash: RwLock>, + pub stream_listener: RwLock>>, } impl PwBackend { - pub fn new(streams: Arc>>) -> Self { - Self { streams } + pub fn new(stream_params: Arc>>) -> Self { + pw::init(); + + let thread_loop = Arc::new(PwThreadLoop::new(Some("Pipewire thread loop")).unwrap()); + let get_loop = thread_loop.get_loop(); + + thread_loop.lock(); + + let context = pw::Context::new(&get_loop).expect("failed to create context"); + thread_loop.start(); + let core = context.connect(None).expect("Failed to connect to core"); + + // Create new reference for the variable so that it can be moved into the closure. + let thread_clone = thread_loop.clone(); + + // Trigger the sync event. The server's answer won't be processed until we start the thread loop, + // so we can safely do this before setting up a callback. This lets us avoid using a Cell. + let pending = core.sync(0).expect("sync failed"); + let _listener_core = core + .add_listener_local() + .done(move |id, seq| { + if id == PW_ID_CORE && seq == pending { + thread_clone.signal(); + } + }) + .register(); + + thread_loop.wait(); + thread_loop.unlock(); + + log::trace!("pipewire backend running"); + + Self { + stream_params, + thread_loop, + core, + context, + stream_hash: RwLock::new(HashMap::new()), + stream_listener: RwLock::new(HashMap::new()), + } } } impl AudioBackend for PwBackend { - fn write(&self, stream_id: u32) -> Result<()> { - log::trace!("PipewireBackend write stream_id {}", stream_id); - _ = std::mem::take(&mut self.streams.write().unwrap()[stream_id as usize].buffers); + fn write(&self, _stream_id: u32) -> Result<()> { Ok(()) } - fn read(&self, _id: u32) -> Result<()> { - log::trace!("PipewireBackend read stream_id {}", _id); + fn read(&self, _stream_id: u32) -> Result<()> { + log::trace!("PipewireBackend read stream_id {}", _stream_id); + Ok(()) + } + + fn set_parameters(&self, stream_id: u32, mut msg: ControlMessage) -> Result<()> { + let descriptors: Vec = msg.desc_chain.clone().collect(); + let desc_request = &descriptors[0]; + let request = msg + .desc_chain + .memory() + .read_obj::(desc_request.addr()) + .unwrap(); + { + let stream_clone = self.stream_params.clone(); + let mut stream_params = stream_clone.write().unwrap(); + let st = stream_params + .get_mut(stream_id as usize) + .expect("Stream does not exist"); + if let Err(err) = st.state.set_parameters() { + log::error!("Stream {} set_parameters {}", stream_id, err); + msg.code = VIRTIO_SND_S_BAD_MSG; + } else if !st.supports_format(request.format) || !st.supports_rate(request.rate) { + msg.code = VIRTIO_SND_S_NOT_SUPP; + } else { + st.params.features = request.features; + st.params.buffer_bytes = request.buffer_bytes; + st.params.period_bytes = request.period_bytes; + st.params.channels = request.channels; + st.params.format = request.format; + st.params.rate = request.rate; + } + } + drop(msg); + + Ok(()) + } + + fn prepare(&self, stream_id: u32) -> Result<()> { + debug!("pipewire prepare"); + let prepare_result = self.stream_params.write().unwrap()[stream_id as usize] + .state + .prepare(); + if let Err(err) = prepare_result { + log::error!("Stream {} prepare {}", stream_id, err); + } else { + let mut stream_hash = self.stream_hash.write().unwrap(); + let mut stream_listener = self.stream_listener.write().unwrap(); + self.thread_loop.lock(); + let stream_params = self.stream_params.read().unwrap(); + + let params = &stream_params[stream_id as usize].params; + + let mut pos: [u32; 64] = [SPA_AUDIO_CHANNEL_UNKNOWN; 64]; + + match params.channels { + 6 => { + pos[0] = SPA_AUDIO_CHANNEL_FL; + pos[1] = SPA_AUDIO_CHANNEL_FR; + pos[2] = SPA_AUDIO_CHANNEL_FC; + pos[3] = SPA_AUDIO_CHANNEL_LFE; + pos[4] = SPA_AUDIO_CHANNEL_RL; + pos[5] = SPA_AUDIO_CHANNEL_RR; + } + 5 => { + pos[0] = SPA_AUDIO_CHANNEL_FL; + pos[1] = SPA_AUDIO_CHANNEL_FR; + pos[2] = SPA_AUDIO_CHANNEL_FC; + pos[3] = SPA_AUDIO_CHANNEL_LFE; + pos[4] = SPA_AUDIO_CHANNEL_RC; + } + 4 => { + pos[0] = SPA_AUDIO_CHANNEL_FL; + pos[1] = SPA_AUDIO_CHANNEL_FR; + pos[2] = SPA_AUDIO_CHANNEL_FC; + pos[3] = SPA_AUDIO_CHANNEL_RC; + } + 3 => { + pos[0] = SPA_AUDIO_CHANNEL_FL; + pos[1] = SPA_AUDIO_CHANNEL_FR; + pos[2] = SPA_AUDIO_CHANNEL_LFE; + } + 2 => { + pos[0] = SPA_AUDIO_CHANNEL_FL; + pos[1] = SPA_AUDIO_CHANNEL_FR; + } + 1 => { + pos[0] = SPA_AUDIO_CHANNEL_MONO; + } + _ => { + return Err(Error::ChannelNotSupported(params.channels)); + } + } + + let info = spa_audio_info_raw { + format: match params.format { + VIRTIO_SND_PCM_FMT_MU_LAW => SPA_AUDIO_FORMAT_ULAW, + VIRTIO_SND_PCM_FMT_A_LAW => SPA_AUDIO_FORMAT_ALAW, + VIRTIO_SND_PCM_FMT_S8 => SPA_AUDIO_FORMAT_S8, + VIRTIO_SND_PCM_FMT_U8 => SPA_AUDIO_FORMAT_U8, + VIRTIO_SND_PCM_FMT_S16 => SPA_AUDIO_FORMAT_S16, + VIRTIO_SND_PCM_FMT_U16 => SPA_AUDIO_FORMAT_U16, + VIRTIO_SND_PCM_FMT_S18_3 => SPA_AUDIO_FORMAT_S18_LE, + VIRTIO_SND_PCM_FMT_U18_3 => SPA_AUDIO_FORMAT_U18_LE, + VIRTIO_SND_PCM_FMT_S20_3 => SPA_AUDIO_FORMAT_S20_LE, + VIRTIO_SND_PCM_FMT_U20_3 => SPA_AUDIO_FORMAT_U20_LE, + VIRTIO_SND_PCM_FMT_S24_3 => SPA_AUDIO_FORMAT_S24_LE, + VIRTIO_SND_PCM_FMT_U24_3 => SPA_AUDIO_FORMAT_U24_LE, + VIRTIO_SND_PCM_FMT_S20 => SPA_AUDIO_FORMAT_S20, + VIRTIO_SND_PCM_FMT_U20 => SPA_AUDIO_FORMAT_U20, + VIRTIO_SND_PCM_FMT_S24 => SPA_AUDIO_FORMAT_S24, + VIRTIO_SND_PCM_FMT_U24 => SPA_AUDIO_FORMAT_U24, + VIRTIO_SND_PCM_FMT_S32 => SPA_AUDIO_FORMAT_S32, + VIRTIO_SND_PCM_FMT_U32 => SPA_AUDIO_FORMAT_U32, + VIRTIO_SND_PCM_FMT_FLOAT => SPA_AUDIO_FORMAT_F32, + VIRTIO_SND_PCM_FMT_FLOAT64 => SPA_AUDIO_FORMAT_F64, + _ => SPA_AUDIO_FORMAT_UNKNOWN, + }, + rate: match params.rate { + VIRTIO_SND_PCM_RATE_5512 => 5512, + VIRTIO_SND_PCM_RATE_8000 => 8000, + VIRTIO_SND_PCM_RATE_11025 => 11025, + VIRTIO_SND_PCM_RATE_16000 => 16000, + VIRTIO_SND_PCM_RATE_22050 => 22050, + VIRTIO_SND_PCM_RATE_32000 => 32000, + VIRTIO_SND_PCM_RATE_44100 => 44100, + VIRTIO_SND_PCM_RATE_48000 => 48000, + VIRTIO_SND_PCM_RATE_64000 => 64000, + VIRTIO_SND_PCM_RATE_88200 => 88200, + VIRTIO_SND_PCM_RATE_96000 => 96000, + VIRTIO_SND_PCM_RATE_176400 => 176400, + VIRTIO_SND_PCM_RATE_192000 => 192000, + VIRTIO_SND_PCM_RATE_384000 => 384000, + _ => 44100, + }, + flags: 0, + channels: params.channels as u32, + position: pos, + }; + + let mut audio_info = AudioInfoRaw::new(); + audio_info.set_format(AudioFormat::S16LE); + audio_info.set_rate(info.rate); + audio_info.set_channels(info.channels); + + let values: Vec = PodSerializer::serialize( + std::io::Cursor::new(Vec::new()), + &Value::Object(Object { + type_: SPA_TYPE_OBJECT_Format, + id: SPA_PARAM_EnumFormat, + properties: audio_info.into(), + }), + ) + .unwrap() + .0 + .into_inner(); + + let value_clone = values.clone(); + + let mut param = [Pod::from_bytes(&values).unwrap()]; + + let props = properties! { + *pw::keys::MEDIA_TYPE => "Audio", + *pw::keys::MEDIA_CATEGORY => "Playback", + }; + + let stream = pw::stream::Stream::new(&self.core, "audio-output", props) + .expect("could not create new stream"); + + let streams = self.stream_params.clone(); + + let listener_stream = stream + .add_local_listener() + .state_changed(|old, new| { + debug!("State changed: {:?} -> {:?}", old, new); + }) + .param_changed(move |stream, id, _data, param| { + let Some(_param) = param else { + return; + }; + if id != ParamType::Format.as_raw() { + return; + } + let mut param = [Pod::from_bytes(&value_clone).unwrap()]; + + //callback to negotiate new set of streams + stream + .update_params(&mut param) + .expect("could not update params"); + }) + .process(move |stream, _data| { + //todo: use safe dequeue_buffer(), contribute queue_buffer() + unsafe { + let b: *mut pw_buffer = stream.dequeue_raw_buffer(); + if b.is_null() { + return; + } + let buf = (*b).buffer; + let datas = (*buf).datas; + + let p = (*datas).data as *mut u8; + + if p.is_null() { + return; + } + + // to calculate as sizeof(int16_t) * NR_CHANNELS + let frame_size = info.channels as u32 * size_of::() as u32; + let req = (*b).requested * (frame_size as u64); + let mut n_bytes = cmp::min(req as u32, (*datas).maxsize); + + let mut streams = streams.write().unwrap(); + let streams = streams + .get_mut(stream_id as usize) + .expect("Stream does not exist"); + let Some(buffer) = streams.buffers.front_mut() else { + return; + }; + + let mut start = buffer.pos; + + let avail = (buffer.bytes.len() - start) as i32; + + if avail <= 0 { + // pad with silence + ptr::write_bytes(p, 0, n_bytes as usize); + } else { + if avail < n_bytes as i32 { + n_bytes = avail.try_into().unwrap(); + } + + let slice = &buffer.bytes[buffer.pos..]; + p.copy_from(slice.as_ptr(), slice.len()); + + start += n_bytes as usize; + + buffer.pos = start; + + if start >= buffer.bytes.len() { + streams.buffers.pop_front(); + } + } + + (*(*datas).chunk).offset = 0; + (*(*datas).chunk).stride = frame_size as i32; + (*(*datas).chunk).size = n_bytes; + + stream.queue_raw_buffer(b); + } + }) + .register() + .expect("failed to register stream listener"); + + stream_listener.insert(stream_id, listener_stream); + + let direction = match stream_params[stream_id as usize].direction { + VIRTIO_SND_D_OUTPUT => spa::Direction::Output, + VIRTIO_SND_D_INPUT => spa::Direction::Input, + _ => panic!("Invalid direction"), + }; + + stream + .connect( + direction, + Some(pw::constants::ID_ANY), + pw::stream::StreamFlags::RT_PROCESS + | pw::stream::StreamFlags::AUTOCONNECT + | pw::stream::StreamFlags::INACTIVE + | pw::stream::StreamFlags::MAP_BUFFERS, + &mut param, + ) + .expect("could not connect to the stream"); + + self.thread_loop.unlock(); + + // insert created stream in a hash table + stream_hash.insert(stream_id, stream); + } + + Ok(()) + } + + fn release(&self, stream_id: u32, mut msg: ControlMessage) -> Result<()> { + debug!("pipewire backend, release function"); + let release_result = self.stream_params.write().unwrap()[stream_id as usize] + .state + .release(); + if let Err(err) = release_result { + log::error!("Stream {} release {}", stream_id, err); + msg.code = VIRTIO_SND_S_BAD_MSG; + } else { + self.thread_loop.lock(); + let mut stream_hash = self.stream_hash.write().unwrap(); + let mut stream_listener = self.stream_listener.write().unwrap(); + let st_buffer = &mut self.stream_params.write().unwrap(); + + let Some(stream) = stream_hash.get(&stream_id) else { + return Err(Error::StreamWithIdNotFound(stream_id)); + }; + stream.disconnect().expect("could not disconnect stream"); + std::mem::take(&mut st_buffer[stream_id as usize].buffers); + stream_hash.remove(&stream_id); + stream_listener.remove(&stream_id); + + self.thread_loop.unlock(); + } + + Ok(()) + } + + fn start(&self, stream_id: u32) -> Result<()> { + debug!("pipewire start"); + let start_result = self.stream_params.write().unwrap()[stream_id as usize] + .state + .start(); + if let Err(err) = start_result { + // log the error and continue + log::error!("Stream {} start {}", stream_id, err); + } else { + self.thread_loop.lock(); + let stream_hash = self.stream_hash.read().unwrap(); + let Some(stream) = stream_hash.get(&stream_id) else { + return Err(Error::StreamWithIdNotFound(stream_id)); + }; + stream.set_active(true).expect("could not start stream"); + self.thread_loop.unlock(); + } + Ok(()) + } + + fn stop(&self, stream_id: u32) -> Result<()> { + debug!("pipewire stop"); + let stop_result = self.stream_params.write().unwrap()[stream_id as usize] + .state + .stop(); + if let Err(err) = stop_result { + log::error!("Stream {} stop {}", stream_id, err); + } else { + self.thread_loop.lock(); + let stream_hash = self.stream_hash.read().unwrap(); + let Some(stream) = stream_hash.get(&stream_id) else { + return Err(Error::StreamWithIdNotFound(stream_id)); + }; + stream.set_active(false).expect("could not stop stream"); + self.thread_loop.unlock(); + } + Ok(()) } } diff --git a/crates/sound/src/audio_backends/pw_backend.rs b/crates/sound/src/audio_backends/pw_backend.rs deleted file mode 100644 index b6c2c58..0000000 --- a/crates/sound/src/audio_backends/pw_backend.rs +++ /dev/null @@ -1,172 +0,0 @@ -// Pipewire backend device -// SPDX-License-Identifier: Apache-2.0 or BSD-3-Clause - -use super::AudioBackend; -use crate::vhu_sound::NR_STREAMS; -use crate::PCMParams; -use crate::Result; -use std::ops::Deref; -use std::ptr; -use std::ptr::NonNull; -use std::sync::Arc; -use std::sync::RwLock; - -use pipewire as pw; -use pw::sys::{pw_loop, pw_thread_loop_new, pw_thread_loop_signal, PW_ID_CORE}; -use pw::sys::{pw_thread_loop, pw_thread_loop_start, pw_thread_loop_wait}; -use pw::sys::{pw_thread_loop_get_loop, pw_thread_loop_lock, pw_thread_loop_unlock}; -use pw::Core; -use pw::LoopRef; - -struct PwThreadLoop(NonNull); - -impl PwThreadLoop { - pub fn new(name: Option<&str>) -> Option { - let inner = unsafe { - pw_thread_loop_new( - name.map_or(ptr::null(), |p| p.as_ptr() as *const _), - std::ptr::null_mut(), - ) - }; - if inner.is_null() { - None - } else { - Some(Self( - NonNull::new(inner).expect("pw_thread_loop can't be null"), - )) - } - } - - pub fn get_loop(&self) -> PwThreadLoopTheLoop { - let inner = unsafe { pw_thread_loop_get_loop(self.0.as_ptr()) }; - PwThreadLoopTheLoop(NonNull::new(inner).unwrap()) - } - - pub fn unlock(&self) { - unsafe { pw_thread_loop_unlock(self.0.as_ptr()) } - } - - pub fn lock(&self) { - unsafe { pw_thread_loop_lock(self.0.as_ptr()) } - } - - pub fn start(&self) { - unsafe { - pw_thread_loop_start(self.0.as_ptr()); - } - } - - pub fn signal(&self) { - unsafe { - pw_thread_loop_signal(self.0.as_ptr(), false); - } - } - - pub fn wait(&self) { - unsafe { - pw_thread_loop_wait(self.0.as_ptr()); - } - } -} - -#[derive(Debug, Clone)] -struct PwThreadLoopTheLoop(NonNull); - -impl AsRef for PwThreadLoopTheLoop { - fn as_ref(&self) -> &LoopRef { - self.deref() - } -} - -impl Deref for PwThreadLoopTheLoop { - type Target = LoopRef; - - fn deref(&self) -> &Self::Target { - unsafe { &*(self.0.as_ptr() as *mut LoopRef) } - } -} - -// SAFETY: Safe as the structure can be sent to another thread. -unsafe impl Send for PwBackend {} - -// SAFETY: Safe as the structure can be shared with another thread as the state -// is protected with a lock. -unsafe impl Sync for PwBackend {} - -pub struct PwBackend { - thread_loop: Arc, - pub core: Core, - pub stream_params: RwLock>, -} - -impl PwBackend { - pub fn new() -> Self { - pw::init(); - - let thread_loop = Arc::new(PwThreadLoop::new(Some("Pipewire thread loop")).unwrap()); - let get_loop = thread_loop.get_loop(); - - thread_loop.lock(); - - let context = pw::Context::new(&get_loop).expect("failed to create context"); - thread_loop.start(); - let core = context.connect(None).expect("Failed to connect to core"); - - // Create new reference for the variable so that it can be moved into the closure. - let thread_clone = thread_loop.clone(); - - // Trigger the sync event. The server's answer won't be processed until we start the thread loop, - // so we can safely do this before setting up a callback. This lets us avoid using a Cell. - let pending = core.sync(0).expect("sync failed"); - let _listener_core = core - .add_listener_local() - .done(move |id, seq| { - if id == PW_ID_CORE && seq == pending { - thread_clone.signal(); - } - }) - .register(); - - thread_loop.wait(); - thread_loop.unlock(); - - println!("pipewire backend running"); - - let streams_param = vec![PCMParams::default(); NR_STREAMS]; - - Self { - thread_loop, - core, - stream_params: RwLock::new(streams_param), - } - } -} - -impl AudioBackend for PwBackend { - fn write(&self, stream_id: u32) -> Result<()> { - println!("pipewire backend, writting to stream: {}", stream_id); - Ok(()) - } - - fn read(&self, _stream_id: u32) -> Result<()> { - /* - let buf = req.data_slice().ok_or(Error::SoundReqMissingData)?; - let zero_mem = vec![0u8; buf.len()]; - - buf.copy_from(&zero_mem); - */ - Ok(()) - } - - fn set_param(&self, stream_id: u32, params: PCMParams) -> Result<()> { - let mut stream_params = self.stream_params.write().unwrap(); - stream_params[stream_id as usize] = params; - Ok(()) - } - - fn prepare(&self, _stream_id: u32) -> Result<()> { - self.thread_loop.lock(); - self.thread_loop.unlock(); - Ok(()) - } -} diff --git a/crates/sound/src/lib.rs b/crates/sound/src/lib.rs index 23fed0a..fd277e9 100644 --- a/crates/sound/src/lib.rs +++ b/crates/sound/src/lib.rs @@ -77,6 +77,10 @@ pub enum Error { UnexpectedDescriptorSize(usize, u32), #[error("Protocol or device error: {0}")] Stream(stream::Error), + #[error("Stream with id {0} not found")] + StreamWithIdNotFound(u32), + #[error("Channel number not supported: {0}")] + ChannelNotSupported(u8), } impl From for IoError {