diff --git a/proxmox-async/Cargo.toml b/proxmox-async/Cargo.toml index 1c9a9744..ba5a2fff 100644 --- a/proxmox-async/Cargo.toml +++ b/proxmox-async/Cargo.toml @@ -13,4 +13,4 @@ anyhow = "1.0" futures = "0.3" lazy_static = "1.4" pin-utils = "0.1.0" -tokio = { version = "1.0", features = ["rt", "rt-multi-thread"] } +tokio = { version = "1.0", features = ["rt", "rt-multi-thread", "sync"] } diff --git a/proxmox-async/debian/changelog b/proxmox-async/debian/changelog index 6e3fb013..16635a40 100644 --- a/proxmox-async/debian/changelog +++ b/proxmox-async/debian/changelog @@ -1,10 +1,11 @@ rust-proxmox-async (0.1.0) stable; urgency=medium - + + * imported pbs-tools/src/broadcast_future.rs + * imported pbs-tools/src/blocking.rs * imported pbs-runtime/src/lib.rs to runtime.rs - + * initial release -- root Fri, 19 Nov 2021 15:43:44 +0100 - diff --git a/proxmox-async/src/broadcast_future.rs b/proxmox-async/src/broadcast_future.rs new file mode 100644 index 00000000..7bfd83b7 --- /dev/null +++ b/proxmox-async/src/broadcast_future.rs @@ -0,0 +1,180 @@ +use std::future::Future; +use std::pin::Pin; +use std::sync::{Arc, Mutex}; + +use anyhow::{format_err, Error}; +use futures::future::{FutureExt, TryFutureExt}; +use tokio::sync::oneshot; + +/// Broadcast results to registered listeners using asnyc oneshot channels +#[derive(Default)] +pub struct BroadcastData { + result: Option>, + listeners: Vec>>, +} + +impl BroadcastData { + + pub fn new() -> Self { + Self { + result: None, + listeners: vec![], + } + } + + pub fn notify_listeners(&mut self, result: Result) { + + self.result = Some(result.clone()); + + loop { + match self.listeners.pop() { + None => { break; }, + Some(ch) => { + match &result { + Ok(result) => { let _ = ch.send(Ok(result.clone())); }, + Err(err) => { let _ = ch.send(Err(format_err!("{}", err))); }, + } + }, + } + } + } + + pub fn listen(&mut self) -> impl Future> { + use futures::future::{ok, Either}; + + match &self.result { + None => {}, + Some(Ok(result)) => return Either::Left(ok(result.clone())), + Some(Err(err)) => return Either::Left(futures::future::err(format_err!("{}", err))), + } + + let (tx, rx) = oneshot::channel::>(); + + self.listeners.push(tx); + + Either::Right(rx + .map(|res| match res { + Ok(Ok(t)) => Ok(t), + Ok(Err(e)) => Err(e), + Err(e) => Err(Error::from(e)), + }) + ) + } +} + +type SourceFuture = Pin> + Send>>; + +struct BroadCastFutureBinding { + broadcast: BroadcastData, + future: Option>, +} + +/// Broadcast future results to registered listeners +pub struct BroadcastFuture { + inner: Arc>>, +} + +impl BroadcastFuture { + /// Create instance for specified source future. + /// + /// The result of the future is sent to all registered listeners. + pub fn new(source: Box> + Send>) -> Self { + let inner = BroadCastFutureBinding { + broadcast: BroadcastData::new(), + future: Some(Pin::from(source)), + }; + Self { inner: Arc::new(Mutex::new(inner)) } + } + + /// Creates a new instance with a oneshot channel as trigger + pub fn new_oneshot() -> (Self, oneshot::Sender>) { + + let (tx, rx) = oneshot::channel::>(); + let rx = rx + .map_err(Error::from) + .and_then(futures::future::ready); + + (Self::new(Box::new(rx)), tx) + } + + fn notify_listeners( + inner: Arc>>, + result: Result, + ) { + let mut data = inner.lock().unwrap(); + data.broadcast.notify_listeners(result); + } + + fn spawn(inner: Arc>>) -> impl Future> { + let mut data = inner.lock().unwrap(); + + if let Some(source) = data.future.take() { + + let inner1 = inner.clone(); + + let task = source.map(move |value| { + match value { + Ok(value) => Self::notify_listeners(inner1, Ok(value)), + Err(err) => Self::notify_listeners(inner1, Err(err.to_string())), + } + }); + tokio::spawn(task); + } + + data.broadcast.listen() + } + + /// Register a listener + pub fn listen(&self) -> impl Future> { + let inner2 = self.inner.clone(); + async move { Self::spawn(inner2).await } + } +} + +#[test] +fn test_broadcast_future() { + use std::sync::atomic::{AtomicUsize, Ordering}; + + static CHECKSUM: AtomicUsize = AtomicUsize::new(0); + + let (sender, trigger) = BroadcastFuture::new_oneshot(); + + let receiver1 = sender.listen() + .map_ok(|res| { + CHECKSUM.fetch_add(res, Ordering::SeqCst); + }) + .map_err(|err| { panic!("got error {}", err); }) + .map(|_| ()); + + let receiver2 = sender.listen() + .map_ok(|res| { + CHECKSUM.fetch_add(res*2, Ordering::SeqCst); + }) + .map_err(|err| { panic!("got error {}", err); }) + .map(|_| ()); + + let rt = tokio::runtime::Runtime::new().unwrap(); + rt.block_on(async move { + let r1 = tokio::spawn(receiver1); + let r2 = tokio::spawn(receiver2); + + trigger.send(Ok(1)).unwrap(); + let _ = r1.await; + let _ = r2.await; + }); + + let result = CHECKSUM.load(Ordering::SeqCst); + + assert_eq!(result, 3); + + // the result stays available until the BroadcastFuture is dropped + rt.block_on(sender.listen() + .map_ok(|res| { + CHECKSUM.fetch_add(res*4, Ordering::SeqCst); + }) + .map_err(|err| { panic!("got error {}", err); }) + .map(|_| ())); + + let result = CHECKSUM.load(Ordering::SeqCst); + assert_eq!(result, 7); +} diff --git a/proxmox-async/src/lib.rs b/proxmox-async/src/lib.rs index 340711cd..7f7dd83a 100644 --- a/proxmox-async/src/lib.rs +++ b/proxmox-async/src/lib.rs @@ -1,2 +1,3 @@ pub mod blocking; +pub mod broadcast_future; pub mod runtime;