make backup C api thread safe

Protect everything with a Mutex, so it is now allowed to make parallel
calls from different threads.
This commit is contained in:
Dietmar Maurer 2020-07-02 07:01:12 +02:00
parent 12a00649f7
commit c9c4a5788b
2 changed files with 78 additions and 71 deletions

View File

@ -2,7 +2,7 @@ use anyhow::{bail, format_err, Error};
use std::ffi::CString;
use std::ptr;
use std::os::raw::{c_uchar, c_char, c_int, c_void};
use std::sync::{Mutex, Condvar};
use std::sync::{Arc, Mutex, Condvar};
use proxmox::try_block;
use proxmox_backup::client::BackupRepository;
@ -182,13 +182,19 @@ pub extern "C" fn proxmox_backup_new(
match task {
Ok(task) => {
let boxed_task = Box::new(task);
let boxed_task = Box::new(Arc::new(task));
Box::into_raw(boxed_task) as * mut ProxmoxBackupHandle
}
Err(err) => raise_error_null!(error, err),
}
}
fn handle_to_task(handle: *mut ProxmoxBackupHandle) -> Arc<BackupTask> {
let task = unsafe { & *(handle as *const Arc<BackupTask>) };
// increase reference count while we use it inside rust
task.clone()
}
/// Open connection to the backup server (sync)
///
/// Returns:
@ -201,7 +207,7 @@ pub extern "C" fn proxmox_backup_connect(
handle: *mut ProxmoxBackupHandle,
error: *mut *mut c_char,
) -> c_int {
let task = unsafe { &mut *(handle as * mut BackupTask) };
let task = handle_to_task(handle);
let mut result: c_int = -1;
@ -209,7 +215,7 @@ pub extern "C" fn proxmox_backup_connect(
let callback_info = got_result_condition.callback_info(&mut result, error);
task.runtime().spawn(async move {
task.runtime().spawn(async move { // do not use move here???!!
let result = task.connect().await;
callback_info.send_result(result);
});
@ -234,7 +240,7 @@ pub extern "C" fn proxmox_backup_connect_async(
result: *mut c_int,
error: *mut *mut c_char,
) {
let task = unsafe { &mut *(handle as * mut BackupTask) };
let task = handle_to_task(handle);
let callback_info = CallbackPointers { callback, callback_data, error, result };
task.runtime().spawn(async move {
@ -243,7 +249,6 @@ pub extern "C" fn proxmox_backup_connect_async(
});
}
/// Abort a running backup task
///
/// This stops the current backup task. It is still necessary to call
@ -255,8 +260,7 @@ pub extern "C" fn proxmox_backup_abort(
handle: *mut ProxmoxBackupHandle,
reason: *const c_char,
) {
let task = unsafe { &mut *(handle as * mut BackupTask) };
let task = handle_to_task(handle);
let reason = unsafe { tools::utf8_c_string_lossy_non_null(reason) };
task.abort(reason);
}
@ -272,7 +276,7 @@ pub extern "C" fn proxmox_backup_register_image(
incremental: bool,
error: * mut * mut c_char,
) -> c_int {
let task = unsafe { &mut *(handle as * mut BackupTask) };
let task = handle_to_task(handle);
let mut result: c_int = -1;
@ -308,7 +312,7 @@ pub extern "C" fn proxmox_backup_register_image_async(
result: *mut c_int,
error: * mut * mut c_char,
) {
let task = unsafe { &mut *(handle as * mut BackupTask) };
let task = handle_to_task(handle);
let callback_info = CallbackPointers { callback, callback_data, error, result };
let device_name = unsafe { tools::utf8_c_string_lossy_non_null(device_name) };
@ -329,7 +333,7 @@ pub extern "C" fn proxmox_backup_add_config(
size: u64,
error: * mut * mut c_char,
) -> c_int {
let task = unsafe { &mut *(handle as * mut BackupTask) };
let task = handle_to_task(handle);
let mut result: c_int = -1;
@ -366,7 +370,7 @@ pub extern "C" fn proxmox_backup_add_config_async(
result: *mut c_int,
error: * mut * mut c_char,
) {
let task = unsafe { &mut *(handle as * mut BackupTask) };
let task = handle_to_task(handle);
let callback_info = CallbackPointers { callback, callback_data, error, result };
@ -394,7 +398,8 @@ pub extern "C" fn proxmox_backup_write_data(
size: u64,
error: * mut * mut c_char,
) -> c_int {
let task = unsafe { &mut *(handle as * mut BackupTask) };
let task = handle_to_task(handle);
let mut result: c_int = -1;
let mut got_result_condition = GotResultCondition::new();
@ -416,7 +421,11 @@ pub extern "C" fn proxmox_backup_write_data(
///
/// Upload a chunk of data for the <dev_id> image.
///
/// data may be NULL in order to write the zero chunk (only allowed if size == chunk_size)
/// The data pointer may be NULL in order to write the zero chunk
/// (only allowed if size == chunk_size)
///
/// Note: The data pointer needs to be valid until the async
/// opteration is finished.
#[no_mangle]
#[allow(clippy::not_unsafe_ptr_arg_deref)]
pub extern "C" fn proxmox_backup_write_data_async(
@ -430,7 +439,7 @@ pub extern "C" fn proxmox_backup_write_data_async(
result: *mut c_int,
error: * mut * mut c_char,
) {
let task = unsafe { &mut *(handle as * mut BackupTask) };
let task = handle_to_task(handle);
let callback_info = CallbackPointers { callback, callback_data, error, result };
let data = DataPointer(data); // fixme
@ -448,7 +457,7 @@ pub extern "C" fn proxmox_backup_close_image(
dev_id: u8,
error: * mut * mut c_char,
) -> c_int {
let task = unsafe { &mut *(handle as * mut BackupTask) };
let task = handle_to_task(handle);
let mut result: c_int = -1;
@ -479,7 +488,7 @@ pub extern "C" fn proxmox_backup_close_image_async(
result: *mut c_int,
error: * mut * mut c_char,
) {
let task = unsafe { &mut *(handle as * mut BackupTask) };
let task = handle_to_task(handle);
let callback_info = CallbackPointers { callback, callback_data, error, result };
task.runtime().spawn(async move {
@ -495,7 +504,7 @@ pub extern "C" fn proxmox_backup_finish(
handle: *mut ProxmoxBackupHandle,
error: * mut * mut c_char,
) -> c_int {
let task = unsafe { &mut *(handle as * mut BackupTask) };
let task = unsafe { & *(handle as * const Arc<BackupTask>) };
let mut result: c_int = -1;
@ -503,8 +512,9 @@ pub extern "C" fn proxmox_backup_finish(
let callback_info = got_result_condition.callback_info(&mut result, error);
let task2 = task.clone();
task.runtime().spawn(async move {
let result = task.finish().await;
let result = task2.finish().await;
callback_info.send_result(result);
});
@ -526,11 +536,12 @@ pub extern "C" fn proxmox_backup_finish_async(
result: *mut c_int,
error: * mut * mut c_char,
) {
let task = unsafe { &mut *(handle as * mut BackupTask) };
let task = unsafe { & *(handle as * const Arc<BackupTask>) };
let callback_info = CallbackPointers { callback, callback_data, error, result };
let task2 = task.clone();
task.runtime().spawn(async move {
let result = task.finish().await;
let result = task2.finish().await;
callback_info.send_result(result);
});
}
@ -542,7 +553,7 @@ pub extern "C" fn proxmox_backup_finish_async(
#[allow(clippy::not_unsafe_ptr_arg_deref)]
pub extern "C" fn proxmox_backup_disconnect(handle: *mut ProxmoxBackupHandle) {
let task = handle as * mut BackupTask;
let task = handle as * mut Arc<BackupTask>; // fixme: why * mut ??
unsafe { Box::from_raw(task) }; // take ownership, drop(task)
}

View File

@ -16,13 +16,12 @@ pub(crate) struct BackupTask {
setup: BackupSetup,
runtime: tokio::runtime::Runtime,
crypt_config: Option<Arc<CryptConfig>>,
writer: Option<Arc<BackupWriter>>,
last_manifest: Option<Arc<BackupManifest>>,
writer: Mutex<Option<Arc<BackupWriter>>>,
last_manifest: Mutex<Option<Arc<BackupManifest>>>,
registry: Arc<Mutex<ImageRegistry>>, // fixme Arc/Mutex???
known_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
abort: tokio::sync::broadcast::Sender<()>,
aborted: Option<String>, // set on abort, conatins abort reason
written_bytes: u64,
aborted: Mutex<Option<String>>, // set on abort, conatins abort reason
}
impl BackupTask {
@ -57,8 +56,8 @@ impl BackupTask {
let known_chunks = Arc::new(Mutex::new(HashSet::new()));
Ok(Self { runtime, setup, crypt_config, abort, registry, known_chunks,
writer: None, written_bytes: 0,
last_manifest: None, aborted: None })
writer: Mutex::new(None), last_manifest: Mutex::new(None),
aborted: Mutex::new(None) })
}
pub fn runtime(&self) -> tokio::runtime::Handle {
@ -66,55 +65,59 @@ impl BackupTask {
}
fn check_aborted(&self) -> Result<(), Error> {
if self.aborted.is_some() {
if (*self.aborted.lock().unwrap()).is_some() {
bail!("task already aborted");
}
Ok(())
}
pub fn abort(&mut self, reason: String) {
pub fn abort(&self, reason: String) {
let _ = self.abort.send(()); // fixme: ignore errors?
if self.aborted.is_none() {
self.aborted = Some(reason);
let mut aborted = self.aborted.lock().unwrap();
if (*aborted).is_none() {
*aborted = Some(reason);
}
}
async fn _connect(&mut self) -> Result<c_int, Error> {
let options = HttpClientOptions::new()
.fingerprint(self.setup.fingerprint.clone())
.password(self.setup.password.clone());
let http = HttpClient::new(&self.setup.host, &self.setup.user, options)?;
let writer = BackupWriter::start(http, self.crypt_config.clone(), &self.setup.store, "vm", &self.setup.backup_id, self.setup.backup_time, false).await?;
let last_manifest = writer.download_previous_manifest().await;
if let Ok(last_manifest) = last_manifest {
self.last_manifest = Some(Arc::new(last_manifest));
}
self.writer = Some(writer);
Ok(if self.last_manifest.is_some() { 1 } else { 0 })
}
pub async fn connect(&mut self) -> Result<c_int, Error> {
pub async fn connect(&self) -> Result<c_int, Error> {
self.check_aborted()?;
let command_future = async {
let options = HttpClientOptions::new()
.fingerprint(self.setup.fingerprint.clone())
.password(self.setup.password.clone());
let http = HttpClient::new(&self.setup.host, &self.setup.user, options)?;
let writer = BackupWriter::start(
http, self.crypt_config.clone(), &self.setup.store, "vm", &self.setup.backup_id,
self.setup.backup_time, false).await?;
let last_manifest = writer.download_previous_manifest().await;
let mut result = 0;
if let Ok(last_manifest) = last_manifest {
result = 1;
*self.last_manifest.lock().unwrap() = Some(Arc::new(last_manifest));
}
*self.writer.lock().unwrap() = Some(writer);
Ok(result)
};
let mut abort_rx = self.abort.subscribe();
abortable_command(Self::_connect(self), abort_rx.recv()).await
abortable_command(command_future, abort_rx.recv()).await
}
pub async fn add_config(
&mut self,
&self,
name: String,
data: Vec<u8>,
) -> Result<c_int, Error> {
self.check_aborted()?;
let writer = match self.writer {
let writer = match *self.writer.lock().unwrap() {
Some(ref writer) => writer.clone(),
None => bail!("not connected"),
};
@ -130,7 +133,7 @@ impl BackupTask {
}
pub async fn write_data(
&mut self,
&self,
dev_id: u8,
data: DataPointer, // this may be null
offset: u64,
@ -139,13 +142,11 @@ impl BackupTask {
self.check_aborted()?;
let writer = match self.writer {
let writer = match *self.writer.lock().unwrap() {
Some(ref writer) => writer.clone(),
None => bail!("not connected"),
};
self.written_bytes += size;
let command_future = write_data(
writer,
self.crypt_config.clone(),
@ -163,7 +164,7 @@ impl BackupTask {
}
pub async fn register_image(
&mut self,
&self,
device_name: String,
size: u64,
incremental: bool,
@ -171,7 +172,7 @@ impl BackupTask {
self.check_aborted()?;
let writer = match self.writer {
let writer = match *self.writer.lock().unwrap() {
Some(ref writer) => writer.clone(),
None => bail!("not connected"),
};
@ -179,7 +180,7 @@ impl BackupTask {
let command_future = register_image(
writer,
self.crypt_config.clone(),
self.last_manifest.clone(),
self.last_manifest.lock().unwrap().clone(),
self.registry.clone(),
self.known_chunks.clone(),
device_name,
@ -192,14 +193,11 @@ impl BackupTask {
abortable_command(command_future, abort_rx.recv()).await
}
pub async fn close_image(
&mut self,
dev_id: u8,
) -> Result<c_int, Error> {
pub async fn close_image(&self, dev_id: u8) -> Result<c_int, Error> {
self.check_aborted()?;
let writer = match self.writer {
let writer = match *self.writer.lock().unwrap() {
Some(ref writer) => writer.clone(),
None => bail!("not connected"),
};
@ -209,13 +207,11 @@ impl BackupTask {
abortable_command(command_future, abort_rx.recv()).await
}
pub async fn finish(
&mut self,
) -> Result<c_int, Error> {
pub async fn finish(&self) -> Result<c_int, Error> {
self.check_aborted()?;
let writer = match self.writer {
let writer = match *self.writer.lock().unwrap() {
Some(ref writer) => writer.clone(),
None => bail!("not connected"),
};