From b5965096602fd47c36174defa86e5d0a42195978 Mon Sep 17 00:00:00 2001 From: Manos Pitsidianakis Date: Mon, 2 Dec 2024 13:54:30 +0200 Subject: [PATCH] sound/pipewire: add truncated exp backoff to tests Sometimes pw backend creation on CI fails with errors like for example: thread 'audio_backends::pipewire::tests::test_pipewire_backend_invalid_stream' panicked at vhost-device-sound/src/audio_backends/pipewire.rs:98:42: Failed to connect to core: CreationFailed Add a simple kind of truncated exponential backoff retry wrapper to PwBackend::new(). Signed-off-by: Manos Pitsidianakis --- Cargo.lock | 1 + vhost-device-sound/Cargo.toml | 3 + vhost-device-sound/src/audio_backends.rs | 7 +- .../src/audio_backends/pipewire.rs | 22 ++++-- .../src/audio_backends/pipewire/test_utils.rs | 73 ++++++++++++++++++- 5 files changed, 96 insertions(+), 10 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7f10789..e2c40d0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1798,6 +1798,7 @@ dependencies = [ "env_logger", "log", "pipewire", + "rand", "rstest", "tempfile", "thiserror 2.0.3", diff --git a/vhost-device-sound/Cargo.toml b/vhost-device-sound/Cargo.toml index e1e72fd..794cb6d 100644 --- a/vhost-device-sound/Cargo.toml +++ b/vhost-device-sound/Cargo.toml @@ -38,3 +38,6 @@ rstest = "0.23.0" tempfile = "3.14" virtio-queue = { version = "0.14", features = ["test-utils"] } vm-memory = { version = "0.16.1", features = ["backend-mmap", "backend-atomic"] } + +[target.'cfg(target_env = "gnu")'.dev-dependencies] +rand = { version = "0.8.5" } diff --git a/vhost-device-sound/src/audio_backends.rs b/vhost-device-sound/src/audio_backends.rs index 27ebf06..eb0a897 100644 --- a/vhost-device-sound/src/audio_backends.rs +++ b/vhost-device-sound/src/audio_backends.rs @@ -80,11 +80,14 @@ mod tests { } #[cfg(all(feature = "pw-backend", target_env = "gnu"))] { - use pipewire::{test_utils::PipewireTestHarness, *}; + use pipewire::{ + test_utils::{try_backoff, PipewireTestHarness}, + *, + }; let _test_harness = PipewireTestHarness::new(); let v = BackendType::Pipewire; - let value = alloc_audio_backend(v, Default::default()).unwrap(); + let value = try_backoff(|| alloc_audio_backend(v, Default::default()), std::num::NonZeroU32::new(3)).expect("reached maximum retry count"); assert_eq!(TypeId::of::(), value.as_any().type_id()); } #[cfg(all(feature = "alsa-backend", target_env = "gnu"))] diff --git a/vhost-device-sound/src/audio_backends/pipewire.rs b/vhost-device-sound/src/audio_backends/pipewire.rs index 46fbad9..1dfcd78 100644 --- a/vhost-device-sound/src/audio_backends/pipewire.rs +++ b/vhost-device-sound/src/audio_backends/pipewire.rs @@ -596,17 +596,23 @@ pub mod test_utils; #[cfg(test)] mod tests { - use super::{test_utils::PipewireTestHarness, *}; + use super::{ + test_utils::{try_backoff, PipewireTestHarness}, + *, + }; #[test] fn test_pipewire_backend_success() { crate::init_logger(); - let streams = Arc::new(RwLock::new(vec![Stream::default()])); - let stream_params = streams.clone(); + let stream_params = Arc::new(RwLock::new(vec![Stream::default()])); let _test_harness = PipewireTestHarness::new(); - let pw_backend = PwBackend::new(stream_params).unwrap(); + let pw_backend = try_backoff( + || PwBackend::new(stream_params.clone()), + std::num::NonZeroU32::new(3), + ) + .expect("reached maximum retry count"); assert_eq!(pw_backend.stream_hash.read().unwrap().len(), 0); assert_eq!(pw_backend.stream_listener.read().unwrap().len(), 0); // set up minimal configuration for test @@ -623,7 +629,7 @@ mod tests { pw_backend.read(0).unwrap(); pw_backend.stop(0).unwrap(); pw_backend.release(0).unwrap(); - let streams = streams.read().unwrap(); + let streams = stream_params.read().unwrap(); assert_eq!(streams[0].requests.len(), 0); } @@ -634,7 +640,11 @@ mod tests { let _test_harness = PipewireTestHarness::new(); - let pw_backend = PwBackend::new(stream_params).unwrap(); + let pw_backend = try_backoff( + || PwBackend::new(stream_params.clone()), + std::num::NonZeroU32::new(3), + ) + .expect("reached maximum retry count"); let request = VirtioSndPcmSetParams::default(); let res = pw_backend.set_parameters(0, request); diff --git a/vhost-device-sound/src/audio_backends/pipewire/test_utils.rs b/vhost-device-sound/src/audio_backends/pipewire/test_utils.rs index c18a2f2..65a7d59 100644 --- a/vhost-device-sound/src/audio_backends/pipewire/test_utils.rs +++ b/vhost-device-sound/src/audio_backends/pipewire/test_utils.rs @@ -2,11 +2,15 @@ // SPDX-License-Identifier: Apache-2.0 or BSD-3-Clause use std::{ + convert::TryFrom, io::Read, path::Path, process::{Child, Command, Stdio}, + thread::sleep, + time::{Duration, Instant}, }; +use rand::distributions::Uniform; use tempfile::{tempdir, TempDir}; /// Temporary Dbus session which is killed in drop(). @@ -80,13 +84,13 @@ impl PipewireTestHarness { println!("INFO: dbus_session_bus_address={}", dbus_session.address); println!("INFO: Wait for dbus to setup..."); - std::thread::sleep(std::time::Duration::from_secs(1)); + sleep(Duration::from_secs(1)); println!("INFO: Launch pipewire."); let pipewire_child = launch_pipewire(tempdir.path(), Path::new(&dbus_session.address)) .expect("ERROR: Could not launch pipewire"); println!("INFO: Wait for pipewire to setup..."); - std::thread::sleep(std::time::Duration::from_secs(1)); + sleep(Duration::from_secs(1)); std::env::set_var("DBUS_SESSION_BUS_ADDRESS", &dbus_session.address); std::env::set_var("XDG_RUNTIME_DIR", tempdir.path()); @@ -132,3 +136,68 @@ fn print_output(child: &mut Child, id: &'static str) -> Option<()> { None } + +pub fn truncated_wait_delay, R: rand::Rng>( + slot_time: &Duration, + attempts_so_far: u32, + exponent_max: u32, + rng: &mut R, + distribution: &D, +) -> Duration { + let attempts_so_far = + i32::try_from(attempts_so_far.clamp(0_u32, exponent_max)).unwrap_or(i32::MAX); + let position = distribution.sample(rng); + let max = 2_f32.powi(attempts_so_far) - 1.0_f32; + slot_time.mul_f32(position * max) +} + +pub fn try_backoff( + closure: impl Fn() -> Result, + max_retries: Option, +) -> Result { + const NO_DELAY: Duration = Duration::new(0, 0); + + let max_retries: Option = max_retries.map(Into::into); + let mut iterations: u32 = 0; + let mut dur: Option = None; + let mut rng = rand::thread_rng(); + + let distribution = Uniform::new(0.0_f32, 1.0_f32); + + loop { + if max_retries.map_or(false, |max| iterations >= max) { + return Err(()); + } + + let start: Instant = Instant::now(); + let result: Result = closure(); + let elapsed: Duration = start.elapsed(); + + iterations += 1; + + match result { + Ok(v) => return Ok(v), + Err(err) => { + log::debug!("try_backoff: closured failed with {err}, will retry"); + } + } + + if let Some(dur_val) = &dur { + dur = Some((*dur_val * (iterations - 1) + elapsed) / iterations); + } else { + dur = Some(elapsed); + } + + let delay: Duration = truncated_wait_delay( + dur.as_ref().unwrap_or(&NO_DELAY), + iterations, + 10_u32, + &mut rng, + &distribution, + ); + + log::debug!("Sleeping for {}s", delay.as_secs_f64()); + + sleep(delay); + } +}