mirror of
https://git.proxmox.com/git/proxmox
synced 2025-08-07 15:26:55 +00:00
proxmox-async: start new crate
Signed-off-by: Dietmar Maurer <dietmar@proxmox.com>
This commit is contained in:
parent
45645d9aee
commit
e1f0eb4aec
@ -2,6 +2,7 @@
|
|||||||
members = [
|
members = [
|
||||||
"proxmox",
|
"proxmox",
|
||||||
"proxmox-api-macro",
|
"proxmox-api-macro",
|
||||||
|
"proxmox-async",
|
||||||
"proxmox-borrow",
|
"proxmox-borrow",
|
||||||
"proxmox-http",
|
"proxmox-http",
|
||||||
"proxmox-io",
|
"proxmox-io",
|
||||||
|
1
Makefile
1
Makefile
@ -3,6 +3,7 @@
|
|||||||
CRATES = \
|
CRATES = \
|
||||||
proxmox \
|
proxmox \
|
||||||
proxmox-api-macro \
|
proxmox-api-macro \
|
||||||
|
proxmox-async \
|
||||||
proxmox-borrow \
|
proxmox-borrow \
|
||||||
proxmox-http \
|
proxmox-http \
|
||||||
proxmox-io \
|
proxmox-io \
|
||||||
|
16
proxmox-async/Cargo.toml
Normal file
16
proxmox-async/Cargo.toml
Normal file
@ -0,0 +1,16 @@
|
|||||||
|
[package]
|
||||||
|
name = "proxmox-async"
|
||||||
|
version = "0.1.0"
|
||||||
|
authors = ["Proxmox Support Team <support@proxmox.com>"]
|
||||||
|
edition = "2018"
|
||||||
|
license = "AGPL-3"
|
||||||
|
description = "Proxmox async/tokio helpers"
|
||||||
|
|
||||||
|
exclude = [ "debian" ]
|
||||||
|
|
||||||
|
[dependencies]
|
||||||
|
anyhow = "1.0"
|
||||||
|
futures = "0.3"
|
||||||
|
lazy_static = "1.4"
|
||||||
|
pin-utils = "0.1.0"
|
||||||
|
tokio = { version = "1.0", features = ["rt", "rt-multi-thread"] }
|
10
proxmox-async/debian/changelog
Normal file
10
proxmox-async/debian/changelog
Normal file
@ -0,0 +1,10 @@
|
|||||||
|
rust-proxmox-async (0.1.0) stable; urgency=medium
|
||||||
|
|
||||||
|
* imported pbs-tools/src/blocking.rs
|
||||||
|
|
||||||
|
* imported pbs-runtime/src/lib.rs to runtime.rs
|
||||||
|
|
||||||
|
* initial release
|
||||||
|
|
||||||
|
-- root <root@elsa> Fri, 19 Nov 2021 15:43:44 +0100
|
||||||
|
|
10
proxmox-async/debian/debcargo.toml
Normal file
10
proxmox-async/debian/debcargo.toml
Normal file
@ -0,0 +1,10 @@
|
|||||||
|
overlay = "."
|
||||||
|
crate_src_path = ".."
|
||||||
|
maintainer = "Proxmox Support Team <support@proxmox.com>"
|
||||||
|
|
||||||
|
[source]
|
||||||
|
vcs_git = "git://git.proxmox.com/git/proxmox.git"
|
||||||
|
vcs_browser = "https://git.proxmox.com/?p=proxmox.git"
|
||||||
|
|
||||||
|
[packages.lib]
|
||||||
|
depends = [ "uuid-dev" ]
|
99
proxmox-async/src/blocking.rs
Normal file
99
proxmox-async/src/blocking.rs
Normal file
@ -0,0 +1,99 @@
|
|||||||
|
//! Async wrappers for blocking I/O (adding `block_in_place` around channels/readers)
|
||||||
|
|
||||||
|
use std::io::{self, Read};
|
||||||
|
use std::pin::Pin;
|
||||||
|
use std::task::{Context, Poll};
|
||||||
|
use std::sync::mpsc::Receiver;
|
||||||
|
|
||||||
|
use futures::stream::Stream;
|
||||||
|
|
||||||
|
use crate::runtime::block_in_place;
|
||||||
|
|
||||||
|
/// Wrapper struct to convert a Reader into a Stream
|
||||||
|
pub struct WrappedReaderStream<R: Read + Unpin> {
|
||||||
|
reader: R,
|
||||||
|
buffer: Vec<u8>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl <R: Read + Unpin> WrappedReaderStream<R> {
|
||||||
|
|
||||||
|
pub fn new(reader: R) -> Self {
|
||||||
|
let mut buffer = Vec::with_capacity(64*1024);
|
||||||
|
unsafe { buffer.set_len(buffer.capacity()); }
|
||||||
|
Self { reader, buffer }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<R: Read + Unpin> Stream for WrappedReaderStream<R> {
|
||||||
|
type Item = Result<Vec<u8>, io::Error>;
|
||||||
|
|
||||||
|
fn poll_next(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Option<Self::Item>> {
|
||||||
|
let this = self.get_mut();
|
||||||
|
match block_in_place(|| this.reader.read(&mut this.buffer)) {
|
||||||
|
Ok(n) => {
|
||||||
|
if n == 0 {
|
||||||
|
// EOF
|
||||||
|
Poll::Ready(None)
|
||||||
|
} else {
|
||||||
|
Poll::Ready(Some(Ok(this.buffer[..n].to_vec())))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(err) => Poll::Ready(Some(Err(err))),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Wrapper struct to convert a channel Receiver into a Stream
|
||||||
|
pub struct StdChannelStream<T>(pub Receiver<T>);
|
||||||
|
|
||||||
|
impl<T> Stream for StdChannelStream<T> {
|
||||||
|
type Item = T;
|
||||||
|
|
||||||
|
fn poll_next(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Option<Self::Item>> {
|
||||||
|
match block_in_place(|| self.0.recv()) {
|
||||||
|
Ok(data) => Poll::Ready(Some(data)),
|
||||||
|
Err(_) => Poll::Ready(None),// channel closed
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod test {
|
||||||
|
use std::io;
|
||||||
|
|
||||||
|
use anyhow::Error;
|
||||||
|
use futures::stream::TryStreamExt;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_wrapped_stream_reader() -> Result<(), Error> {
|
||||||
|
crate::runtime::main(async {
|
||||||
|
run_wrapped_stream_reader_test().await
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
struct DummyReader(usize);
|
||||||
|
|
||||||
|
impl io::Read for DummyReader {
|
||||||
|
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
|
||||||
|
self.0 += 1;
|
||||||
|
|
||||||
|
if self.0 >= 10 {
|
||||||
|
return Ok(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
unsafe {
|
||||||
|
std::ptr::write_bytes(buf.as_mut_ptr(), 0, buf.len());
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(buf.len())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn run_wrapped_stream_reader_test() -> Result<(), Error> {
|
||||||
|
let mut reader = super::WrappedReaderStream::new(DummyReader(0));
|
||||||
|
while let Some(_data) = reader.try_next().await? {
|
||||||
|
// just waiting
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
2
proxmox-async/src/lib.rs
Normal file
2
proxmox-async/src/lib.rs
Normal file
@ -0,0 +1,2 @@
|
|||||||
|
pub mod blocking;
|
||||||
|
pub mod runtime;
|
203
proxmox-async/src/runtime.rs
Normal file
203
proxmox-async/src/runtime.rs
Normal file
@ -0,0 +1,203 @@
|
|||||||
|
//! Helpers for quirks of the current tokio runtime.
|
||||||
|
|
||||||
|
use std::cell::RefCell;
|
||||||
|
use std::future::Future;
|
||||||
|
use std::sync::{Arc, Weak, Mutex};
|
||||||
|
use std::task::{Context, Poll, RawWaker, Waker};
|
||||||
|
use std::thread::{self, Thread};
|
||||||
|
|
||||||
|
use lazy_static::lazy_static;
|
||||||
|
use pin_utils::pin_mut;
|
||||||
|
use tokio::runtime::{self, Runtime};
|
||||||
|
|
||||||
|
thread_local! {
|
||||||
|
static BLOCKING: RefCell<bool> = RefCell::new(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn is_in_tokio() -> bool {
|
||||||
|
tokio::runtime::Handle::try_current()
|
||||||
|
.is_ok()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn is_blocking() -> bool {
|
||||||
|
BLOCKING.with(|v| *v.borrow())
|
||||||
|
}
|
||||||
|
|
||||||
|
struct BlockingGuard(bool);
|
||||||
|
|
||||||
|
impl BlockingGuard {
|
||||||
|
fn set() -> Self {
|
||||||
|
Self(BLOCKING.with(|v| {
|
||||||
|
let old = *v.borrow();
|
||||||
|
*v.borrow_mut() = true;
|
||||||
|
old
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Drop for BlockingGuard {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
BLOCKING.with(|v| {
|
||||||
|
*v.borrow_mut() = self.0;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
lazy_static! {
|
||||||
|
// avoid openssl bug: https://github.com/openssl/openssl/issues/6214
|
||||||
|
// by dropping the runtime as early as possible
|
||||||
|
static ref RUNTIME: Mutex<Weak<Runtime>> = Mutex::new(Weak::new());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[link(name = "crypto")]
|
||||||
|
extern "C" {
|
||||||
|
fn OPENSSL_thread_stop();
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Get or create the current main tokio runtime.
|
||||||
|
///
|
||||||
|
/// This makes sure that tokio's worker threads are marked for us so that we know whether we
|
||||||
|
/// can/need to use `block_in_place` in our `block_on` helper.
|
||||||
|
pub fn get_runtime_with_builder<F: Fn() -> runtime::Builder>(get_builder: F) -> Arc<Runtime> {
|
||||||
|
|
||||||
|
let mut guard = RUNTIME.lock().unwrap();
|
||||||
|
|
||||||
|
if let Some(rt) = guard.upgrade() { return rt; }
|
||||||
|
|
||||||
|
let mut builder = get_builder();
|
||||||
|
builder.on_thread_stop(|| {
|
||||||
|
// avoid openssl bug: https://github.com/openssl/openssl/issues/6214
|
||||||
|
// call OPENSSL_thread_stop to avoid race with openssl cleanup handlers
|
||||||
|
unsafe { OPENSSL_thread_stop(); }
|
||||||
|
});
|
||||||
|
|
||||||
|
let runtime = builder.build().expect("failed to spawn tokio runtime");
|
||||||
|
let rt = Arc::new(runtime);
|
||||||
|
|
||||||
|
*guard = Arc::downgrade(&rt);
|
||||||
|
|
||||||
|
rt
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Get or create the current main tokio runtime.
|
||||||
|
///
|
||||||
|
/// This calls get_runtime_with_builder() using the tokio default threaded scheduler
|
||||||
|
pub fn get_runtime() -> Arc<Runtime> {
|
||||||
|
|
||||||
|
get_runtime_with_builder(|| {
|
||||||
|
let mut builder = runtime::Builder::new_multi_thread();
|
||||||
|
builder.enable_all();
|
||||||
|
builder
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/// Block on a synchronous piece of code.
|
||||||
|
pub fn block_in_place<R>(fut: impl FnOnce() -> R) -> R {
|
||||||
|
// don't double-exit the context (tokio doesn't like that)
|
||||||
|
// also, if we're not actually in a tokio-worker we must not use block_in_place() either
|
||||||
|
if is_blocking() || !is_in_tokio() {
|
||||||
|
fut()
|
||||||
|
} else {
|
||||||
|
// we are in an actual tokio worker thread, block it:
|
||||||
|
tokio::task::block_in_place(move || {
|
||||||
|
let _guard = BlockingGuard::set();
|
||||||
|
fut()
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Block on a future in this thread.
|
||||||
|
pub fn block_on<F: Future>(fut: F) -> F::Output {
|
||||||
|
// don't double-exit the context (tokio doesn't like that)
|
||||||
|
if is_blocking() {
|
||||||
|
block_on_local_future(fut)
|
||||||
|
} else if is_in_tokio() {
|
||||||
|
// inside a tokio worker we need to tell tokio that we're about to really block:
|
||||||
|
tokio::task::block_in_place(move || {
|
||||||
|
let _guard = BlockingGuard::set();
|
||||||
|
block_on_local_future(fut)
|
||||||
|
})
|
||||||
|
} else {
|
||||||
|
// not a worker thread, not associated with a runtime, make sure we have a runtime (spawn
|
||||||
|
// it on demand if necessary), then enter it
|
||||||
|
let _guard = BlockingGuard::set();
|
||||||
|
let _enter_guard = get_runtime().enter();
|
||||||
|
get_runtime().block_on(fut)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
fn block_on_impl<F>(mut fut: F) -> F::Output
|
||||||
|
where
|
||||||
|
F: Future + Send,
|
||||||
|
F::Output: Send + 'static,
|
||||||
|
{
|
||||||
|
let (tx, rx) = tokio::sync::oneshot::channel();
|
||||||
|
let fut_ptr = &mut fut as *mut F as usize; // hack to not require F to be 'static
|
||||||
|
tokio::spawn(async move {
|
||||||
|
let fut: F = unsafe { std::ptr::read(fut_ptr as *mut F) };
|
||||||
|
tx
|
||||||
|
.send(fut.await)
|
||||||
|
.map_err(drop)
|
||||||
|
.expect("failed to send block_on result to channel")
|
||||||
|
});
|
||||||
|
|
||||||
|
futures::executor::block_on(async move {
|
||||||
|
rx.await.expect("failed to receive block_on result from channel")
|
||||||
|
})
|
||||||
|
std::mem::forget(fut);
|
||||||
|
}
|
||||||
|
*/
|
||||||
|
|
||||||
|
/// This used to be our tokio main entry point. Now this just calls out to `block_on` for
|
||||||
|
/// compatibility, which will perform all the necessary tasks on-demand anyway.
|
||||||
|
pub fn main<F: Future>(fut: F) -> F::Output {
|
||||||
|
block_on(fut)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn block_on_local_future<F: Future>(fut: F) -> F::Output {
|
||||||
|
pin_mut!(fut);
|
||||||
|
|
||||||
|
let waker = Arc::new(thread::current());
|
||||||
|
let waker = thread_waker_clone(Arc::into_raw(waker) as *const ());
|
||||||
|
let waker = unsafe { Waker::from_raw(waker) };
|
||||||
|
let mut context = Context::from_waker(&waker);
|
||||||
|
loop {
|
||||||
|
match fut.as_mut().poll(&mut context) {
|
||||||
|
Poll::Ready(out) => return out,
|
||||||
|
Poll::Pending => thread::park(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
const THREAD_WAKER_VTABLE: std::task::RawWakerVTable = std::task::RawWakerVTable::new(
|
||||||
|
thread_waker_clone,
|
||||||
|
thread_waker_wake,
|
||||||
|
thread_waker_wake_by_ref,
|
||||||
|
thread_waker_drop,
|
||||||
|
);
|
||||||
|
|
||||||
|
fn thread_waker_clone(this: *const ()) -> RawWaker {
|
||||||
|
let this = unsafe { Arc::from_raw(this as *const Thread) };
|
||||||
|
let cloned = Arc::clone(&this);
|
||||||
|
let _ = Arc::into_raw(this);
|
||||||
|
|
||||||
|
RawWaker::new(Arc::into_raw(cloned) as *const (), &THREAD_WAKER_VTABLE)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn thread_waker_wake(this: *const ()) {
|
||||||
|
let this = unsafe { Arc::from_raw(this as *const Thread) };
|
||||||
|
this.unpark();
|
||||||
|
}
|
||||||
|
|
||||||
|
fn thread_waker_wake_by_ref(this: *const ()) {
|
||||||
|
let this = unsafe { Arc::from_raw(this as *const Thread) };
|
||||||
|
this.unpark();
|
||||||
|
let _ = Arc::into_raw(this);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn thread_waker_drop(this: *const ()) {
|
||||||
|
let this = unsafe { Arc::from_raw(this as *const Thread) };
|
||||||
|
drop(this);
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user