start error handling

This commit is contained in:
Dietmar Maurer 2019-09-16 13:44:47 +02:00
parent d325ffe484
commit 20f9ab37c2

View File

@ -1,21 +1,31 @@
use failure::*;
use std::thread::JoinHandle;
use std::sync::Arc;
use std::sync::mpsc::{channel, Sender, Receiver};
use std::ffi::{CStr, CString};
use std::ptr;
use std::os::raw::{c_char, c_int, c_void};
//use futures::{future, Future, Stream};
use tokio::runtime::current_thread::{Runtime, RunError};
use tokio::runtime::current_thread::Runtime;
//#[macro_use]
use proxmox_backup::client::*;
use chrono::{Utc, TimeZone};
use chrono::{Utc, TimeZone, DateTime};
struct BackupRepository {
host: String,
store: String,
user: String,
backup_id: String,
backup_time: DateTime<Utc>,
password: String,
}
struct BackupTask {
worker: JoinHandle<Result<BackupStats, Error>>,
tx: Sender<BackupMessage>,
// runtime: Runtime,
// client: Arc<BackupClient>,
command_tx: Sender<BackupMessage>,
}
#[derive(Debug)]
@ -29,8 +39,8 @@ enum BackupMessage {
dev_id: u8,
data: *const u8,
size: u64,
callback: extern "C" fn(*mut libc::c_void),
callback_data: *mut libc::c_void,
callback: extern "C" fn(*mut c_void),
callback_data: *mut c_void,
},
}
@ -38,45 +48,64 @@ unsafe impl std::marker::Send for BackupMessage {} // fixme: ???
impl BackupTask {
fn new() -> Result<Self, Error> {
fn new(repo: BackupRepository) -> Result<Self, Error> {
let host = "localhost";
let user = "root@pam";
let store = "store2";
let backup_id = "99";
let verbose = false;
let (connect_tx, connect_rx) = channel(); // sync initial server connect
let backup_time = Utc.timestamp(Utc::now().timestamp(), 0);
let (tx, rx) = channel();
let (command_tx, command_rx) = channel();
let worker = std::thread::spawn(move || {
backup_worker_task(rx, host)
backup_worker_task(repo, connect_tx, command_rx)
});
/*
let client = HttpClient::new(host, user)?;
let client = runtime.block_on(
client.start_backup(store, "vm", backup_id, backup_time, verbose))?;
*/
connect_rx.recv().unwrap()?;
Ok(BackupTask {
worker,
tx,
command_tx,
})
}
}
fn backup_worker_task(rx: Receiver<BackupMessage>, host: &str) -> Result<BackupStats, Error> {
fn connect(runtime: &mut Runtime, repo: &BackupRepository) -> Result<Arc<BackupClient>, Error> {
let client = HttpClient::new(&repo.host, &repo.user, Some(repo.password.clone()))?;
let mut runtime = Runtime::new().unwrap(); // fixme
let client = runtime.block_on(
client.start_backup(&repo.store, "vm", &repo.backup_id, repo.backup_time, false))?;
Ok(client)
}
fn backup_worker_task(
repo: BackupRepository,
connect_tx: Sender<Result<(), Error>>,
command_rx: Receiver<BackupMessage>,
) -> Result<BackupStats, Error> {
let mut runtime = match Runtime::new() {
Ok(runtime) => runtime,
Err(err) => {
connect_tx.send(Err(format_err!("create runtime failed: {}", err))).unwrap();
bail!("create runtiome failed");
}
};
let client = match connect(&mut runtime, &repo) {
Ok(client) => {
connect_tx.send(Ok(())).unwrap();
}
Err(err) => {
connect_tx.send(Err(err)).unwrap();
bail!("connection failed");
}
};
drop(connect_tx); // no longer needed
let mut stats = BackupStats { written_bytes: 0 };
loop {
let msg = rx.recv()?;
let msg = command_rx.recv()?;
match msg {
BackupMessage::End => {
@ -109,32 +138,63 @@ fn backup_worker_task(rx: Receiver<BackupMessage>, host: &str) -> Result<BackupS
// The C interface
#[repr(C)]
pub struct ProxmoxBackupHandle {}
pub struct ProxmoxBackupHandle;
#[no_mangle]
pub unsafe extern "C" fn proxmox_backup_connect() -> *mut ProxmoxBackupHandle {
pub extern "C" fn proxmox_backup_free_error(ptr: * mut c_char) {
unsafe { CString::from_raw(ptr); }
}
macro_rules! raise_error_null {
($error:ident, $err:expr) => {{
let errmsg = CString::new($err.to_string()).unwrap(); // fixme
unsafe { *$error = errmsg.into_raw(); }
return ptr::null_mut();
}}
}
macro_rules! raise_error_int {
($error:ident, $err:expr) => {{
let errmsg = CString::new($err.to_string()).unwrap(); // fixme
unsafe { *$error = errmsg.into_raw(); }
return -1 as c_int;
}}
}
#[no_mangle]
pub extern "C" fn proxmox_backup_connect(error: * mut * mut c_char) -> *mut ProxmoxBackupHandle {
println!("Hello");
match BackupTask::new() {
let backup_time = Utc.timestamp(Utc::now().timestamp(), 0);
let repo = BackupRepository {
host: "localhost".to_owned(),
user: "root@pam".to_owned(),
store: "store2".to_owned(),
backup_id: "99".to_owned(),
password: "".to_owned(),
backup_time,
};
match BackupTask::new(repo) {
Ok(task) => {
let tmp = Box::new(task);
let test = Box::into_raw(tmp);
test as * mut ProxmoxBackupHandle
}
Err(err) => std::ptr::null_mut(),
Err(err) => raise_error_null!(error, err),
}
}
#[no_mangle]
pub unsafe extern "C" fn proxmox_backup_write_data_async(
pub extern "C" fn proxmox_backup_write_data_async(
handle: *mut ProxmoxBackupHandle,
dev_id: u8,
data: *const u8,
size: u64,
callback: extern "C" fn(*mut libc::c_void),
callback_data: *mut libc::c_void,
callback: extern "C" fn(*mut c_void),
callback_data: *mut c_void,
) {
let msg = BackupMessage::WriteData { dev_id, data, size , callback, callback_data };
@ -142,20 +202,20 @@ pub unsafe extern "C" fn proxmox_backup_write_data_async(
let task = handle as * mut BackupTask;
println!("write_data_async start");
let _res = (*task).tx.send(msg); // fixme: log errors
let _res = unsafe { (*task).command_tx.send(msg) }; // fixme: log errors
println!("write_data_async end");
}
#[no_mangle]
pub unsafe extern "C" fn proxmox_backup_disconnect(handle: *mut ProxmoxBackupHandle) {
pub extern "C" fn proxmox_backup_disconnect(handle: *mut ProxmoxBackupHandle) {
println!("diconnect");
let task = handle as * mut BackupTask;
let mut task = Box::from_raw(task); // take ownership
let task = unsafe { Box::from_raw(task) }; // take ownership
println!("send end");
let _res = task.tx.send(BackupMessage::End); // fixme: log errors
let _res = task.command_tx.send(BackupMessage::End); // fixme: log errors
println!("try join");
match task.worker.join() {