diff --git a/src/lib.rs b/src/lib.rs index 740c9e6..d461748 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,7 +2,7 @@ use anyhow::{bail, format_err, Error}; use std::ffi::CString; use std::ptr; use std::os::raw::{c_uchar, c_char, c_int, c_void}; -use std::sync::{Mutex, Condvar}; +use std::sync::{Arc, Mutex, Condvar}; use proxmox::try_block; use proxmox_backup::client::BackupRepository; @@ -182,13 +182,19 @@ pub extern "C" fn proxmox_backup_new( match task { Ok(task) => { - let boxed_task = Box::new(task); + let boxed_task = Box::new(Arc::new(task)); Box::into_raw(boxed_task) as * mut ProxmoxBackupHandle } Err(err) => raise_error_null!(error, err), } } +fn handle_to_task(handle: *mut ProxmoxBackupHandle) -> Arc { + let task = unsafe { & *(handle as *const Arc) }; + // increase reference count while we use it inside rust + task.clone() +} + /// Open connection to the backup server (sync) /// /// Returns: @@ -201,7 +207,7 @@ pub extern "C" fn proxmox_backup_connect( handle: *mut ProxmoxBackupHandle, error: *mut *mut c_char, ) -> c_int { - let task = unsafe { &mut *(handle as * mut BackupTask) }; + let task = handle_to_task(handle); let mut result: c_int = -1; @@ -209,7 +215,7 @@ pub extern "C" fn proxmox_backup_connect( let callback_info = got_result_condition.callback_info(&mut result, error); - task.runtime().spawn(async move { + task.runtime().spawn(async move { // do not use move here???!! let result = task.connect().await; callback_info.send_result(result); }); @@ -234,7 +240,7 @@ pub extern "C" fn proxmox_backup_connect_async( result: *mut c_int, error: *mut *mut c_char, ) { - let task = unsafe { &mut *(handle as * mut BackupTask) }; + let task = handle_to_task(handle); let callback_info = CallbackPointers { callback, callback_data, error, result }; task.runtime().spawn(async move { @@ -243,7 +249,6 @@ pub extern "C" fn proxmox_backup_connect_async( }); } - /// Abort a running backup task /// /// This stops the current backup task. It is still necessary to call @@ -255,8 +260,7 @@ pub extern "C" fn proxmox_backup_abort( handle: *mut ProxmoxBackupHandle, reason: *const c_char, ) { - let task = unsafe { &mut *(handle as * mut BackupTask) }; - + let task = handle_to_task(handle); let reason = unsafe { tools::utf8_c_string_lossy_non_null(reason) }; task.abort(reason); } @@ -272,7 +276,7 @@ pub extern "C" fn proxmox_backup_register_image( incremental: bool, error: * mut * mut c_char, ) -> c_int { - let task = unsafe { &mut *(handle as * mut BackupTask) }; + let task = handle_to_task(handle); let mut result: c_int = -1; @@ -308,7 +312,7 @@ pub extern "C" fn proxmox_backup_register_image_async( result: *mut c_int, error: * mut * mut c_char, ) { - let task = unsafe { &mut *(handle as * mut BackupTask) }; + let task = handle_to_task(handle); let callback_info = CallbackPointers { callback, callback_data, error, result }; let device_name = unsafe { tools::utf8_c_string_lossy_non_null(device_name) }; @@ -329,7 +333,7 @@ pub extern "C" fn proxmox_backup_add_config( size: u64, error: * mut * mut c_char, ) -> c_int { - let task = unsafe { &mut *(handle as * mut BackupTask) }; + let task = handle_to_task(handle); let mut result: c_int = -1; @@ -366,7 +370,7 @@ pub extern "C" fn proxmox_backup_add_config_async( result: *mut c_int, error: * mut * mut c_char, ) { - let task = unsafe { &mut *(handle as * mut BackupTask) }; + let task = handle_to_task(handle); let callback_info = CallbackPointers { callback, callback_data, error, result }; @@ -394,7 +398,8 @@ pub extern "C" fn proxmox_backup_write_data( size: u64, error: * mut * mut c_char, ) -> c_int { - let task = unsafe { &mut *(handle as * mut BackupTask) }; + let task = handle_to_task(handle); + let mut result: c_int = -1; let mut got_result_condition = GotResultCondition::new(); @@ -416,7 +421,11 @@ pub extern "C" fn proxmox_backup_write_data( /// /// Upload a chunk of data for the image. /// -/// data may be NULL in order to write the zero chunk (only allowed if size == chunk_size) +/// The data pointer may be NULL in order to write the zero chunk +/// (only allowed if size == chunk_size) +/// +/// Note: The data pointer needs to be valid until the async +/// opteration is finished. #[no_mangle] #[allow(clippy::not_unsafe_ptr_arg_deref)] pub extern "C" fn proxmox_backup_write_data_async( @@ -430,7 +439,7 @@ pub extern "C" fn proxmox_backup_write_data_async( result: *mut c_int, error: * mut * mut c_char, ) { - let task = unsafe { &mut *(handle as * mut BackupTask) }; + let task = handle_to_task(handle); let callback_info = CallbackPointers { callback, callback_data, error, result }; let data = DataPointer(data); // fixme @@ -448,7 +457,7 @@ pub extern "C" fn proxmox_backup_close_image( dev_id: u8, error: * mut * mut c_char, ) -> c_int { - let task = unsafe { &mut *(handle as * mut BackupTask) }; + let task = handle_to_task(handle); let mut result: c_int = -1; @@ -479,7 +488,7 @@ pub extern "C" fn proxmox_backup_close_image_async( result: *mut c_int, error: * mut * mut c_char, ) { - let task = unsafe { &mut *(handle as * mut BackupTask) }; + let task = handle_to_task(handle); let callback_info = CallbackPointers { callback, callback_data, error, result }; task.runtime().spawn(async move { @@ -495,7 +504,7 @@ pub extern "C" fn proxmox_backup_finish( handle: *mut ProxmoxBackupHandle, error: * mut * mut c_char, ) -> c_int { - let task = unsafe { &mut *(handle as * mut BackupTask) }; + let task = unsafe { & *(handle as * const Arc) }; let mut result: c_int = -1; @@ -503,8 +512,9 @@ pub extern "C" fn proxmox_backup_finish( let callback_info = got_result_condition.callback_info(&mut result, error); + let task2 = task.clone(); task.runtime().spawn(async move { - let result = task.finish().await; + let result = task2.finish().await; callback_info.send_result(result); }); @@ -526,11 +536,12 @@ pub extern "C" fn proxmox_backup_finish_async( result: *mut c_int, error: * mut * mut c_char, ) { - let task = unsafe { &mut *(handle as * mut BackupTask) }; + let task = unsafe { & *(handle as * const Arc) }; let callback_info = CallbackPointers { callback, callback_data, error, result }; + let task2 = task.clone(); task.runtime().spawn(async move { - let result = task.finish().await; + let result = task2.finish().await; callback_info.send_result(result); }); } @@ -542,7 +553,7 @@ pub extern "C" fn proxmox_backup_finish_async( #[allow(clippy::not_unsafe_ptr_arg_deref)] pub extern "C" fn proxmox_backup_disconnect(handle: *mut ProxmoxBackupHandle) { - let task = handle as * mut BackupTask; + let task = handle as * mut Arc; // fixme: why * mut ?? unsafe { Box::from_raw(task) }; // take ownership, drop(task) } diff --git a/src/worker_task.rs b/src/worker_task.rs index 99ee996..bb16f05 100644 --- a/src/worker_task.rs +++ b/src/worker_task.rs @@ -16,13 +16,12 @@ pub(crate) struct BackupTask { setup: BackupSetup, runtime: tokio::runtime::Runtime, crypt_config: Option>, - writer: Option>, - last_manifest: Option>, + writer: Mutex>>, + last_manifest: Mutex>>, registry: Arc>, // fixme Arc/Mutex??? known_chunks: Arc>>, abort: tokio::sync::broadcast::Sender<()>, - aborted: Option, // set on abort, conatins abort reason - written_bytes: u64, + aborted: Mutex>, // set on abort, conatins abort reason } impl BackupTask { @@ -57,8 +56,8 @@ impl BackupTask { let known_chunks = Arc::new(Mutex::new(HashSet::new())); Ok(Self { runtime, setup, crypt_config, abort, registry, known_chunks, - writer: None, written_bytes: 0, - last_manifest: None, aborted: None }) + writer: Mutex::new(None), last_manifest: Mutex::new(None), + aborted: Mutex::new(None) }) } pub fn runtime(&self) -> tokio::runtime::Handle { @@ -66,55 +65,59 @@ impl BackupTask { } fn check_aborted(&self) -> Result<(), Error> { - if self.aborted.is_some() { + if (*self.aborted.lock().unwrap()).is_some() { bail!("task already aborted"); } Ok(()) } - pub fn abort(&mut self, reason: String) { + pub fn abort(&self, reason: String) { let _ = self.abort.send(()); // fixme: ignore errors? - if self.aborted.is_none() { - self.aborted = Some(reason); + let mut aborted = self.aborted.lock().unwrap(); + if (*aborted).is_none() { + *aborted = Some(reason); } } - async fn _connect(&mut self) -> Result { - - let options = HttpClientOptions::new() - .fingerprint(self.setup.fingerprint.clone()) - .password(self.setup.password.clone()); - - let http = HttpClient::new(&self.setup.host, &self.setup.user, options)?; - let writer = BackupWriter::start(http, self.crypt_config.clone(), &self.setup.store, "vm", &self.setup.backup_id, self.setup.backup_time, false).await?; - - let last_manifest = writer.download_previous_manifest().await; - if let Ok(last_manifest) = last_manifest { - self.last_manifest = Some(Arc::new(last_manifest)); - } - - self.writer = Some(writer); - - Ok(if self.last_manifest.is_some() { 1 } else { 0 }) - } - - pub async fn connect(&mut self) -> Result { + pub async fn connect(&self) -> Result { self.check_aborted()?; + let command_future = async { + let options = HttpClientOptions::new() + .fingerprint(self.setup.fingerprint.clone()) + .password(self.setup.password.clone()); + + let http = HttpClient::new(&self.setup.host, &self.setup.user, options)?; + let writer = BackupWriter::start( + http, self.crypt_config.clone(), &self.setup.store, "vm", &self.setup.backup_id, + self.setup.backup_time, false).await?; + + let last_manifest = writer.download_previous_manifest().await; + let mut result = 0; + if let Ok(last_manifest) = last_manifest { + result = 1; + *self.last_manifest.lock().unwrap() = Some(Arc::new(last_manifest)); + } + + *self.writer.lock().unwrap() = Some(writer); + + Ok(result) + }; + let mut abort_rx = self.abort.subscribe(); - abortable_command(Self::_connect(self), abort_rx.recv()).await + abortable_command(command_future, abort_rx.recv()).await } pub async fn add_config( - &mut self, + &self, name: String, data: Vec, ) -> Result { self.check_aborted()?; - let writer = match self.writer { + let writer = match *self.writer.lock().unwrap() { Some(ref writer) => writer.clone(), None => bail!("not connected"), }; @@ -130,7 +133,7 @@ impl BackupTask { } pub async fn write_data( - &mut self, + &self, dev_id: u8, data: DataPointer, // this may be null offset: u64, @@ -139,13 +142,11 @@ impl BackupTask { self.check_aborted()?; - let writer = match self.writer { + let writer = match *self.writer.lock().unwrap() { Some(ref writer) => writer.clone(), None => bail!("not connected"), }; - self.written_bytes += size; - let command_future = write_data( writer, self.crypt_config.clone(), @@ -163,7 +164,7 @@ impl BackupTask { } pub async fn register_image( - &mut self, + &self, device_name: String, size: u64, incremental: bool, @@ -171,7 +172,7 @@ impl BackupTask { self.check_aborted()?; - let writer = match self.writer { + let writer = match *self.writer.lock().unwrap() { Some(ref writer) => writer.clone(), None => bail!("not connected"), }; @@ -179,7 +180,7 @@ impl BackupTask { let command_future = register_image( writer, self.crypt_config.clone(), - self.last_manifest.clone(), + self.last_manifest.lock().unwrap().clone(), self.registry.clone(), self.known_chunks.clone(), device_name, @@ -192,14 +193,11 @@ impl BackupTask { abortable_command(command_future, abort_rx.recv()).await } - pub async fn close_image( - &mut self, - dev_id: u8, - ) -> Result { + pub async fn close_image(&self, dev_id: u8) -> Result { self.check_aborted()?; - let writer = match self.writer { + let writer = match *self.writer.lock().unwrap() { Some(ref writer) => writer.clone(), None => bail!("not connected"), }; @@ -209,13 +207,11 @@ impl BackupTask { abortable_command(command_future, abort_rx.recv()).await } - pub async fn finish( - &mut self, - ) -> Result { + pub async fn finish(&self) -> Result { self.check_aborted()?; - let writer = match self.writer { + let writer = match *self.writer.lock().unwrap() { Some(ref writer) => writer.clone(), None => bail!("not connected"), };