From 1b23a2ce545b12fca8da914364cfb199b8fdbfe2 Mon Sep 17 00:00:00 2001 From: Dietmar Maurer Date: Tue, 17 Sep 2019 17:02:58 +0200 Subject: [PATCH] use threadpool runtime --- src/lib.rs | 64 ++++++++++++++++++++++++++++++++---------------------- 1 file changed, 38 insertions(+), 26 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 83e1f5b..95f1c05 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,6 +1,7 @@ use failure::*; use std::thread::JoinHandle; use std::sync::Arc; +use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::mpsc::{channel, Sender, Receiver}; use std::ffi::CString; use std::ptr; @@ -8,7 +9,7 @@ use std::os::raw::{c_char, c_void}; //use futures::{future, Future, Stream}; use futures::future::{self, Future, Either, FutureExt}; -use tokio::runtime::current_thread::Runtime; +use tokio::runtime::Runtime; //#[macro_use] use proxmox_backup::client::*; @@ -35,20 +36,28 @@ struct BackupStats { written_bytes: u64, } +struct CallbackPointers { + callback: extern "C" fn(*mut c_void), + callback_data: *mut c_void, + error: * mut *mut c_char, +} +unsafe impl std::marker::Send for CallbackPointers {} + +struct DataPointer (*const u8); +unsafe impl std::marker::Send for DataPointer {} + enum BackupMessage { End, Abort, WriteData { dev_id: u8, - data: *const u8, + data: DataPointer, size: u64, - callback: extern "C" fn(*mut c_void), - callback_data: *mut c_void, - error: * mut * mut c_char, + callback_info: CallbackPointers, }, } -unsafe impl std::marker::Send for BackupMessage {} // fixme: ??? +//unsafe impl std::marker::Send for BackupMessage {} // fixme: ??? impl BackupTask { @@ -83,9 +92,7 @@ fn connect(runtime: &mut Runtime, repo: &BackupRepository) -> Result>>( command_future: F, abort_future: impl 'static + Send + Future>, - callback: extern "C" fn(*mut c_void), - callback_data: *mut c_void, - error: * mut * mut c_char, + callback_info: CallbackPointers, ) -> impl Future { futures::future::select(command_future.boxed(), abort_future.boxed()) @@ -95,22 +102,22 @@ fn handle_async_command>>( match result { Ok(_) => { println!("command sucessful"); - unsafe { *error = ptr::null_mut(); } - callback(callback_data); + unsafe { *(callback_info.error) = ptr::null_mut(); } + (callback_info.callback)(callback_info.callback_data); } Err(err) => { println!("command error {}", err); let errmsg = CString::new(format!("command error: {}", err)).unwrap(); - unsafe { *error = errmsg.into_raw(); } - callback(callback_data); + unsafe { *(callback_info.error) = errmsg.into_raw(); } + (callback_info.callback)(callback_info.callback_data); } } } Either::Right(_) => { // aborted println!("command aborted"); let errmsg = CString::new("copmmand aborted".to_string()).unwrap(); - unsafe { *error = errmsg.into_raw(); } - callback(callback_data); + unsafe { *(callback_info.error) = errmsg.into_raw(); } + (callback_info.callback)(callback_info.callback_data); } } }) @@ -142,8 +149,6 @@ fn backup_worker_task( drop(connect_tx); // no longer needed - let mut stats = BackupStats { written_bytes: 0 }; - let (mut abort_tx, mut abort_rx) = tokio::sync::mpsc::channel(1); let abort_rx = async move { match abort_rx.recv().await { @@ -154,6 +159,9 @@ fn backup_worker_task( let abort = BroadcastFuture::new(Box::new(abort_rx)); + let written_bytes = Arc::new(AtomicU64::new(0)); + let written_bytes2 = written_bytes.clone(); + runtime.spawn(async move { loop { @@ -171,9 +179,9 @@ fn backup_worker_task( println!("worker got end mesage"); break; } - BackupMessage::WriteData { dev_id, data, size, callback, callback_data, error } => { - //stats.written_bytes += size; - //println!("dev {}: write {} bytes ({})", dev_id, size, stats.written_bytes); + BackupMessage::WriteData { dev_id, data, size, callback_info } => { + written_bytes2.fetch_add(size, Ordering::SeqCst); + //println!("dev {}: write {} , bytes ({})", dev_id, size, stats.written_bytes); let command_future = async move { @@ -189,9 +197,7 @@ fn backup_worker_task( handle_async_command( command_future, abort.listen(), - callback, - callback_data, - error, + callback_info, ).await; } } @@ -200,8 +206,9 @@ fn backup_worker_task( println!("worker end loop"); }); - runtime.run()?; + runtime.shutdown_on_idle(); + let stats = BackupStats { written_bytes: written_bytes.fetch_add(0,Ordering::SeqCst) }; Ok(stats) } @@ -243,7 +250,7 @@ pub extern "C" fn proxmox_backup_connect(error: * mut * mut c_char) -> *mut Prox user: "root@pam".to_owned(), store: "store2".to_owned(), backup_id: "99".to_owned(), - password: "".to_owned(), + password: "12345".to_owned(), backup_time, }; @@ -278,7 +285,12 @@ pub extern "C" fn proxmox_backup_write_data_async( error: * mut * mut c_char, ) { - let msg = BackupMessage::WriteData { dev_id, data, size , callback, callback_data, error }; + let msg = BackupMessage::WriteData { + dev_id, + data: DataPointer(data), + size, + callback_info: CallbackPointers { callback, callback_data, error }, + }; let task = handle as * mut BackupTask;