implement image registry, implement close image

This commit is contained in:
Dietmar Maurer 2019-09-21 10:24:24 +02:00
parent 3c46500c25
commit c2610a6183
2 changed files with 249 additions and 19 deletions

View File

@ -16,9 +16,12 @@ cbindgen = "0.9.1"
[dependencies] [dependencies]
libc = "0.2" libc = "0.2"
bytes = "0.4"
proxmox = { git = "ssh://gitolite3@proxdev.maurer-it.com/rust/proxmox", version = "0.1" }
proxmox-backup = { path = "../proxmox-backup" } proxmox-backup = { path = "../proxmox-backup" }
chrono = "0.4" # Date and time library for Rust chrono = "0.4" # Date and time library for Rust
failure = "0.1" failure = "0.1"
futures-preview = "0.3.0-alpha" futures-preview = "0.3.0-alpha"
serde_json = "1.0"
tokio = { version = "0.2.0-alpha.4" } tokio = { version = "0.2.0-alpha.4" }

View File

@ -1,4 +1,5 @@
use failure::*; use failure::*;
use std::collections::HashSet;
use std::thread::JoinHandle; use std::thread::JoinHandle;
use std::sync::{Mutex, Arc}; use std::sync::{Mutex, Arc};
use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::atomic::{AtomicU64, Ordering};
@ -7,11 +8,12 @@ use std::ffi::{CStr, CString};
use std::ptr; use std::ptr;
use std::os::raw::{c_char, c_int, c_void}; use std::os::raw::{c_char, c_int, c_void};
//use futures::{future, Future, Stream}; use serde_json::{json, Value};
use futures::future::{self, Future, Either, FutureExt}; use futures::future::{self, Future, Either, FutureExt};
use tokio::runtime::Runtime; use tokio::runtime::Runtime;
//#[macro_use] //#[macro_use]
use proxmox_backup::backup::*;
use proxmox_backup::client::*; use proxmox_backup::client::*;
use proxmox_backup::tools::BroadcastFuture; use proxmox_backup::tools::BroadcastFuture;
@ -24,6 +26,7 @@ struct BackupRepository {
backup_id: String, backup_id: String,
backup_time: DateTime<Utc>, backup_time: DateTime<Utc>,
password: String, password: String,
crypt_config: Option<Arc<CryptConfig>>,
} }
struct BackupTask { struct BackupTask {
@ -51,46 +54,217 @@ enum BackupMessage {
End, End,
Abort, Abort,
RegisterImage { RegisterImage {
device_name: CString, device_name: String,
size: u64, size: u64,
result_channel: Arc<Mutex<Sender<Result<u8, Error>>>>, result_channel: Arc<Mutex<Sender<Result<u8, Error>>>>,
}, },
CloseImage {
dev_id: u8,
callback_info: CallbackPointers,
},
WriteData { WriteData {
dev_id: u8, dev_id: u8,
data: DataPointer, data: DataPointer,
offset: u64,
size: u64, size: u64,
callback_info: CallbackPointers, callback_info: CallbackPointers,
}, },
} }
struct ImageUploadInfo {
wid: u64,
known_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
digest_list: Vec<String>,
offset_list: Vec<u64>,
size: u64,
written: u64,
chunk_count: u64,
}
struct ImageRegistry {
upload_info: Vec<ImageUploadInfo>,
}
impl ImageRegistry {
fn new() -> Self {
Self {
upload_info: Vec::new(),
}
}
fn register(&mut self, info: ImageUploadInfo) -> Result<u8, Error> {
let dev_id = self.upload_info.len();
if dev_id > 255 {
bail!("register image faild - to many images (limit is 255)");
}
self.upload_info.push(info);
Ok(dev_id as u8)
}
fn lookup(&mut self, dev_id: u8) -> Result<&mut ImageUploadInfo, Error> {
if dev_id as usize >= self.upload_info.len() {
bail!("image lookup failed for dev_id = {}", dev_id);
}
Ok(&mut self.upload_info[dev_id as usize])
}
}
async fn register_image( async fn register_image(
client: Arc<BackupClient>, client: Arc<BackupClient>,
device_name: CString, crypt_config: Option<Arc<CryptConfig>>,
registry: Arc<Mutex<ImageRegistry>>,
known_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
device_name: String,
size: u64, size: u64,
) -> Result<u8, Error> { ) -> Result<u8, Error> {
println!("register image {} size {}", device_name.to_string_lossy(), size); println!("register image {} size {}", device_name, size);
println!("Delay test"); let archive_name = format!("{}.img.fidx", device_name);
tokio::timer::delay(std::time::Instant::now() + std::time::Duration::new(5, 0)).await;
println!("Delay end");
bail!("test"); client.download_chunk_list("fixed_index", &archive_name, known_chunks.clone()).await?;
println!("register image download chunk list OK");
Ok(2) let param = json!({ "archive-name": archive_name , "size": size});
let wid = client.post("fixed_index", Some(param)).await?.as_u64().unwrap();
let info = ImageUploadInfo {
wid,
known_chunks,
size,
digest_list: Vec::new(),
offset_list: Vec::new(),
written: 0,
chunk_count: 0,
};
let mut guard = registry.lock().unwrap();
guard.register(info)
}
async fn close_image(
client: Arc<BackupClient>,
registry: Arc<Mutex<ImageRegistry>>,
dev_id: u8,
) -> Result<(), Error> {
println!("close image {}", dev_id);
let (wid, written, chunk_count, append_list) = {
let mut guard = registry.lock().unwrap();
let mut info = guard.lookup(dev_id)?;
let append_list = if info.digest_list.len() > 0 {
let param = json!({ "wid": info.wid, "digest-list": info.digest_list, "offset-list": info.offset_list });
let param_data = param.to_string().as_bytes().to_vec();
info.digest_list.truncate(0);
info.offset_list.truncate(0);
Some(param_data)
} else {
None
};
(info.wid, info.written, info.chunk_count, append_list)
};
if let Some(data) = append_list {
client.upload_put("fixed_index", None, "application/json", data).await?;
}
let param = json!({
"wid": wid ,
"chunk-count": chunk_count,
"size": written,
});
let _value = client.post("fixed_close", Some(param)).await?;
Ok(())
} }
async fn write_data( async fn write_data(
client: Arc<BackupClient>, client: Arc<BackupClient>,
registry: Arc<Mutex<ImageRegistry>>,
dev_id: u8, dev_id: u8,
_data: DataPointer, data: DataPointer,
offset: u64,
size: u64, size: u64,
) -> Result<(), Error> { ) -> Result<(), Error> {
println!("dev {}: write {}", dev_id, size); println!("dev {}: write {}", dev_id, size);
//println!("Delay test"); if size > 4*1024*1024 {
//tokio::timer::delay(std::time::Instant::now() + std::time::Duration::new(2, 0)).await; bail!("write_data: got unexpected chunk size {}", size);
//println!("Delay end"); }
let (wid, known_chunks) = {
let mut guard = registry.lock().unwrap();
let info = guard.lookup(dev_id)?;
(info.wid, info.known_chunks.clone())
};
println!("write_data 1 wid = {}, {:p}", wid, data.0);
let bytes = if data.0 == ptr::null() {
let mut bytes = bytes::BytesMut::new();
bytes.resize(size as usize, 0u8); // fixme: avoid writing zero blocks
bytes
} else {
let data: &[u8] = unsafe { std::slice::from_raw_parts(data.0, size as usize) };
bytes::BytesMut::from(data) // fixme: howto avoid copying data here?
};
let crypt_config: Option<Arc<CryptConfig>> = None;
let mut chunk_builder = DataChunkBuilder::new(bytes.as_ref()) // fixme: use data directly
.compress(true);
if let Some(ref crypt_config) = crypt_config {
chunk_builder = chunk_builder.crypt_config(crypt_config);
}
let digest = chunk_builder.digest();
let digest_str = proxmox::tools::digest_to_hex(digest);
// fixme: handle known chunks
let chunk = chunk_builder.build()?;
let chunk_data = chunk.into_raw();
let param = json!({
"wid": wid,
"digest": digest_str,
"size": size,
"encoded-size": chunk_data.len(),
});
client.upload_post("fixed_chunk", Some(param), "application/octet-stream", chunk_data).await?;
let append_list = {
let mut guard = registry.lock().unwrap();
let mut info = guard.lookup(dev_id)?;
info.written += size;
info.chunk_count += 1;
info.digest_list.push(digest_str);
info.offset_list.push(offset);
if info.digest_list.len() >= 128 {
let param = json!({ "wid": wid, "digest-list": info.digest_list, "offset-list": info.offset_list });
let param_data = param.to_string().as_bytes().to_vec();
info.digest_list.truncate(0);
info.offset_list.truncate(0);
Some(param_data)
} else {
None
}
};
if let Some(data) = append_list {
client.upload_put("fixed_index", None, "application/json", data).await?;
}
println!("upload chunk sucessful");
Ok(()) Ok(())
} }
@ -206,8 +380,15 @@ fn backup_worker_task(
let written_bytes = Arc::new(AtomicU64::new(0)); let written_bytes = Arc::new(AtomicU64::new(0));
let written_bytes2 = written_bytes.clone(); let written_bytes2 = written_bytes.clone();
//let block_size = 4*1024*1024;
let known_chunks = Arc::new(Mutex::new(HashSet::new()));
let crypt_config = repo.crypt_config;
runtime.spawn(async move { runtime.spawn(async move {
let registry = Arc::new(Mutex::new(ImageRegistry::new()));
loop { loop {
let msg = command_rx.recv().unwrap(); // todo: should be blocking let msg = command_rx.recv().unwrap(); // todo: should be blocking
@ -224,14 +405,28 @@ fn backup_worker_task(
break; break;
} }
BackupMessage::RegisterImage { device_name, size, result_channel } => { BackupMessage::RegisterImage { device_name, size, result_channel } => {
let res = register_image(client.clone(), device_name, size).await; // fixme : errors let res = register_image(
client.clone(),
crypt_config.clone(),
registry.clone(),
known_chunks.clone(),
device_name,
size
).await;
let _ = result_channel.lock().unwrap().send(res); let _ = result_channel.lock().unwrap().send(res);
} }
BackupMessage::WriteData { dev_id, data, size, callback_info } => { BackupMessage::CloseImage { dev_id, callback_info } => {
handle_async_command(
close_image(client.clone(), registry.clone(), dev_id),
abort.listen(),
callback_info,
).await;
}
BackupMessage::WriteData { dev_id, data, offset, size, callback_info } => {
written_bytes2.fetch_add(size, Ordering::SeqCst); written_bytes2.fetch_add(size, Ordering::SeqCst);
handle_async_command( handle_async_command(
write_data(client.clone(), dev_id, data, size), write_data(client.clone(), registry.clone(), dev_id, data, offset, size),
abort.listen(), abort.listen(),
callback_info, callback_info,
).await; ).await;
@ -281,6 +476,8 @@ pub extern "C" fn proxmox_backup_connect(error: * mut * mut c_char) -> *mut Prox
let backup_time = Utc.timestamp(Utc::now().timestamp(), 0); let backup_time = Utc.timestamp(Utc::now().timestamp(), 0);
let crypt_config: Option<Arc<CryptConfig>> = None;
let repo = BackupRepository { let repo = BackupRepository {
host: "localhost".to_owned(), host: "localhost".to_owned(),
user: "root@pam".to_owned(), user: "root@pam".to_owned(),
@ -288,6 +485,7 @@ pub extern "C" fn proxmox_backup_connect(error: * mut * mut c_char) -> *mut Prox
backup_id: "99".to_owned(), backup_id: "99".to_owned(),
password: "12345".to_owned(), password: "12345".to_owned(),
backup_time, backup_time,
crypt_config,
}; };
match BackupTask::new(repo) { match BackupTask::new(repo) {
@ -316,7 +514,7 @@ pub extern "C" fn proxmox_backup_abort(
#[no_mangle] #[no_mangle]
pub extern "C" fn proxmox_backup_register_image( pub extern "C" fn proxmox_backup_register_image(
handle: *mut ProxmoxBackupHandle, handle: *mut ProxmoxBackupHandle,
device_name: *const c_char, device_name: *const c_char, // expect utf8 here
size: u64, size: u64,
error: * mut * mut c_char, error: * mut * mut c_char,
) -> c_int { ) -> c_int {
@ -326,7 +524,7 @@ pub extern "C" fn proxmox_backup_register_image(
raise_error_int!(error, "task already aborted"); raise_error_int!(error, "task already aborted");
} }
let device_name = unsafe { CStr::from_ptr(device_name).to_owned() }; let device_name = unsafe { CStr::from_ptr(device_name).to_string_lossy().to_string() };
let (result_sender, result_receiver) = channel(); let (result_sender, result_receiver) = channel();
@ -352,6 +550,7 @@ pub extern "C" fn proxmox_backup_write_data_async(
handle: *mut ProxmoxBackupHandle, handle: *mut ProxmoxBackupHandle,
dev_id: u8, dev_id: u8,
data: *const u8, data: *const u8,
offset: u64,
size: u64, size: u64,
callback: extern "C" fn(*mut c_void), callback: extern "C" fn(*mut c_void),
callback_data: *mut c_void, callback_data: *mut c_void,
@ -369,6 +568,7 @@ pub extern "C" fn proxmox_backup_write_data_async(
let msg = BackupMessage::WriteData { let msg = BackupMessage::WriteData {
dev_id, dev_id,
data: DataPointer(data), data: DataPointer(data),
offset,
size, size,
callback_info: CallbackPointers { callback, callback_data, error }, callback_info: CallbackPointers { callback, callback_data, error },
}; };
@ -378,6 +578,33 @@ pub extern "C" fn proxmox_backup_write_data_async(
println!("write_data_async end"); println!("write_data_async end");
} }
#[no_mangle]
pub extern "C" fn proxmox_backup_close_image_async(
handle: *mut ProxmoxBackupHandle,
dev_id: u8,
callback: extern "C" fn(*mut c_void),
callback_data: *mut c_void,
error: * mut * mut c_char,
) {
let task = unsafe { &mut *(handle as * mut BackupTask) };
if let Some(_reason) = &task.aborted {
let errmsg = CString::new("task already aborted".to_string()).unwrap();
unsafe { *error = errmsg.into_raw(); }
callback(callback_data);
return;
}
let msg = BackupMessage::CloseImage {
dev_id,
callback_info: CallbackPointers { callback, callback_data, error },
};
println!("close_image_async start");
let _res = task.command_tx.send(msg); // fixme: log errors
println!("close_image_async end");
}
#[no_mangle] #[no_mangle]
pub extern "C" fn proxmox_backup_disconnect(handle: *mut ProxmoxBackupHandle) { pub extern "C" fn proxmox_backup_disconnect(handle: *mut ProxmoxBackupHandle) {