use threadpool runtime

This commit is contained in:
Dietmar Maurer 2019-09-17 17:02:58 +02:00
parent 726708dd5d
commit 1b23a2ce54

View File

@ -1,6 +1,7 @@
use failure::*; use failure::*;
use std::thread::JoinHandle; use std::thread::JoinHandle;
use std::sync::Arc; use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::mpsc::{channel, Sender, Receiver}; use std::sync::mpsc::{channel, Sender, Receiver};
use std::ffi::CString; use std::ffi::CString;
use std::ptr; use std::ptr;
@ -8,7 +9,7 @@ use std::os::raw::{c_char, c_void};
//use futures::{future, Future, Stream}; //use futures::{future, Future, Stream};
use futures::future::{self, Future, Either, FutureExt}; use futures::future::{self, Future, Either, FutureExt};
use tokio::runtime::current_thread::Runtime; use tokio::runtime::Runtime;
//#[macro_use] //#[macro_use]
use proxmox_backup::client::*; use proxmox_backup::client::*;
@ -35,20 +36,28 @@ struct BackupStats {
written_bytes: u64, written_bytes: u64,
} }
struct CallbackPointers {
callback: extern "C" fn(*mut c_void),
callback_data: *mut c_void,
error: * mut *mut c_char,
}
unsafe impl std::marker::Send for CallbackPointers {}
struct DataPointer (*const u8);
unsafe impl std::marker::Send for DataPointer {}
enum BackupMessage { enum BackupMessage {
End, End,
Abort, Abort,
WriteData { WriteData {
dev_id: u8, dev_id: u8,
data: *const u8, data: DataPointer,
size: u64, size: u64,
callback: extern "C" fn(*mut c_void), callback_info: CallbackPointers,
callback_data: *mut c_void,
error: * mut * mut c_char,
}, },
} }
unsafe impl std::marker::Send for BackupMessage {} // fixme: ??? //unsafe impl std::marker::Send for BackupMessage {} // fixme: ???
impl BackupTask { impl BackupTask {
@ -83,9 +92,7 @@ fn connect(runtime: &mut Runtime, repo: &BackupRepository) -> Result<Arc<BackupC
fn handle_async_command<F: 'static + Send + Future<Output=Result<(), Error>>>( fn handle_async_command<F: 'static + Send + Future<Output=Result<(), Error>>>(
command_future: F, command_future: F,
abort_future: impl 'static + Send + Future<Output=Result<(), Error>>, abort_future: impl 'static + Send + Future<Output=Result<(), Error>>,
callback: extern "C" fn(*mut c_void), callback_info: CallbackPointers,
callback_data: *mut c_void,
error: * mut * mut c_char,
) -> impl Future<Output = ()> { ) -> impl Future<Output = ()> {
futures::future::select(command_future.boxed(), abort_future.boxed()) futures::future::select(command_future.boxed(), abort_future.boxed())
@ -95,22 +102,22 @@ fn handle_async_command<F: 'static + Send + Future<Output=Result<(), Error>>>(
match result { match result {
Ok(_) => { Ok(_) => {
println!("command sucessful"); println!("command sucessful");
unsafe { *error = ptr::null_mut(); } unsafe { *(callback_info.error) = ptr::null_mut(); }
callback(callback_data); (callback_info.callback)(callback_info.callback_data);
} }
Err(err) => { Err(err) => {
println!("command error {}", err); println!("command error {}", err);
let errmsg = CString::new(format!("command error: {}", err)).unwrap(); let errmsg = CString::new(format!("command error: {}", err)).unwrap();
unsafe { *error = errmsg.into_raw(); } unsafe { *(callback_info.error) = errmsg.into_raw(); }
callback(callback_data); (callback_info.callback)(callback_info.callback_data);
} }
} }
} }
Either::Right(_) => { // aborted Either::Right(_) => { // aborted
println!("command aborted"); println!("command aborted");
let errmsg = CString::new("copmmand aborted".to_string()).unwrap(); let errmsg = CString::new("copmmand aborted".to_string()).unwrap();
unsafe { *error = errmsg.into_raw(); } unsafe { *(callback_info.error) = errmsg.into_raw(); }
callback(callback_data); (callback_info.callback)(callback_info.callback_data);
} }
} }
}) })
@ -142,8 +149,6 @@ fn backup_worker_task(
drop(connect_tx); // no longer needed drop(connect_tx); // no longer needed
let mut stats = BackupStats { written_bytes: 0 };
let (mut abort_tx, mut abort_rx) = tokio::sync::mpsc::channel(1); let (mut abort_tx, mut abort_rx) = tokio::sync::mpsc::channel(1);
let abort_rx = async move { let abort_rx = async move {
match abort_rx.recv().await { match abort_rx.recv().await {
@ -154,6 +159,9 @@ fn backup_worker_task(
let abort = BroadcastFuture::new(Box::new(abort_rx)); let abort = BroadcastFuture::new(Box::new(abort_rx));
let written_bytes = Arc::new(AtomicU64::new(0));
let written_bytes2 = written_bytes.clone();
runtime.spawn(async move { runtime.spawn(async move {
loop { loop {
@ -171,9 +179,9 @@ fn backup_worker_task(
println!("worker got end mesage"); println!("worker got end mesage");
break; break;
} }
BackupMessage::WriteData { dev_id, data, size, callback, callback_data, error } => { BackupMessage::WriteData { dev_id, data, size, callback_info } => {
//stats.written_bytes += size; written_bytes2.fetch_add(size, Ordering::SeqCst);
//println!("dev {}: write {} bytes ({})", dev_id, size, stats.written_bytes); //println!("dev {}: write {} , bytes ({})", dev_id, size, stats.written_bytes);
let command_future = async move { let command_future = async move {
@ -189,9 +197,7 @@ fn backup_worker_task(
handle_async_command( handle_async_command(
command_future, command_future,
abort.listen(), abort.listen(),
callback, callback_info,
callback_data,
error,
).await; ).await;
} }
} }
@ -200,8 +206,9 @@ fn backup_worker_task(
println!("worker end loop"); println!("worker end loop");
}); });
runtime.run()?; runtime.shutdown_on_idle();
let stats = BackupStats { written_bytes: written_bytes.fetch_add(0,Ordering::SeqCst) };
Ok(stats) Ok(stats)
} }
@ -243,7 +250,7 @@ pub extern "C" fn proxmox_backup_connect(error: * mut * mut c_char) -> *mut Prox
user: "root@pam".to_owned(), user: "root@pam".to_owned(),
store: "store2".to_owned(), store: "store2".to_owned(),
backup_id: "99".to_owned(), backup_id: "99".to_owned(),
password: "".to_owned(), password: "12345".to_owned(),
backup_time, backup_time,
}; };
@ -278,7 +285,12 @@ pub extern "C" fn proxmox_backup_write_data_async(
error: * mut * mut c_char, error: * mut * mut c_char,
) { ) {
let msg = BackupMessage::WriteData { dev_id, data, size , callback, callback_data, error }; let msg = BackupMessage::WriteData {
dev_id,
data: DataPointer(data),
size,
callback_info: CallbackPointers { callback, callback_data, error },
};
let task = handle as * mut BackupTask; let task = handle as * mut BackupTask;