forked from proxmox-mirrors/proxmox
async: runtime: Modernise module and update docs
This commit updates all helper functions, taking into account recent developments regarding `tokio`. In particular, the `block_in_place()` and `block_on()` functions now don't panic anymore if used within the single-threaded `tokio` runtime and instead behave as expected in both runtime flavours. Furthermore, because `tokio` may add more runtime flavours in the future, all helpers will now panic if used within an unsupported runtime. This is to prevent unforeseen behavioural quirks and interactions with `tokio` internals. The above changes make `BlockingGuard` redundant; it is consequently removed. The documentation is also updated, describing the behaviour of the helper functions and the purpose of the `runtime.rs` module in more detail. Signed-off-by: Max Carrara <m.carrara@proxmox.com>
This commit is contained in:
parent
ede73a6561
commit
1f351625a5
@ -1,6 +1,37 @@
|
|||||||
//! Helpers for quirks of the current tokio runtime.
|
//! Helpers for quirks of the current tokio runtime.
|
||||||
|
//!
|
||||||
|
//! It is preferred to use these helpers throughout our applications.
|
||||||
|
//!
|
||||||
|
//! # `tokio`, Runtime Flavors, and Panics
|
||||||
|
//!
|
||||||
|
//! Because [`tokio`] may introduce more [`RuntimeFlavor`s][RuntimeFlavor] in the future,
|
||||||
|
//! we [`panic!`] on flavors we're not (yet) explicitly supporting.
|
||||||
|
//!
|
||||||
|
//! This is done for forward-compatibility's sake in order to prevent unforeseen
|
||||||
|
//! interactions with [`tokio`], such as with [`tokio::task::block_in_place`],
|
||||||
|
//! which [`panic!`s][panic!] *only* if called within a [`CurrentThread`][ct-rt]-flavored
|
||||||
|
//! runtime, but not in a [`MultiThread`][mt-rt]-flavored runtime or if there's
|
||||||
|
//! *no runtime* at all.
|
||||||
|
//!
|
||||||
|
//! All [`panic!`s][panic!] can otherwise be either avoided or caught early by instantiating
|
||||||
|
//! your runtime with [`get_runtime()`] or [`get_runtime_with_builder()`]. Or, if you're
|
||||||
|
//! creating a separate async application, use [`main()`] for convenience.
|
||||||
|
//!
|
||||||
|
//! ## Supported [`RuntimeFlavor`s][RuntimeFlavor]
|
||||||
|
//!
|
||||||
|
//! * [`RuntimeFlavor::MultiThread`]
|
||||||
|
//! * [`RuntimeFlavor::CurrentThread`]
|
||||||
|
//!
|
||||||
|
//! # [`tokio`] and OpenSSL
|
||||||
|
//!
|
||||||
|
//! There's a nasty [OpenSSL bug][openssl-bug] causing a race between OpenSSL clean-up handlers
|
||||||
|
//! and the [`tokio`] runtime. This however is handled by [`get_runtime_with_builder()`]
|
||||||
|
//! and thus also within [`get_runtime()`] and our [`main()`] wrapper.
|
||||||
|
//!
|
||||||
|
//! [ct-rt]: RuntimeFlavor::CurrentThread
|
||||||
|
//! [mt-rt]: RuntimeFlavor::MultiThread
|
||||||
|
//! [openssl-bug]: https://github.com/openssl/openssl/issues/6214
|
||||||
|
|
||||||
use std::cell::RefCell;
|
|
||||||
use std::future::Future;
|
use std::future::Future;
|
||||||
use std::sync::{Arc, Mutex, Weak};
|
use std::sync::{Arc, Mutex, Weak};
|
||||||
use std::task::{Context, Poll, Waker};
|
use std::task::{Context, Poll, Waker};
|
||||||
@ -8,39 +39,7 @@ use std::thread::{self, Thread};
|
|||||||
|
|
||||||
use lazy_static::lazy_static;
|
use lazy_static::lazy_static;
|
||||||
use pin_utils::pin_mut;
|
use pin_utils::pin_mut;
|
||||||
use tokio::runtime::{self, Runtime};
|
use tokio::runtime::{self, Runtime, RuntimeFlavor};
|
||||||
|
|
||||||
thread_local! {
|
|
||||||
static BLOCKING: RefCell<bool> = RefCell::new(false);
|
|
||||||
}
|
|
||||||
|
|
||||||
fn is_in_tokio() -> bool {
|
|
||||||
tokio::runtime::Handle::try_current().is_ok()
|
|
||||||
}
|
|
||||||
|
|
||||||
fn is_blocking() -> bool {
|
|
||||||
BLOCKING.with(|v| *v.borrow())
|
|
||||||
}
|
|
||||||
|
|
||||||
struct BlockingGuard(bool);
|
|
||||||
|
|
||||||
impl BlockingGuard {
|
|
||||||
fn set() -> Self {
|
|
||||||
Self(BLOCKING.with(|v| {
|
|
||||||
let old = *v.borrow();
|
|
||||||
*v.borrow_mut() = true;
|
|
||||||
old
|
|
||||||
}))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Drop for BlockingGuard {
|
|
||||||
fn drop(&mut self) {
|
|
||||||
BLOCKING.with(|v| {
|
|
||||||
*v.borrow_mut() = self.0;
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
lazy_static! {
|
lazy_static! {
|
||||||
// avoid openssl bug: https://github.com/openssl/openssl/issues/6214
|
// avoid openssl bug: https://github.com/openssl/openssl/issues/6214
|
||||||
@ -53,14 +52,28 @@ extern "C" {
|
|||||||
fn OPENSSL_thread_stop();
|
fn OPENSSL_thread_stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get or create the current main tokio runtime.
|
#[inline]
|
||||||
|
fn panic_on_bad_flavor(runtime: &runtime::Runtime) {
|
||||||
|
match runtime.handle().runtime_flavor() {
|
||||||
|
RuntimeFlavor::CurrentThread => (),
|
||||||
|
RuntimeFlavor::MultiThread => (),
|
||||||
|
bad_flavor => panic!("unsupported tokio runtime flavor: \"{:#?}\"", bad_flavor),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Get or build the current main [`tokio`] [`Runtime`]. Useful if [`tokio`'s][tokio] defaults
|
||||||
|
/// don't suit your needs.
|
||||||
///
|
///
|
||||||
/// This makes sure that tokio's worker threads are marked for us so that we know whether we
|
/// # Panics
|
||||||
/// can/need to use `block_in_place` in our `block_on` helper.
|
/// This function will panic if the runtime has an unsupported [`RuntimeFlavor`].
|
||||||
|
/// See the [module level][mod] documentation for more details.
|
||||||
|
///
|
||||||
|
/// [mod]: self
|
||||||
pub fn get_runtime_with_builder<F: Fn() -> runtime::Builder>(get_builder: F) -> Arc<Runtime> {
|
pub fn get_runtime_with_builder<F: Fn() -> runtime::Builder>(get_builder: F) -> Arc<Runtime> {
|
||||||
let mut guard = RUNTIME.lock().unwrap();
|
let mut guard = RUNTIME.lock().unwrap();
|
||||||
|
|
||||||
if let Some(rt) = guard.upgrade() {
|
if let Some(rt) = guard.upgrade() {
|
||||||
|
panic_on_bad_flavor(&rt);
|
||||||
return rt;
|
return rt;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -74,6 +87,8 @@ pub fn get_runtime_with_builder<F: Fn() -> runtime::Builder>(get_builder: F) ->
|
|||||||
});
|
});
|
||||||
|
|
||||||
let runtime = builder.build().expect("failed to spawn tokio runtime");
|
let runtime = builder.build().expect("failed to spawn tokio runtime");
|
||||||
|
panic_on_bad_flavor(&runtime);
|
||||||
|
|
||||||
let rt = Arc::new(runtime);
|
let rt = Arc::new(runtime);
|
||||||
|
|
||||||
*guard = Arc::downgrade(&rt);
|
*guard = Arc::downgrade(&rt);
|
||||||
@ -81,9 +96,12 @@ pub fn get_runtime_with_builder<F: Fn() -> runtime::Builder>(get_builder: F) ->
|
|||||||
rt
|
rt
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get or create the current main tokio runtime.
|
/// Get or create the current main [`tokio`] [`Runtime`].
|
||||||
///
|
///
|
||||||
/// This calls get_runtime_with_builder() using the tokio default threaded scheduler
|
/// This is a convenience wrapper around [`get_runtime_with_builder()`] using
|
||||||
|
/// [`tokio`'s multithreaded runtime][mt-rt-meth].
|
||||||
|
///
|
||||||
|
/// [mt-rt-meth]: tokio::runtime::Builder::new_multi_thread()
|
||||||
pub fn get_runtime() -> Arc<Runtime> {
|
pub fn get_runtime() -> Arc<Runtime> {
|
||||||
get_runtime_with_builder(|| {
|
get_runtime_with_builder(|| {
|
||||||
let mut builder = runtime::Builder::new_multi_thread();
|
let mut builder = runtime::Builder::new_multi_thread();
|
||||||
@ -93,67 +111,89 @@ pub fn get_runtime() -> Arc<Runtime> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Block on a synchronous piece of code.
|
/// Block on a synchronous piece of code.
|
||||||
pub fn block_in_place<R>(fut: impl FnOnce() -> R) -> R {
|
///
|
||||||
// don't double-exit the context (tokio doesn't like that)
|
/// This is a wrapper around [`tokio::task::block_in_place()`] that allows to
|
||||||
// also, if we're not actually in a tokio-worker we must not use block_in_place() either
|
/// block the current thread even within a [`Runtime`] with [`RuntimeFlavor::CurrentThread`].
|
||||||
if is_blocking() || !is_in_tokio() {
|
///
|
||||||
fut()
|
/// Normally, [tokio's `block_in_place()`][bip] [`panic`s][panic] when called in
|
||||||
|
/// such a case; this function instead just runs the piece of code right away, preventing
|
||||||
|
/// an unforeseen panic.
|
||||||
|
///
|
||||||
|
/// # Note
|
||||||
|
/// If you're in a [`CurrentThread`][RuntimeFlavor::CurrentThread] runtime and you
|
||||||
|
/// *really* need to execute a bunch of blocking code, you might want to consider
|
||||||
|
/// executing that code with [`tokio::task::spawn_blocking()`] instead. This prevents
|
||||||
|
/// blocking the single-threaded runtime and still allows you to communicate via channels.
|
||||||
|
///
|
||||||
|
/// See [tokio's documentation on CPU-bound tasks and blocking code][tok-block-doc]
|
||||||
|
/// for more information.
|
||||||
|
///
|
||||||
|
/// # Panics
|
||||||
|
/// This function will panic if the runtime has an unsupported [`RuntimeFlavor`].
|
||||||
|
/// See the [module level][mod] documentation for more details.
|
||||||
|
///
|
||||||
|
/// [bip]: tokio::task::block_in_place()
|
||||||
|
/// [mod]: self
|
||||||
|
/// [sp]: tokio::task::spawn_blocking()
|
||||||
|
/// [tok-block-doc]: https://docs.rs/tokio/latest/tokio/index.html#cpu-bound-tasks-and-blocking-code
|
||||||
|
pub fn block_in_place<R>(func: impl FnOnce() -> R) -> R {
|
||||||
|
if let Ok(runtime) = runtime::Handle::try_current() {
|
||||||
|
match runtime.runtime_flavor() {
|
||||||
|
RuntimeFlavor::CurrentThread => func(),
|
||||||
|
RuntimeFlavor::MultiThread => tokio::task::block_in_place(func),
|
||||||
|
bad_flavor => panic!("unsupported tokio runtime flavor: \"{:#?}\"", bad_flavor),
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
// we are in an actual tokio worker thread, block it:
|
func()
|
||||||
tokio::task::block_in_place(move || {
|
|
||||||
let _guard = BlockingGuard::set();
|
|
||||||
fut()
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Block on a future in this thread.
|
/// Block on a future in the current thread.
|
||||||
pub fn block_on<F: Future>(fut: F) -> F::Output {
|
///
|
||||||
// don't double-exit the context (tokio doesn't like that)
|
/// Not to be confused with [`tokio::runtime::Handle::block_on()`] and
|
||||||
if is_blocking() {
|
/// [`tokio::runtime::Runtime::block_on()`].
|
||||||
block_on_local_future(fut)
|
///
|
||||||
} else if is_in_tokio() {
|
/// This will prevent other futures from running in the current thread in the meantime.
|
||||||
// inside a tokio worker we need to tell tokio that we're about to really block:
|
/// Essentially, this is [`block_in_place()`], but for [`Future`s][Future] instead of functions.
|
||||||
tokio::task::block_in_place(move || {
|
///
|
||||||
let _guard = BlockingGuard::set();
|
/// If there's no runtime currently active, this function will create a temporary one
|
||||||
block_on_local_future(fut)
|
/// using [`get_runtime()`] in order to block on and finish running the provided [`Future`].
|
||||||
})
|
///
|
||||||
|
/// # Panics
|
||||||
|
/// This function will panic if the runtime has an unsupported [`RuntimeFlavor`].
|
||||||
|
/// See the [module level][mod] documentation for more details.
|
||||||
|
///
|
||||||
|
/// [mod]: self
|
||||||
|
pub fn block_on<F: Future>(future: F) -> F::Output {
|
||||||
|
if let Ok(runtime) = runtime::Handle::try_current() {
|
||||||
|
match runtime.runtime_flavor() {
|
||||||
|
RuntimeFlavor::CurrentThread => block_on_local_future(future),
|
||||||
|
RuntimeFlavor::MultiThread => {
|
||||||
|
tokio::task::block_in_place(move || block_on_local_future(future))
|
||||||
|
}
|
||||||
|
bad_flavor => panic!("unsupported tokio runtime flavor: \"{:#?}\"", bad_flavor),
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
// not a worker thread, not associated with a runtime, make sure we have a runtime (spawn
|
let runtime = get_runtime();
|
||||||
// it on demand if necessary), then enter it
|
let _enter_guard = runtime.enter();
|
||||||
let _guard = BlockingGuard::set();
|
|
||||||
let _enter_guard = get_runtime().enter();
|
runtime.block_on(future)
|
||||||
get_runtime().block_on(fut)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/// This is our [`tokio`] entrypoint, which blocks on the provided [`Future`]
|
||||||
fn block_on_impl<F>(mut fut: F) -> F::Output
|
/// until it's completed, using [`tokio`'s multithreaded runtime][mt-rt-meth].
|
||||||
where
|
///
|
||||||
F: Future + Send,
|
/// It is preferred to use this function over other ways of instantiating a runtime.
|
||||||
F::Output: Send + 'static,
|
/// See the [module level][mod] documentation for more information.
|
||||||
{
|
///
|
||||||
let (tx, rx) = tokio::sync::oneshot::channel();
|
/// [mod]: self
|
||||||
let fut_ptr = &mut fut as *mut F as usize; // hack to not require F to be 'static
|
/// [mt-rt-meth]: tokio::runtime::Builder::new_multi_thread()
|
||||||
tokio::spawn(async move {
|
|
||||||
let fut: F = unsafe { std::ptr::read(fut_ptr as *mut F) };
|
|
||||||
tx
|
|
||||||
.send(fut.await)
|
|
||||||
.map_err(drop)
|
|
||||||
.expect("failed to send block_on result to channel")
|
|
||||||
});
|
|
||||||
|
|
||||||
futures::executor::block_on(async move {
|
|
||||||
rx.await.expect("failed to receive block_on result from channel")
|
|
||||||
})
|
|
||||||
std::mem::forget(fut);
|
|
||||||
}
|
|
||||||
*/
|
|
||||||
|
|
||||||
/// This used to be our tokio main entry point. Now this just calls out to `block_on` for
|
|
||||||
/// compatibility, which will perform all the necessary tasks on-demand anyway.
|
|
||||||
pub fn main<F: Future>(fut: F) -> F::Output {
|
pub fn main<F: Future>(fut: F) -> F::Output {
|
||||||
block_on(fut)
|
let runtime = get_runtime();
|
||||||
|
let _enter_guard = runtime.enter();
|
||||||
|
|
||||||
|
runtime.block_on(fut)
|
||||||
}
|
}
|
||||||
|
|
||||||
struct ThreadWaker(Thread);
|
struct ThreadWaker(Thread);
|
||||||
|
Loading…
Reference in New Issue
Block a user