proxmox-backup-qemu/src/lib.rs
2019-09-15 09:20:08 +02:00

179 lines
4.2 KiB
Rust

use failure::*;
use std::thread::JoinHandle;
use std::sync::mpsc::{channel, Sender, Receiver};
//use futures::{future, Future, Stream};
use tokio::runtime::current_thread::{Runtime, RunError};
//#[macro_use]
use proxmox_backup::client::*;
use chrono::{Utc, TimeZone};
/// State kept for one running backup worker thread.
///
/// Pairs the worker's join handle with the sending half of the channel that
/// carries `BackupMessage`s to it.
struct BackupTask {
/// Handle of the worker thread; joined in `proxmox_backup_disconnect` to
/// collect the worker's result.
worker: JoinHandle<Result<BackupStats, Error>>,
/// Sending half of the worker's message channel.
tx: Sender<BackupMessage>,
// runtime: Runtime,
// client: Arc<BackupClient>,
}
/// Counters accumulated by the backup worker and returned when it finishes.
#[derive(Debug)]
struct BackupStats {
/// Running total of the `size` values of all `WriteData` messages handled.
written_bytes: u64,
}
/// Messages sent from the C-facing API functions to the backup worker thread.
enum BackupMessage {
/// Tells the worker to leave its receive loop.
End,
/// A write request for one device.
WriteData {
/// Device identifier passed through from the C caller.
dev_id: u8,
/// Start of the caller's buffer; currently not read by the worker.
data: *const u8,
/// Byte count reported by the caller; added to the worker's statistics.
size: u64,
/// Invoked by the worker with `callback_data` once the message was handled.
callback: extern "C" fn(*mut libc::c_void),
/// Opaque pointer handed back to `callback` unchanged.
callback_data: *mut libc::c_void,
},
}
// The raw pointers in `WriteData` make this type `!Send` by default; this impl
// overrides that so messages can cross the channel into the worker thread.
// NOTE(review): soundness presumably depends on the C caller keeping `data`
// and `callback_data` valid until the callback fires - nothing here enforces
// that; confirm against the caller.
unsafe impl std::marker::Send for BackupMessage {} // fixme: ???
impl BackupTask {
    /// Spawns the backup worker thread and returns the handle/sender pair
    /// used to drive it.
    ///
    /// All connection parameters are hard-coded placeholders for now. Only
    /// `host` is handed to the worker; the remaining values are reserved for
    /// the client setup sketched in the TODO below.
    fn new() -> Result<Self, Error> {
        let host = "localhost";
        let user = "root@pam";
        let store = "store2";
        let backup_id = "99";
        let verbose = false;

        // Current time, truncated to whole seconds.
        let now_secs = Utc::now().timestamp();
        let backup_time = Utc.timestamp(now_secs, 0);

        let (sender, receiver) = channel();
        let handle = std::thread::spawn(move || backup_worker_task(receiver, host));

        // TODO: establish the real backup session, roughly:
        //
        //     let client = HttpClient::new(host, user)?;
        //     let client = runtime.block_on(
        //         client.start_backup(store, "vm", backup_id, backup_time, verbose))?;

        Ok(Self {
            worker: handle,
            tx: sender,
        })
    }
}
/// Body of the backup worker thread.
///
/// Receives `BackupMessage`s from `rx` until `BackupMessage::End` arrives and
/// then returns the accumulated statistics. For every `WriteData` message the
/// byte counter is updated and the caller-supplied callback is invoked on the
/// worker's runtime. The `data` pointer is not read yet, and `host` is
/// currently unused.
///
/// # Errors
///
/// Returns an error if the runtime cannot be created, or if `rx.recv()` fails
/// (all senders were dropped) before an `End` message was received.
fn backup_worker_task(rx: Receiver<BackupMessage>, host: &str) -> Result<BackupStats, Error> {
    // Propagate a runtime setup failure to whoever joins the thread instead
    // of panicking inside the worker.
    let mut runtime = Runtime::new()?;
    let mut stats = BackupStats { written_bytes: 0 };

    loop {
        match rx.recv()? {
            BackupMessage::End => {
                println!("worker got end mesage");
                break;
            }
            // `data` is deliberately ignored: nothing is written anywhere yet.
            BackupMessage::WriteData { dev_id, data: _, size, callback, callback_data } => {
                stats.written_bytes += size;
                println!("dev {}: write {} bytes ({})", dev_id, size, stats.written_bytes);

                runtime.block_on(async move {
                    // fixme: error handling
                    callback(callback_data);
                });
            }
        }
    }

    println!("worker end loop");

    Ok(stats)
}
// The C interface
/// Opaque handle type exposed to C.
///
/// Pointers of this type returned by `proxmox_backup_connect` actually point
/// at a boxed `BackupTask`; the struct itself carries no data.
#[repr(C)]
pub struct ProxmoxBackupHandle {}
/// Starts a backup task and returns an opaque handle to it.
///
/// On success the task is boxed and leaked to the caller as a raw pointer;
/// `proxmox_backup_disconnect` takes ownership of it again and frees it.
/// If the task cannot be created, the error is written to stderr and a null
/// pointer is returned.
///
/// # Safety
///
/// Nothing in the body relies on caller-upheld invariants; the function is
/// `unsafe` only as part of the C interface.
#[no_mangle]
pub unsafe extern "C" fn proxmox_backup_connect() -> *mut ProxmoxBackupHandle {
    println!("Hello");

    match BackupTask::new() {
        Ok(task) => Box::into_raw(Box::new(task)) as *mut ProxmoxBackupHandle,
        Err(err) => {
            // Null is the only error signal the C side gets, so at least
            // leave a trace of the reason.
            eprintln!("proxmox_backup_connect failed: {}", err);
            std::ptr::null_mut()
        }
    }
}
/// Queues a write request for the backup worker.
///
/// The request is only forwarded to the worker thread, which invokes
/// `callback(callback_data)` after it has handled the message. If `handle`
/// is null, or the worker is no longer receiving, the problem is reported on
/// stderr and `callback` is never invoked.
///
/// # Safety
///
/// `handle` must be null or a pointer obtained from `proxmox_backup_connect`
/// that has not yet been passed to `proxmox_backup_disconnect`.
/// `callback_data` (and, once the worker reads it, `data`) should stay valid
/// until `callback` has been called.
#[no_mangle]
pub unsafe extern "C" fn proxmox_backup_write_data_async(
    handle: *mut ProxmoxBackupHandle,
    dev_id: u8,
    data: *const u8,
    size: u64,
    callback: extern "C" fn(*mut libc::c_void),
    callback_data: *mut libc::c_void,
) {
    // SAFETY: per the contract above, a non-null handle points at a live
    // `BackupTask`; `as_ref` turns the null case into `None`.
    let task = match (handle as *mut BackupTask).as_ref() {
        Some(task) => task,
        None => {
            eprintln!("proxmox_backup_write_data_async: called with null handle");
            return;
        }
    };

    let msg = BackupMessage::WriteData { dev_id, data, size, callback, callback_data };

    println!("write_data_async start");
    if let Err(err) = task.tx.send(msg) {
        // The worker has already exited, so the callback will never fire.
        eprintln!("proxmox_backup_write_data_async: unable to queue request: {}", err);
    }
    println!("write_data_async end");
}
#[no_mangle]
pub unsafe extern "C" fn proxmox_backup_disconnect(handle: *mut ProxmoxBackupHandle) {
println!("diconnect");
let task = handle as * mut BackupTask;
let mut task = Box::from_raw(task); // take ownership
println!("send end");
let _res = task.tx.send(BackupMessage::End); // fixme: log errors
println!("try join");
match task.worker.join() {
Ok(result) => {
match result {
Ok(stats) => {
println!("worker finished {:?}", stats);
}
Err(err) => {
println!("worker finished with error: {:?}", err);
}
}
}
Err(err) => {
println!("worker paniced with error: {:?}", err);
}
}
//drop(task);
}