From e44ea5eaadb415ab3cf3cff749c743dac6bd500b Mon Sep 17 00:00:00 2001 From: Dietmar Maurer Date: Fri, 25 Oct 2019 06:36:59 +0200 Subject: [PATCH] Add pointer to integer return value for all async functions --- src/capi_types.rs | 24 ++++++++++++++++++------ src/commands.rs | 13 +++++++------ src/lib.rs | 38 +++++++++++++++++--------------------- src/worker_task.rs | 5 +++-- 4 files changed, 45 insertions(+), 35 deletions(-) diff --git a/src/capi_types.rs b/src/capi_types.rs index 035865a..ad17421 100644 --- a/src/capi_types.rs +++ b/src/capi_types.rs @@ -1,26 +1,38 @@ use failure::*; -use std::os::raw::{c_char, c_void}; +use std::os::raw::{c_char, c_void, c_int}; use std::sync::{Mutex, Arc, mpsc::Sender }; use std::ffi::CString; pub(crate) struct CallbackPointers { pub callback: extern "C" fn(*mut c_void), pub callback_data: *mut c_void, - pub error: * mut *mut c_char, + pub result: *mut c_int, + pub error: *mut *mut c_char, } unsafe impl std::marker::Send for CallbackPointers {} impl CallbackPointers { - pub fn send_result(self, result: Result<(), Error>) { + pub fn send_result(self, result: Result) { match result { - Ok(_) => { - unsafe { *(self.error) = std::ptr::null_mut(); } + Ok(ret) => { + unsafe { + if self.result != std::ptr::null_mut() { + *(self.result) = ret; + } + } (self.callback)(self.callback_data); } Err(err) => { let errmsg = CString::new(format!("command error: {}", err)).unwrap(); - unsafe { *(self.error) = errmsg.into_raw(); } + unsafe { + if self.result != std::ptr::null_mut() { + *(self.result) = -1; + } + if self.error != std::ptr::null_mut() { + *(self.error) = errmsg.into_raw(); + } + } (self.callback)(self.callback_data); } } diff --git a/src/commands.rs b/src/commands.rs index 3ab84cc..dc8bcc5 100644 --- a/src/commands.rs +++ b/src/commands.rs @@ -2,6 +2,7 @@ use failure::*; use std::collections::HashSet; use std::sync::{Mutex, Arc}; use std::ptr; +use std::os::raw::c_int; use futures::future::{Future, TryFutureExt}; use serde_json::{json, Value}; @@ -177,7 +178,7 @@ pub(crate) async fn close_image( client: Arc, registry: Arc>, dev_id: u8, -) -> Result<(), Error> { +) -> Result { println!("close image {}", dev_id); @@ -216,7 +217,7 @@ pub(crate) async fn close_image( "csum": proxmox::tools::digest_to_hex(&upload_result.csum), })); - Ok(()) + Ok(0) } pub(crate) async fn write_data( @@ -229,7 +230,7 @@ pub(crate) async fn write_data( offset: u64, size: u64, // actual data size chunk_size: u64, // expected data size -) -> Result<(), Error> { +) -> Result { println!("dev {}: write {} {}", dev_id, offset, size); @@ -346,7 +347,7 @@ pub(crate) async fn write_data( println!("upload chunk sucessful"); - Ok(()) + Ok(size as c_int) } pub(crate) async fn finish_backup( @@ -354,7 +355,7 @@ pub(crate) async fn finish_backup( crypt_config: Option>, registry: Arc>, setup: BackupSetup, -) -> Result<(), Error> { +) -> Result { println!("call finish"); @@ -378,5 +379,5 @@ pub(crate) async fn finish_backup( client.finish().await?; - Ok(()) + Ok(0) } diff --git a/src/lib.rs b/src/lib.rs index 5c6c813..e310268 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -51,7 +51,7 @@ macro_rules! raise_error_int { }} } -/// Create new instance +/// Create a new instance #[no_mangle] pub extern "C" fn proxmox_backup_new( repo: *const c_char, @@ -119,19 +119,20 @@ pub extern "C" fn proxmox_backup_connect_async( handle: *mut ProxmoxBackupHandle, callback: extern "C" fn(*mut c_void), callback_data: *mut c_void, - error: * mut * mut c_char, + result: *mut c_int, + error: *mut *mut c_char, ) { let task = unsafe { &mut *(handle as * mut BackupTask) }; if let Some(_reason) = &task.aborted { let errmsg = CString::new("task already aborted".to_string()).unwrap(); - unsafe { *error = errmsg.into_raw(); } + unsafe { *result = -1; *error = errmsg.into_raw(); } callback(callback_data); return; } let msg = BackupMessage::Connect { - callback_info: CallbackPointers { callback, callback_data, error }, + callback_info: CallbackPointers { callback, callback_data, error, result }, }; println!("connect_async start"); @@ -252,14 +253,14 @@ pub extern "C" fn proxmox_backup_write_data_async( size: u64, callback: extern "C" fn(*mut c_void), callback_data: *mut c_void, + result: *mut c_int, error: * mut * mut c_char, ) { let task = unsafe { &mut *(handle as * mut BackupTask) }; + let callback_info = CallbackPointers { callback, callback_data, error, result }; if let Some(_reason) = &task.aborted { - let errmsg = CString::new("task already aborted".to_string()).unwrap(); - unsafe { *error = errmsg.into_raw(); } - callback(callback_data); + callback_info.send_result(Err(format_err!("task already aborted"))); return; } @@ -268,7 +269,7 @@ pub extern "C" fn proxmox_backup_write_data_async( data: DataPointer(data), offset, size, - callback_info: CallbackPointers { callback, callback_data, error }, + callback_info, }; println!("write_data_async start"); @@ -285,21 +286,18 @@ pub extern "C" fn proxmox_backup_close_image_async( dev_id: u8, callback: extern "C" fn(*mut c_void), callback_data: *mut c_void, + result: *mut c_int, error: * mut * mut c_char, ) { let task = unsafe { &mut *(handle as * mut BackupTask) }; + let callback_info = CallbackPointers { callback, callback_data, error, result }; if let Some(_reason) = &task.aborted { - let errmsg = CString::new("task already aborted".to_string()).unwrap(); - unsafe { *error = errmsg.into_raw(); } - callback(callback_data); + callback_info.send_result(Err(format_err!("task already aborted"))); return; } - let msg = BackupMessage::CloseImage { - dev_id, - callback_info: CallbackPointers { callback, callback_data, error }, - }; + let msg = BackupMessage::CloseImage { dev_id, callback_info }; println!("close_image_async start"); let _res = task.command_tx.send(msg); // fixme: log errors @@ -315,20 +313,18 @@ pub extern "C" fn proxmox_backup_finish_async( handle: *mut ProxmoxBackupHandle, callback: extern "C" fn(*mut c_void), callback_data: *mut c_void, + result: *mut c_int, error: * mut * mut c_char, ) { let task = unsafe { &mut *(handle as * mut BackupTask) }; + let callback_info = CallbackPointers { callback, callback_data, error, result }; if let Some(_reason) = &task.aborted { - let errmsg = CString::new("task already aborted".to_string()).unwrap(); - unsafe { *error = errmsg.into_raw(); } - callback(callback_data); + callback_info.send_result(Err(format_err!("task already aborted"))); return; } - let msg = BackupMessage::Finish { - callback_info: CallbackPointers { callback, callback_data, error }, - }; + let msg = BackupMessage::Finish { callback_info }; println!("finish_async start"); let _res = task.command_tx.send(msg); // fixme: log errors diff --git a/src/worker_task.rs b/src/worker_task.rs index 67fee3e..6d16a2c 100644 --- a/src/worker_task.rs +++ b/src/worker_task.rs @@ -4,6 +4,7 @@ use std::thread::JoinHandle; use std::sync::{Mutex, Arc}; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::mpsc::{channel, Sender, Receiver}; +use std::os::raw::c_int; use futures::future::{Future, Either, FutureExt}; @@ -59,7 +60,7 @@ impl BackupTask { } } -fn handle_async_command>>( +fn handle_async_command>>( command_future: F, abort_future: impl 'static + Send + Future>, callback_info: CallbackPointers, @@ -132,7 +133,7 @@ fn backup_worker_task( BackupMessage::Connect { callback_info } => { client = match setup.connect().await { Ok(client) => { - callback_info.send_result(Ok(())); + callback_info.send_result(Ok(0)); Some(client) } Err(err) => {