From c2610a61838c2f9102f1ebc0fe93fda3d6fade7b Mon Sep 17 00:00:00 2001
From: Dietmar Maurer
Date: Sat, 21 Sep 2019 10:24:24 +0200
Subject: [PATCH] implement image registry, implement close image

---
 Cargo.toml |   3 +
 src/lib.rs | 265 +++++++++++++++++++++++++++++++++++++++++++++++++----
 2 files changed, 249 insertions(+), 19 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 35eba03..0154622 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -16,9 +16,12 @@ cbindgen = "0.9.1"
 
 [dependencies]
 libc = "0.2"
+bytes = "0.4"
+proxmox = { git = "ssh://gitolite3@proxdev.maurer-it.com/rust/proxmox", version = "0.1" }
 proxmox-backup = { path = "../proxmox-backup" }
 chrono = "0.4" # Date and time library for Rust
 failure = "0.1"
 futures-preview = "0.3.0-alpha"
+serde_json = "1.0"
 tokio = { version = "0.2.0-alpha.4" }
 
diff --git a/src/lib.rs b/src/lib.rs
index cbd5653..e68bfae 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1,4 +1,5 @@
 use failure::*;
+use std::collections::HashSet;
 use std::thread::JoinHandle;
 use std::sync::{Mutex, Arc};
 use std::sync::atomic::{AtomicU64, Ordering};
@@ -7,11 +8,12 @@
 use std::ffi::{CStr, CString};
 use std::ptr;
 use std::os::raw::{c_char, c_int, c_void};
-//use futures::{future, Future, Stream};
+use serde_json::{json, Value};
 use futures::future::{self, Future, Either, FutureExt};
 use tokio::runtime::Runtime;
 
 //#[macro_use]
+use proxmox_backup::backup::*;
 use proxmox_backup::client::*;
 use proxmox_backup::tools::BroadcastFuture;
 
@@ -24,6 +26,7 @@ struct BackupRepository {
     backup_id: String,
     backup_time: DateTime<Utc>,
     password: String,
+    crypt_config: Option<Arc<CryptConfig>>,
 }
 
 struct BackupTask {
@@ -51,46 +54,217 @@ enum BackupMessage {
     End,
     Abort,
     RegisterImage {
-        device_name: CString,
+        device_name: String,
         size: u64,
         result_channel: Arc<Mutex<Sender<Result<u8, Error>>>>,
     },
+    CloseImage {
+        dev_id: u8,
+        callback_info: CallbackPointers,
+    },
     WriteData {
         dev_id: u8,
         data: DataPointer,
+        offset: u64,
         size: u64,
         callback_info: CallbackPointers,
     },
 }
 
+struct ImageUploadInfo {
+    wid: u64,
+    known_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
+    digest_list: Vec<String>,
+    offset_list: Vec<u64>,
+    size: u64,
+    written: u64,
+    chunk_count: u64,
+}
+
+struct ImageRegistry {
+    upload_info: Vec<ImageUploadInfo>,
+}
+
+impl ImageRegistry {
+
+    fn new() -> Self {
+        Self {
+            upload_info: Vec::new(),
+        }
+    }
+
+    fn register(&mut self, info: ImageUploadInfo) -> Result<u8, Error> {
+        let dev_id = self.upload_info.len();
+        if dev_id > 255 {
+            bail!("register image failed - too many images (limit is 255)");
+        }
+        self.upload_info.push(info);
+        Ok(dev_id as u8)
+    }
+
+    fn lookup(&mut self, dev_id: u8) -> Result<&mut ImageUploadInfo, Error> {
+        if dev_id as usize >= self.upload_info.len() {
+            bail!("image lookup failed for dev_id = {}", dev_id);
+        }
+        Ok(&mut self.upload_info[dev_id as usize])
+    }
+}
+
 async fn register_image(
     client: Arc<BackupClient>,
-    device_name: CString,
+    crypt_config: Option<Arc<CryptConfig>>,
+    registry: Arc<Mutex<ImageRegistry>>,
+    known_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
+    device_name: String,
     size: u64,
 ) -> Result<u8, Error> {
 
-    println!("register image {} size {}", device_name.to_string_lossy(), size);
+    println!("register image {} size {}", device_name, size);
 
-    println!("Delay test");
-    tokio::timer::delay(std::time::Instant::now() + std::time::Duration::new(5, 0)).await;
-    println!("Delay end");
+    let archive_name = format!("{}.img.fidx", device_name);
 
-    bail!("test");
+    client.download_chunk_list("fixed_index", &archive_name, known_chunks.clone()).await?;
+    println!("register image download chunk list OK");
 
-    Ok(2)
+    let param = json!({ "archive-name": archive_name , "size": size});
+    let wid = client.post("fixed_index", Some(param)).await?.as_u64().unwrap();
+
+    let info = ImageUploadInfo {
+        wid,
+        known_chunks,
+        size,
+        digest_list: Vec::new(),
+        offset_list: Vec::new(),
+        written: 0,
+        chunk_count: 0,
+    };
+
+    let mut guard = registry.lock().unwrap();
+    guard.register(info)
+}
+
+async fn close_image(
+    client: Arc<BackupClient>,
+    registry: Arc<Mutex<ImageRegistry>>,
+    dev_id: u8,
+) -> Result<(), Error> {
+
+    println!("close image {}", dev_id);
+
+    let (wid, written, chunk_count, append_list) = {
+        let mut guard = registry.lock().unwrap();
+        let mut info = guard.lookup(dev_id)?;
+
+        let append_list = if info.digest_list.len() > 0 {
+            let param = json!({ "wid": info.wid, "digest-list": info.digest_list, "offset-list": info.offset_list });
+            let param_data = param.to_string().as_bytes().to_vec();
+            info.digest_list.truncate(0);
+            info.offset_list.truncate(0);
+            Some(param_data)
+        } else {
+            None
+        };
+
+        (info.wid, info.written, info.chunk_count, append_list)
+    };
+
+    if let Some(data) = append_list {
+        client.upload_put("fixed_index", None, "application/json", data).await?;
+    }
+
+    let param = json!({
+        "wid": wid ,
+        "chunk-count": chunk_count,
+        "size": written,
+    });
+
+    let _value = client.post("fixed_close", Some(param)).await?;
+
+    Ok(())
 }
 
 async fn write_data(
     client: Arc<BackupClient>,
+    registry: Arc<Mutex<ImageRegistry>>,
     dev_id: u8,
-    _data: DataPointer,
+    data: DataPointer,
+    offset: u64,
     size: u64,
 ) -> Result<(), Error> {
 
     println!("dev {}: write {}", dev_id, size);
 
-    //println!("Delay test");
-    //tokio::timer::delay(std::time::Instant::now() + std::time::Duration::new(2, 0)).await;
-    //println!("Delay end");
+    if size > 4*1024*1024 {
+        bail!("write_data: got unexpected chunk size {}", size);
+    }
+
+    let (wid, known_chunks) = {
+        let mut guard = registry.lock().unwrap();
+        let info = guard.lookup(dev_id)?;
+        (info.wid, info.known_chunks.clone())
+    };
+
+    println!("write_data 1 wid = {}, {:p}", wid, data.0);
+
+    let bytes = if data.0 == ptr::null() {
+        let mut bytes = bytes::BytesMut::new();
+        bytes.resize(size as usize, 0u8); // fixme: avoid writing zero blocks
+        bytes
+    } else {
+        let data: &[u8] = unsafe { std::slice::from_raw_parts(data.0, size as usize) };
+        bytes::BytesMut::from(data) // fixme: how to avoid copying data here?
+    };
+
+    let crypt_config: Option<Arc<CryptConfig>> = None;
+
+    let mut chunk_builder = DataChunkBuilder::new(bytes.as_ref()) // fixme: use data directly
+        .compress(true);
+
+    if let Some(ref crypt_config) = crypt_config {
+        chunk_builder = chunk_builder.crypt_config(crypt_config);
+    }
+
+    let digest = chunk_builder.digest();
+    let digest_str = proxmox::tools::digest_to_hex(digest);
+
+    // fixme: handle known chunks
+
+    let chunk = chunk_builder.build()?;
+    let chunk_data = chunk.into_raw();
+
+    let param = json!({
+        "wid": wid,
+        "digest": digest_str,
+        "size": size,
+        "encoded-size": chunk_data.len(),
+    });
+
+    client.upload_post("fixed_chunk", Some(param), "application/octet-stream", chunk_data).await?;
+
+    let append_list = {
+        let mut guard = registry.lock().unwrap();
+        let mut info = guard.lookup(dev_id)?;
+        info.written += size;
+        info.chunk_count += 1;
+
+        info.digest_list.push(digest_str);
+        info.offset_list.push(offset);
+
+        if info.digest_list.len() >= 128 {
+            let param = json!({ "wid": wid, "digest-list": info.digest_list, "offset-list": info.offset_list });
+            let param_data = param.to_string().as_bytes().to_vec();
+            info.digest_list.truncate(0);
+            info.offset_list.truncate(0);
+            Some(param_data)
+        } else {
+            None
+        }
+    };
+
+    if let Some(data) = append_list {
+        client.upload_put("fixed_index", None, "application/json", data).await?;
+    }
+
+    println!("upload chunk successful");
 
     Ok(())
 }
@@ -206,8 +380,15 @@ fn backup_worker_task(
     let written_bytes = Arc::new(AtomicU64::new(0));
     let written_bytes2 = written_bytes.clone();
 
+    //let block_size = 4*1024*1024;
+
+    let known_chunks = Arc::new(Mutex::new(HashSet::new()));
+    let crypt_config = repo.crypt_config;
+
     runtime.spawn(async move {
 
+        let registry = Arc::new(Mutex::new(ImageRegistry::new()));
+
         loop {
             let msg = command_rx.recv().unwrap(); // todo: should be blocking
 
@@ -224,14 +405,28 @@
                     break;
                 }
                 BackupMessage::RegisterImage { device_name, size, result_channel } => {
-                    let res = register_image(client.clone(), device_name, size).await; // fixme : errors
+                    let res = register_image(
+                        client.clone(),
+                        crypt_config.clone(),
+                        registry.clone(),
+                        known_chunks.clone(),
+                        device_name,
+                        size
+                    ).await;
                     let _ = result_channel.lock().unwrap().send(res);
-                }
-                BackupMessage::WriteData { dev_id, data, size, callback_info } => {
+                }
+                BackupMessage::CloseImage { dev_id, callback_info } => {
+                    handle_async_command(
+                        close_image(client.clone(), registry.clone(), dev_id),
+                        abort.listen(),
+                        callback_info,
+                    ).await;
+                }
+                BackupMessage::WriteData { dev_id, data, offset, size, callback_info } => {
                     written_bytes2.fetch_add(size, Ordering::SeqCst);
                     handle_async_command(
-                        write_data(client.clone(), dev_id, data, size),
+                        write_data(client.clone(), registry.clone(), dev_id, data, offset, size),
                         abort.listen(),
                         callback_info,
                     ).await;
@@ -281,6 +476,8 @@ pub extern "C" fn proxmox_backup_connect(error: * mut * mut c_char) -> *mut Prox
 
     let backup_time = Utc.timestamp(Utc::now().timestamp(), 0);
 
+    let crypt_config: Option<Arc<CryptConfig>> = None;
+
     let repo = BackupRepository {
         host: "localhost".to_owned(),
         user: "root@pam".to_owned(),
@@ -288,6 +485,7 @@ pub extern "C" fn proxmox_backup_connect(error: * mut * mut c_char) -> *mut Prox
         backup_id: "99".to_owned(),
         password: "12345".to_owned(),
         backup_time,
+        crypt_config,
     };
 
     match BackupTask::new(repo) {
@@ -316,7 +514,7 @@ pub extern "C" fn proxmox_backup_abort(
 #[no_mangle]
 pub extern "C" fn proxmox_backup_register_image(
     handle: *mut ProxmoxBackupHandle,
-    device_name: *const c_char,
+    device_name: *const c_char, // expect utf8 here
     size: u64,
     error: * mut * mut c_char,
 ) -> c_int {
@@ -326,7 +524,7 @@ pub extern "C" fn proxmox_backup_register_image(
         raise_error_int!(error, "task already aborted");
     }
 
-    let device_name = unsafe { CStr::from_ptr(device_name).to_owned() };
+    let device_name = unsafe { CStr::from_ptr(device_name).to_string_lossy().to_string() };
 
     let (result_sender, result_receiver) = channel();
 
@@ -352,6 +550,7 @@ pub extern "C" fn proxmox_backup_write_data_async(
     handle: *mut ProxmoxBackupHandle,
     dev_id: u8,
     data: *const u8,
+    offset: u64,
     size: u64,
     callback: extern "C" fn(*mut c_void),
     callback_data: *mut c_void,
@@ -369,6 +568,7 @@ pub extern "C" fn proxmox_backup_write_data_async(
     let msg = BackupMessage::WriteData {
         dev_id,
         data: DataPointer(data),
+        offset,
         size,
         callback_info: CallbackPointers { callback, callback_data, error },
     };
@@ -378,6 +578,33 @@ pub extern "C" fn proxmox_backup_write_data_async(
     println!("write_data_async end");
 }
 
+#[no_mangle]
+pub extern "C" fn proxmox_backup_close_image_async(
+    handle: *mut ProxmoxBackupHandle,
+    dev_id: u8,
+    callback: extern "C" fn(*mut c_void),
+    callback_data: *mut c_void,
+    error: * mut * mut c_char,
+) {
+    let task = unsafe { &mut *(handle as * mut BackupTask) };
+
+    if let Some(_reason) = &task.aborted {
+        let errmsg = CString::new("task already aborted".to_string()).unwrap();
+        unsafe { *error = errmsg.into_raw(); }
+        callback(callback_data);
+        return;
+    }
+
+    let msg = BackupMessage::CloseImage {
+        dev_id,
+        callback_info: CallbackPointers { callback, callback_data, error },
+    };
+
+    println!("close_image_async start");
+    let _res = task.command_tx.send(msg); // fixme: log errors
+    println!("close_image_async end");
+}
+
 #[no_mangle]
 pub extern "C" fn proxmox_backup_disconnect(handle: *mut ProxmoxBackupHandle) {
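
Note (illustration only, not part of the patch): write_data() above batches chunk digests and offsets per image and PUTs them to "fixed_index" once 128 entries have accumulated; close_image() flushes whatever is left and then POSTs "fixed_close". The standalone sketch below isolates just that bookkeeping, with the BackupClient calls replaced by returned request bodies. The AppendBatcher name and its push/finish/flush methods are made up for illustration; the 128-entry threshold and the "wid"/"digest-list"/"offset-list" keys match the code above.

use serde_json::json;

struct AppendBatcher {
    wid: u64,
    digest_list: Vec<String>,
    offset_list: Vec<u64>,
}

impl AppendBatcher {
    fn new(wid: u64) -> Self {
        Self { wid, digest_list: Vec::new(), offset_list: Vec::new() }
    }

    // Record one uploaded chunk; once 128 entries have accumulated, return the
    // serialized body for a "fixed_index" PUT (mirrors the flush in write_data()).
    fn push(&mut self, digest: String, offset: u64) -> Option<Vec<u8>> {
        self.digest_list.push(digest);
        self.offset_list.push(offset);
        if self.digest_list.len() >= 128 { self.flush() } else { None }
    }

    // Flush the remainder; close_image() does the same before posting "fixed_close".
    fn finish(&mut self) -> Option<Vec<u8>> {
        self.flush()
    }

    fn flush(&mut self) -> Option<Vec<u8>> {
        if self.digest_list.is_empty() {
            return None;
        }
        let param = json!({
            "wid": self.wid,
            "digest-list": self.digest_list,
            "offset-list": self.offset_list,
        });
        self.digest_list.clear();
        self.offset_list.clear();
        Some(param.to_string().into_bytes())
    }
}

fn main() {
    let mut batcher = AppendBatcher::new(1);
    for i in 0..130u64 {
        let digest = format!("{:064x}", i); // placeholder hex digest
        if let Some(body) = batcher.push(digest, i * 4 * 1024 * 1024) {
            println!("PUT fixed_index ({} bytes)", body.len()); // fires at entry 128
        }
    }
    if let Some(body) = batcher.finish() {
        println!("final PUT fixed_index ({} bytes)", body.len()); // remaining 2 entries
    }
}

With 4 MiB fixed chunks this comes to one index update per 512 MiB of written image data instead of one extra HTTP round trip per chunk.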