sound/pipewire: add truncated exp backoff to tests

Sometimes pw backend creation on CI fails with errors like for example:

    thread 'audio_backends::pipewire::tests::test_pipewire_backend_invalid_stream' panicked at vhost-device-sound/src/audio_backends/pipewire.rs:98:42:
    Failed to connect to core: CreationFailed

Add a simple kind of truncated exponential backoff retry wrapper to
PwBackend::new().

Signed-off-by: Manos Pitsidianakis <manos.pitsidianakis@linaro.org>
This commit is contained in:
Manos Pitsidianakis 2024-12-02 13:54:30 +02:00 committed by Stefano Garzarella
parent d3274444ad
commit b596509660
5 changed files with 96 additions and 10 deletions

1
Cargo.lock generated
View File

@ -1798,6 +1798,7 @@ dependencies = [
"env_logger",
"log",
"pipewire",
"rand",
"rstest",
"tempfile",
"thiserror 2.0.3",

View File

@ -38,3 +38,6 @@ rstest = "0.23.0"
tempfile = "3.14"
virtio-queue = { version = "0.14", features = ["test-utils"] }
vm-memory = { version = "0.16.1", features = ["backend-mmap", "backend-atomic"] }
[target.'cfg(target_env = "gnu")'.dev-dependencies]
rand = { version = "0.8.5" }

View File

@ -80,11 +80,14 @@ mod tests {
}
#[cfg(all(feature = "pw-backend", target_env = "gnu"))]
{
use pipewire::{test_utils::PipewireTestHarness, *};
use pipewire::{
test_utils::{try_backoff, PipewireTestHarness},
*,
};
let _test_harness = PipewireTestHarness::new();
let v = BackendType::Pipewire;
let value = alloc_audio_backend(v, Default::default()).unwrap();
let value = try_backoff(|| alloc_audio_backend(v, Default::default()), std::num::NonZeroU32::new(3)).expect("reached maximum retry count");
assert_eq!(TypeId::of::<PwBackend>(), value.as_any().type_id());
}
#[cfg(all(feature = "alsa-backend", target_env = "gnu"))]

View File

@ -596,17 +596,23 @@ pub mod test_utils;
#[cfg(test)]
mod tests {
use super::{test_utils::PipewireTestHarness, *};
use super::{
test_utils::{try_backoff, PipewireTestHarness},
*,
};
#[test]
fn test_pipewire_backend_success() {
crate::init_logger();
let streams = Arc::new(RwLock::new(vec![Stream::default()]));
let stream_params = streams.clone();
let stream_params = Arc::new(RwLock::new(vec![Stream::default()]));
let _test_harness = PipewireTestHarness::new();
let pw_backend = PwBackend::new(stream_params).unwrap();
let pw_backend = try_backoff(
|| PwBackend::new(stream_params.clone()),
std::num::NonZeroU32::new(3),
)
.expect("reached maximum retry count");
assert_eq!(pw_backend.stream_hash.read().unwrap().len(), 0);
assert_eq!(pw_backend.stream_listener.read().unwrap().len(), 0);
// set up minimal configuration for test
@ -623,7 +629,7 @@ mod tests {
pw_backend.read(0).unwrap();
pw_backend.stop(0).unwrap();
pw_backend.release(0).unwrap();
let streams = streams.read().unwrap();
let streams = stream_params.read().unwrap();
assert_eq!(streams[0].requests.len(), 0);
}
@ -634,7 +640,11 @@ mod tests {
let _test_harness = PipewireTestHarness::new();
let pw_backend = PwBackend::new(stream_params).unwrap();
let pw_backend = try_backoff(
|| PwBackend::new(stream_params.clone()),
std::num::NonZeroU32::new(3),
)
.expect("reached maximum retry count");
let request = VirtioSndPcmSetParams::default();
let res = pw_backend.set_parameters(0, request);

View File

@ -2,11 +2,15 @@
// SPDX-License-Identifier: Apache-2.0 or BSD-3-Clause
use std::{
convert::TryFrom,
io::Read,
path::Path,
process::{Child, Command, Stdio},
thread::sleep,
time::{Duration, Instant},
};
use rand::distributions::Uniform;
use tempfile::{tempdir, TempDir};
/// Temporary Dbus session which is killed in drop().
@ -80,13 +84,13 @@ impl PipewireTestHarness {
println!("INFO: dbus_session_bus_address={}", dbus_session.address);
println!("INFO: Wait for dbus to setup...");
std::thread::sleep(std::time::Duration::from_secs(1));
sleep(Duration::from_secs(1));
println!("INFO: Launch pipewire.");
let pipewire_child = launch_pipewire(tempdir.path(), Path::new(&dbus_session.address))
.expect("ERROR: Could not launch pipewire");
println!("INFO: Wait for pipewire to setup...");
std::thread::sleep(std::time::Duration::from_secs(1));
sleep(Duration::from_secs(1));
std::env::set_var("DBUS_SESSION_BUS_ADDRESS", &dbus_session.address);
std::env::set_var("XDG_RUNTIME_DIR", tempdir.path());
@ -132,3 +136,68 @@ fn print_output(child: &mut Child, id: &'static str) -> Option<()> {
None
}
pub fn truncated_wait_delay<D: rand::distributions::Distribution<f32>, R: rand::Rng>(
slot_time: &Duration,
attempts_so_far: u32,
exponent_max: u32,
rng: &mut R,
distribution: &D,
) -> Duration {
let attempts_so_far =
i32::try_from(attempts_so_far.clamp(0_u32, exponent_max)).unwrap_or(i32::MAX);
let position = distribution.sample(rng);
let max = 2_f32.powi(attempts_so_far) - 1.0_f32;
slot_time.mul_f32(position * max)
}
pub fn try_backoff<T, E: std::fmt::Display>(
closure: impl Fn() -> Result<T, E>,
max_retries: Option<std::num::NonZeroU32>,
) -> Result<T, ()> {
const NO_DELAY: Duration = Duration::new(0, 0);
let max_retries: Option<u32> = max_retries.map(Into::into);
let mut iterations: u32 = 0;
let mut dur: Option<Duration> = None;
let mut rng = rand::thread_rng();
let distribution = Uniform::new(0.0_f32, 1.0_f32);
loop {
if max_retries.map_or(false, |max| iterations >= max) {
return Err(());
}
let start: Instant = Instant::now();
let result: Result<T, E> = closure();
let elapsed: Duration = start.elapsed();
iterations += 1;
match result {
Ok(v) => return Ok(v),
Err(err) => {
log::debug!("try_backoff: closured failed with {err}, will retry");
}
}
if let Some(dur_val) = &dur {
dur = Some((*dur_val * (iterations - 1) + elapsed) / iterations);
} else {
dur = Some(elapsed);
}
let delay: Duration = truncated_wait_delay(
dur.as_ref().unwrap_or(&NO_DELAY),
iterations,
10_u32,
&mut rng,
&distribution,
);
log::debug!("Sleeping for {}s", delay.as_secs_f64());
sleep(delay);
}
}