diff --git a/build.rs b/build.rs
index 6940d3f..8a1008f 100644
--- a/build.rs
+++ b/build.rs
@@ -12,11 +12,7 @@ fn main() {
         Some(ver) if !ver.is_empty() => ver,
         _ => "UNKNOWN",
     };
-    let version_string = format!(
-        "{} ({})",
-        crate_ver,
-        git_ver,
-    );
+    let version_string = format!("{} ({})", crate_ver, git_ver,);
 
     cbindgen::Builder::new()
         .with_language(cbindgen::Language::C)
diff --git a/src/backup.rs b/src/backup.rs
index e30dfe1..e526980 100644
--- a/src/backup.rs
+++ b/src/backup.rs
@@ -1,25 +1,25 @@
-use anyhow::{format_err, bail, Error};
-use std::collections::HashSet;
-use std::sync::{Mutex, Arc};
+use anyhow::{bail, format_err, Error};
 use once_cell::sync::OnceCell;
+use std::collections::HashSet;
 use std::os::raw::c_int;
+use std::sync::{Arc, Mutex};
 
-use futures::future::{Future, Either, FutureExt};
+use futures::future::{Either, Future, FutureExt};
 use tokio::runtime::Runtime;
 
-use proxmox_sys::fs::file_get_contents;
 use proxmox_async::runtime::get_runtime_with_builder;
+use proxmox_sys::fs::file_get_contents;
 
 use pbs_api_types::CryptMode;
+use pbs_client::{BackupWriter, HttpClient, HttpClientOptions};
+use pbs_config::key_config::{load_and_decrypt_key, rsa_encrypt_key_config, KeyConfig};
+use pbs_datastore::{BackupDir, BackupManifest};
 use pbs_tools::crypt_config::CryptConfig;
-use pbs_config::key_config::{KeyConfig, load_and_decrypt_key, rsa_encrypt_key_config};
-use pbs_datastore::{BackupDir, BackupManifest,};
-use pbs_client::{HttpClient, HttpClientOptions, BackupWriter};
 
 use super::BackupSetup;
 use crate::capi_types::*;
-use crate::registry::Registry;
 use crate::commands::*;
+use crate::registry::Registry;
 
 pub(crate) struct BackupTask {
     setup: BackupSetup,
@@ -32,13 +32,12 @@ pub(crate) struct BackupTask {
     last_manifest: OnceCell<Arc<BackupManifest>>,
     manifest: Arc<Mutex<BackupManifest>>,
     registry: Arc<Mutex<Registry<ImageUploadInfo>>>,
-    known_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
+    known_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
     abort: tokio::sync::broadcast::Sender<()>,
-    aborted: OnceCell<String>,  // set on abort, conatins abort reason
+    aborted: OnceCell<String>, // set on abort, conatins abort reason
 }
 
 impl BackupTask {
-
     /// Create a new instance, using the specified Runtime
     ///
    /// We keep a reference to the runtime - else the runtime can be
@@ -47,17 +46,14 @@ impl BackupTask {
         setup: BackupSetup,
         compress: bool,
         crypt_mode: CryptMode,
-        runtime: Arc<Runtime>
+        runtime: Arc<Runtime>,
     ) -> Result<Self, Error> {
-
         let (crypt_config, rsa_encrypted_key) = match setup.keyfile {
             None => (None, None),
             Some(ref path) => {
-                let (key, created, _) = load_and_decrypt_key(path, & || {
-                    match setup.key_password {
-                        Some(ref key_password) => Ok(key_password.as_bytes().to_vec()),
-                        None => bail!("no key_password specified"),
-                    }
+                let (key, created, _) = load_and_decrypt_key(path, &|| match setup.key_password {
+                    Some(ref key_password) => Ok(key_password.as_bytes().to_vec()),
+                    None => bail!("no key_password specified"),
                 })?;
                 let rsa_encrypted_key = match setup.master_keyfile {
                     Some(ref master_keyfile) => {
@@ -68,7 +64,7 @@ impl BackupTask {
                         key_config.created = created; // keep original value
                         Some(rsa_encrypt_key_config(rsa, &key_config)?)
-                    },
+                    }
                     None => None,
                 };
                 (Some(Arc::new(CryptConfig::new(key)?)), rsa_encrypted_key)
@@ -96,7 +92,7 @@ impl BackupTask {
             known_chunks,
             writer: OnceCell::new(),
             last_manifest: OnceCell::new(),
-            aborted: OnceCell::new()
+            aborted: OnceCell::new(),
         })
     }
 
@@ -132,29 +128,43 @@ impl BackupTask {
     }
 
     pub async fn connect(&self) -> Result<c_int, Error> {
-
         self.check_aborted()?;
 
-        let command_future = async {
+        let command_future = async {
             let options = HttpClientOptions::new_non_interactive(
                 self.setup.password.clone(),
                 self.setup.fingerprint.clone(),
             );
-            let http = HttpClient::new(&self.setup.host, self.setup.port, &self.setup.auth_id, options)?;
+            let http = HttpClient::new(
+                &self.setup.host,
+                self.setup.port,
+                &self.setup.auth_id,
+                options,
+            )?;
             let writer = BackupWriter::start(
-                http, self.crypt_config.clone(), &self.setup.store, "vm", &self.setup.backup_id,
-                self.setup.backup_time, false, false).await?;
+                http,
+                self.crypt_config.clone(),
+                &self.setup.store,
+                "vm",
+                &self.setup.backup_id,
+                self.setup.backup_time,
+                false,
+                false,
+            )
+            .await?;
 
             let last_manifest = writer.download_previous_manifest().await;
             let mut result = 0;
             if let Ok(last_manifest) = last_manifest {
                 result = 1;
-                self.last_manifest.set(Arc::new(last_manifest))
+                self.last_manifest
+                    .set(Arc::new(last_manifest))
                     .map_err(|_| format_err!("already connected!"))?;
             }
 
-            self.writer.set(writer)
+            self.writer
+                .set(writer)
                 .map_err(|_| format_err!("already connected!"))?;
 
             Ok(result)
@@ -171,12 +181,7 @@ impl BackupTask {
             .ok_or_else(|| format_err!("not connected"))
     }
 
-    pub async fn add_config(
-        &self,
-        name: String,
-        data: Vec<u8>,
-    ) -> Result<c_int, Error> {
-
+    pub async fn add_config(&self, name: String, data: Vec<u8>) -> Result<c_int, Error> {
         self.check_aborted()?;
 
         let command_future = add_config(
@@ -199,12 +204,15 @@ impl BackupTask {
         offset: u64,
         size: u64,
     ) -> Result<c_int, Error> {
-
         self.check_aborted()?;
 
         let command_future = write_data(
             self.need_writer()?,
-            if self.crypt_mode == CryptMode::Encrypt { self.crypt_config.clone() } else { None },
+            if self.crypt_mode == CryptMode::Encrypt {
+                self.crypt_config.clone()
+            } else {
+                None
+            },
             Arc::clone(&self.registry),
             Arc::clone(&self.known_chunks),
             dev_id,
@@ -223,17 +231,17 @@ impl BackupTask {
         self.last_manifest.get().map(Arc::clone)
     }
 
-    pub fn check_incremental(
-        &self,
-        device_name: String,
-        size: u64,
-    ) -> bool {
+    pub fn check_incremental(&self, device_name: String, size: u64) -> bool {
         match self.last_manifest() {
             Some(ref manifest) => {
                 check_last_incremental_csum(Arc::clone(manifest), &device_name, size)
-                    && check_last_encryption_mode(Arc::clone(manifest), &device_name, self.crypt_mode)
+                    && check_last_encryption_mode(
+                        Arc::clone(manifest),
+                        &device_name,
+                        self.crypt_mode,
+                    )
                     && check_last_encryption_key(self.crypt_config.clone())
-            },
+            }
             None => false,
         }
     }
@@ -244,7 +252,6 @@ impl BackupTask {
         size: u64,
         incremental: bool,
     ) -> Result<c_int, Error> {
-
         self.check_aborted()?;
 
         let command_future = register_image(
@@ -265,7 +272,6 @@ impl BackupTask {
     }
 
     pub async fn close_image(&self, dev_id: u8) -> Result<c_int, Error> {
-
         self.check_aborted()?;
 
         let command_future = close_image(
@@ -281,7 +287,6 @@ impl BackupTask {
     }
 
     pub async fn finish(&self) -> Result<c_int, Error> {
-
         self.check_aborted()?;
 
         let command_future = finish_backup(
@@ -294,21 +299,16 @@ impl BackupTask {
         let mut abort_rx = self.abort.subscribe();
         abortable_command(command_future, abort_rx.recv()).await
     }
-
 }
 
-fn abortable_command<'a, F: 'a + Send + Future<Output=Result<c_int, Error>>>(
+fn abortable_command<'a, F: 'a + Send + Future<Output = Result<c_int, Error>>>(
     command_future: F,
-    abort_future: impl 'a + Send + Future<Output=Result<(), tokio::sync::broadcast::error::RecvError>>,
+    abort_future: impl 'a + Send + Future<Output = Result<(), tokio::sync::broadcast::error::RecvError>>,
 ) -> impl 'a + Future<Output = Result<c_int, Error>> {
-
-    futures::future::select(command_future.boxed(), abort_future.boxed())
-        .map(move |either| {
-            match either {
-                Either::Left((result, _)) => {
-                    result
-                }
-                Either::Right(_) => bail!("command aborted"),
-            }
-        })
+    futures::future::select(command_future.boxed(), abort_future.boxed()).map(move |either| {
+        match either {
+            Either::Left((result, _)) => result,
+            Either::Right(_) => bail!("command aborted"),
+        }
+    })
 }
diff --git a/src/capi_types.rs b/src/capi_types.rs
index 1b9abc1..6279e09 100644
--- a/src/capi_types.rs
+++ b/src/capi_types.rs
@@ -1,6 +1,6 @@
 use anyhow::Error;
-use std::os::raw::{c_char, c_void, c_int};
 use std::ffi::CString;
+use std::os::raw::{c_char, c_int, c_void};
 
 pub(crate) struct CallbackPointers {
     pub callback: extern "C" fn(*mut c_void),
@@ -11,7 +11,6 @@ pub(crate) struct CallbackPointers {
 unsafe impl std::marker::Send for CallbackPointers {}
 
 impl CallbackPointers {
-
     pub fn send_result(self, result: Result<c_int, Error>) {
         match result {
             Ok(ret) => {
@@ -38,7 +37,7 @@ impl CallbackPointers {
     }
 }
 
-pub(crate) struct DataPointer (pub *const u8);
+pub(crate) struct DataPointer(pub *const u8);
 unsafe impl std::marker::Send for DataPointer {}
 
 /// Opaque handle for restore jobs
diff --git a/src/commands.rs b/src/commands.rs
index 83bc9ae..27d958c 100644
--- a/src/commands.rs
+++ b/src/commands.rs
@@ -1,25 +1,27 @@
 use anyhow::{bail, format_err, Error};
-use std::collections::{HashSet, HashMap};
-use std::sync::{Mutex, Arc};
+use std::collections::{HashMap, HashSet};
 use std::os::raw::c_int;
+use std::sync::{Arc, Mutex};
 
 use futures::future::{Future, TryFutureExt};
 use serde_json::json;
 
 use pbs_api_types::CryptMode;
-use pbs_tools::crypt_config::CryptConfig;
-use pbs_datastore::index::IndexFile;
-use pbs_datastore::data_blob::DataChunkBuilder;
-use pbs_datastore::manifest::{BackupManifest, MANIFEST_BLOB_NAME, ENCRYPTED_KEY_BLOB_NAME};
 use pbs_client::{BackupWriter, H2Client, UploadOptions};
+use pbs_datastore::data_blob::DataChunkBuilder;
+use pbs_datastore::index::IndexFile;
+use pbs_datastore::manifest::{BackupManifest, ENCRYPTED_KEY_BLOB_NAME, MANIFEST_BLOB_NAME};
+use pbs_tools::crypt_config::CryptConfig;
 
-use crate::registry::Registry;
 use crate::capi_types::DataPointer;
-use crate::upload_queue::{ChunkUploadInfo, UploadQueueSender, UploadResultReceiver, create_upload_queue};
+use crate::registry::Registry;
+use crate::upload_queue::{
+    create_upload_queue, ChunkUploadInfo, UploadQueueSender, UploadResultReceiver,
+};
 
 use lazy_static::lazy_static;
 
-lazy_static!{
+lazy_static! {
     // Note: Any state stored here that needs to be sent along with migration
     // needs to be specified in (de)serialize_state as well!
@@ -41,7 +43,6 @@ pub struct ImageUploadInfo {
     upload_result: Option<UploadResultReceiver>,
 }
 
-
 pub(crate) fn serialize_state() -> Vec<u8> {
     let prev_csums = &*PREVIOUS_CSUMS.lock().unwrap();
     let prev_key_fingerprint = &*PREVIOUS_KEY_FINGERPRINT.lock().unwrap();
@@ -57,15 +58,13 @@ pub(crate) fn deserialize_state(data: &[u8]) -> Result<(), Error> {
     Ok(())
 }
 
-
 // Note: We alway register/upload a chunk containing zeros
 async fn register_zero_chunk(
     client: Arc<BackupWriter>,
     crypt_config: Option<Arc<CryptConfig>>,
     chunk_size: usize,
     wid: u64,
-) -> Result<[u8;32], Error> {
-
+) -> Result<[u8; 32], Error> {
     let (chunk, zero_chunk_digest) = DataChunkBuilder::build_zero_chunk(
         crypt_config.as_ref().map(Arc::as_ref),
         chunk_size,
@@ -80,7 +79,14 @@ async fn register_zero_chunk(
         "encoded-size": chunk_data.len(),
     });
 
-    client.upload_post("fixed_chunk", Some(param), "application/octet-stream", chunk_data).await?;
+    client
+        .upload_post(
+            "fixed_chunk",
+            Some(param),
+            "application/octet-stream",
+            chunk_data,
+        )
+        .await?;
 
     Ok(zero_chunk_digest)
 }
@@ -103,7 +109,9 @@ pub(crate) async fn add_config(
         ..UploadOptions::default()
     };
 
-    let stats = client.upload_blob_from_data(data, &blob_name, options).await?;
+    let stats = client
+        .upload_blob_from_data(data, &blob_name, options)
+        .await?;
 
     let mut guard = manifest.lock().unwrap();
     guard.add_file(blob_name, stats.size, stats.csum, crypt_mode)?;
@@ -115,14 +123,13 @@ fn archive_name(device_name: &str) -> String {
     format!("{}.img.fidx", device_name)
 }
 
-const CRYPT_CONFIG_HASH_INPUT:&[u8] = b"this is just a static string to protect against key changes";
+const CRYPT_CONFIG_HASH_INPUT: &[u8] =
+    b"this is just a static string to protect against key changes";
 
 /// Create an identifying digest for the crypt config
 /// legacy version for VMs freshly migrated from old version
 /// TODO: remove in PVE 7.0
-pub(crate) fn crypt_config_digest(
-    config: Arc<CryptConfig>,
-) -> [u8;32] {
+pub(crate) fn crypt_config_digest(config: Arc<CryptConfig>) -> [u8; 32] {
     config.compute_digest(CRYPT_CONFIG_HASH_INPUT)
 }
 
@@ -131,9 +138,10 @@ pub(crate) fn check_last_incremental_csum(
     device_name: &str,
     device_size: u64,
 ) -> bool {
-
     match PREVIOUS_CSUMS.lock().unwrap().get(device_name) {
-        Some(csum) => manifest.verify_file(&archive_name(device_name), csum, device_size).is_ok(),
+        Some(csum) => manifest
+            .verify_file(&archive_name(device_name), csum, device_size)
+            .is_ok(),
         None => false,
     }
 }
@@ -144,29 +152,25 @@ pub(crate) fn check_last_encryption_mode(
     crypt_mode: CryptMode,
 ) -> bool {
     match manifest.lookup_file_info(&archive_name(device_name)) {
-        Ok(file) => {
-            match (file.crypt_mode, crypt_mode) {
-                (CryptMode::Encrypt, CryptMode::Encrypt) => true,
-                (CryptMode::Encrypt, _) => false,
-                (CryptMode::SignOnly, CryptMode::Encrypt) => false,
-                (CryptMode::SignOnly, _) => true,
-                (CryptMode::None, CryptMode::Encrypt) => false,
-                (CryptMode::None, _) => true,
-            }
+        Ok(file) => match (file.crypt_mode, crypt_mode) {
+            (CryptMode::Encrypt, CryptMode::Encrypt) => true,
+            (CryptMode::Encrypt, _) => false,
+            (CryptMode::SignOnly, CryptMode::Encrypt) => false,
+            (CryptMode::SignOnly, _) => true,
+            (CryptMode::None, CryptMode::Encrypt) => false,
+            (CryptMode::None, _) => true,
        },
         _ => false,
     }
 }
 
-pub(crate) fn check_last_encryption_key(
-    config: Option<Arc<CryptConfig>>,
-) -> bool {
+pub(crate) fn check_last_encryption_key(config: Option<Arc<CryptConfig>>) -> bool {
     let fingerprint_guard = PREVIOUS_KEY_FINGERPRINT.lock().unwrap();
-    match (*fingerprint_guard, config) {
+    match (*fingerprint_guard, config) {
         (Some(last_fingerprint), Some(current_config)) => {
            current_config.fingerprint() == last_fingerprint
                || crypt_config_digest(current_config) == last_fingerprint
-        },
+        }
         (None, None) => true,
         _ => false,
     }
 }
@@ -179,24 +183,26 @@ pub(crate) async fn register_image(
     crypt_mode: CryptMode,
     manifest: Option<Arc<BackupManifest>>,
     registry: Arc<Mutex<Registry<ImageUploadInfo>>>,
-    known_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
+    known_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
     device_name: String,
     device_size: u64,
     chunk_size: u64,
     incremental: bool,
 ) -> Result<c_int, Error> {
-
     let archive_name = archive_name(&device_name);
 
     let index = match manifest {
         Some(manifest) => {
-            match client.download_previous_fixed_index(&archive_name, &manifest, Arc::clone(&known_chunks)).await {
+            match client
+                .download_previous_fixed_index(&archive_name, &manifest, Arc::clone(&known_chunks))
+                .await
+            {
                 Ok(index) => Some(index),
                 // not having a previous index is not fatal, so ignore errors
-                Err(_) => None
+                Err(_) => None,
             }
-        },
-        None => None
+        }
+        None => None,
     };
 
     let mut param = json!({ "archive-name": archive_name , "size": device_size });
@@ -210,7 +216,7 @@ pub(crate) async fn register_image(
 
         match index {
             Some(index) => {
-                let index_size = ((device_size + chunk_size -1)/chunk_size) as usize;
+                let index_size = ((device_size + chunk_size - 1) / chunk_size) as usize;
                 if index_size != index.index_count() {
                     bail!("previous backup has different size than current state, cannot do incremental backup (drive: {})", archive_name);
                 }
@@ -219,23 +225,31 @@ pub(crate) async fn register_image(
                 }
 
                 initial_index = Arc::new(Some(index));
-            },
-            None => bail!("no previous backup found, cannot do incremental backup")
+            }
+            None => bail!("no previous backup found, cannot do incremental backup"),
         }
-
     } else {
             bail!("no previous backups in this session, cannot do incremental backup");
         }
     }
 
-    let wid = client.post("fixed_index", Some(param)).await?.as_u64().unwrap();
+    let wid = client
+        .post("fixed_index", Some(param))
+        .await?
+        .as_u64()
+        .unwrap();
 
     let zero_chunk_digest = register_zero_chunk(
         Arc::clone(&client),
-        if crypt_mode == CryptMode::Encrypt { crypt_config } else { None },
+        if crypt_mode == CryptMode::Encrypt {
+            crypt_config
+        } else {
+            None
+        },
         chunk_size as usize,
         wid,
-    ).await?;
+    )
+    .await?;
 
     let (upload_queue, upload_result) = create_upload_queue(
         Arc::clone(&client),
@@ -253,7 +267,7 @@ pub(crate) async fn register_image(
         device_size,
         upload_queue: Some(upload_queue),
         upload_result: Some(upload_result),
-     };
+    };
 
     let mut guard = registry.lock().unwrap();
     let dev_id = guard.register(info)?;
@@ -268,7 +282,6 @@ pub(crate) async fn close_image(
     dev_id: u8,
     crypt_mode: CryptMode,
 ) -> Result<c_int, Error> {
-
     //println!("close image {}", dev_id);
 
     let (wid, upload_result, device_name, device_size) = {
@@ -277,17 +290,22 @@ pub(crate) async fn close_image(
 
         info.upload_queue.take(); // close
 
-        (info.wid, info.upload_result.take(), info.device_name.clone(), info.device_size)
+        (
+            info.wid,
+            info.upload_result.take(),
+            info.device_name.clone(),
+            info.device_size,
+        )
     };
 
     let upload_result = match upload_result {
-        Some(upload_result) => {
-            match upload_result.await? {
-                Ok(res) => res,
-                Err(err) => bail!("close_image: upload error: {}", err),
-            }
+        Some(upload_result) => match upload_result.await? {
+            Ok(res) => res,
+            Err(err) => bail!("close_image: upload error: {}", err),
+        },
+        None => {
+            bail!("close_image: unknown error because upload result channel was already closed")
         }
-        None => bail!("close_image: unknown error because upload result channel was already closed"),
     };
 
     let csum = hex::encode(&upload_result.csum);
@@ -306,9 +324,13 @@ pub(crate) async fn close_image(
     let mut prev_csum_guard = PREVIOUS_CSUMS.lock().unwrap();
     prev_csum_guard.insert(info.device_name.clone(), upload_result.csum);
 
-
     let mut guard = manifest.lock().unwrap();
-    guard.add_file(format!("{}.img.fidx", device_name), device_size, upload_result.csum, crypt_mode)?;
+    guard.add_file(
+        format!("{}.img.fidx", device_name),
+        device_size,
+        upload_result.csum,
+        crypt_mode,
+    )?;
 
     Ok(0)
 }
@@ -318,22 +340,20 @@ pub(crate) async fn write_data(
     client: Arc<BackupWriter>,
     crypt_config: Option<Arc<CryptConfig>>,
     registry: Arc<Mutex<Registry<ImageUploadInfo>>>,
-    known_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
+    known_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
     dev_id: u8,
     data: DataPointer,
     offset: u64,
-    size: u64, // actual data size
+    size: u64,       // actual data size
     chunk_size: u64, // expected data size
     compress: bool,
 ) -> Result<c_int, Error> {
-
     //println!("dev {}: write {} {}", dev_id, offset, size);
 
     let (wid, mut upload_queue, zero_chunk_digest) = {
         let mut guard = registry.lock().unwrap();
         let info = guard.lookup(dev_id)?;
-
         (info.wid, info.upload_queue.clone(), info.zero_chunk_digest)
     };
 
@@ -344,7 +364,12 @@ pub(crate) async fn write_data(
             if size != chunk_size {
                 bail!("write_data: got invalid null chunk");
             }
-            let upload_info = ChunkUploadInfo { digest: zero_chunk_digest, offset, size, chunk_is_known: true };
+            let upload_info = ChunkUploadInfo {
+                digest: zero_chunk_digest,
+                offset,
+                size,
+                chunk_is_known: true,
+            };
             reused = true;
             Box::new(futures::future::ok(upload_info))
         } else {
@@ -364,10 +389,15 @@ pub(crate) async fn write_data(
             };
 
             if chunk_is_known {
-                let upload_info = ChunkUploadInfo { digest: *digest, offset, size, chunk_is_known: true };
+                let upload_info = ChunkUploadInfo {
+                    digest: *digest,
+                    offset,
+                    size,
+                    chunk_is_known: true,
+                };
                 reused = true;
                 Box::new(futures::future::ok(upload_info))
-            } else {
+            } else {
                 let (chunk, digest) = chunk_builder.build()?;
                 let digest_str = hex::encode(&digest);
                 let chunk_data = chunk.into_inner();
@@ -380,20 +410,25 @@ pub(crate) async fn write_data(
                 });
 
                 // Phase 1: send data
-                let response_future = client.send_upload_request(
-                    "POST",
-                    "fixed_chunk",
-                    Some(param),
-                    "application/octet-stream",
-                    chunk_data,
-                ).await?;
+                let response_future = client
+                    .send_upload_request(
+                        "POST",
+                        "fixed_chunk",
+                        Some(param),
+                        "application/octet-stream",
+                        chunk_data,
+                    )
+                    .await?;
 
                 // create response future (run that in other task)
                 let upload_future = response_future
                     .map_err(Error::from)
                     .and_then(H2Client::h2api_response)
-                    .map_ok(move |_| {
-                        ChunkUploadInfo { digest, offset, size, chunk_is_known: false }
+                    .map_ok(move |_| ChunkUploadInfo {
+                        digest,
+                        offset,
+                        size,
+                        chunk_is_known: false,
                     })
                     .map_err(|err| format_err!("pipelined request failed: {}", err));
 
@@ -441,28 +476,35 @@ pub(crate) async fn finish_backup(
 ) -> Result<c_int, Error> {
     if let Some(rsa_encrypted_key) = rsa_encrypted_key {
         let target = ENCRYPTED_KEY_BLOB_NAME;
-        let options = UploadOptions { compress: false, encrypt: false, ..UploadOptions::default() };
+        let options = UploadOptions {
+            compress: false,
+            encrypt: false,
+            ..UploadOptions::default()
+        };
         let stats = client
             .upload_blob_from_data(rsa_encrypted_key, target, options)
             .await?;
-        manifest.lock().unwrap().add_file(target.to_string(), stats.size, stats.csum, CryptMode::Encrypt)?;
+        manifest.lock().unwrap().add_file(
+            target.to_string(),
+            stats.size,
+            stats.csum,
+            CryptMode::Encrypt,
+        )?;
     };
 
-
     let manifest = {
         let guard = manifest.lock().unwrap();
-        guard.to_string(crypt_config.as_ref().map(Arc::as_ref))
+        guard
+            .to_string(crypt_config.as_ref().map(Arc::as_ref))
             .map_err(|err| format_err!("unable to format manifest - {}", err))?
     };
 
     {
         let key_fingerprint = match crypt_config {
             Some(current_config) => {
-                let fp = current_config
-                    .fingerprint()
-                    .to_owned();
+                let fp = current_config.fingerprint().to_owned();
                 Some(fp)
-            },
+            }
             None => None,
         };
diff --git a/src/lib.rs b/src/lib.rs
index bfc379d..278313e 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -2,22 +2,22 @@
 use anyhow::{format_err, Error};
 
 use std::ffi::CString;
+use std::os::raw::{c_char, c_int, c_long, c_uchar, c_void};
 use std::ptr;
-use std::os::raw::{c_uchar, c_char, c_int, c_void, c_long};
-use std::sync::{Arc, Mutex, Condvar};
+use std::sync::{Arc, Condvar, Mutex};
 
 use proxmox_lang::try_block;
 
 use pbs_api_types::{Authid, CryptMode};
-use pbs_datastore::BackupDir;
 use pbs_client::BackupRepository;
+use pbs_datastore::BackupDir;
 
 mod capi_types;
 use capi_types::*;
 
+mod commands;
 mod registry;
 mod upload_queue;
-mod commands;
 
 mod backup;
 use backup::*;
@@ -25,16 +25,14 @@ use backup::*;
 mod restore;
 use restore::*;
 
-mod tools;
 mod shared_cache;
+mod tools;
 
-pub const PROXMOX_BACKUP_DEFAULT_CHUNK_SIZE: u64 = 1024*1024*4;
+pub const PROXMOX_BACKUP_DEFAULT_CHUNK_SIZE: u64 = 1024 * 1024 * 4;
 
 use lazy_static::lazy_static;
 
-lazy_static!{
-    static ref VERSION_CSTR: CString = {
-        CString::new(env!("PBS_LIB_VERSION")).unwrap()
-    };
+lazy_static! {
+    static ref VERSION_CSTR: CString = { CString::new(env!("PBS_LIB_VERSION")).unwrap() };
 }
 
 /// Return a read-only pointer to a string containing the version of the library.
@@ -50,9 +48,11 @@ pub extern "C" fn proxmox_backup_qemu_version() -> *const c_char {
 /// and free the allocated memory.
 #[no_mangle]
 #[allow(clippy::not_unsafe_ptr_arg_deref)]
-pub extern "C" fn proxmox_backup_free_error(ptr: * mut c_char) {
+pub extern "C" fn proxmox_backup_free_error(ptr: *mut c_char) {
     if !ptr.is_null() {
-        unsafe { CString::from_raw(ptr); }
+        unsafe {
+            CString::from_raw(ptr);
+        }
     }
 }
 
@@ -63,24 +63,28 @@ fn convert_error_to_cstring(err: String) -> CString {
         Err(err) => {
             eprintln!("got error containung 0 bytes: {}", err);
             CString::new("failed to convert error message containing 0 bytes").unwrap()
-        },
+        }
     }
 }
 
 macro_rules! raise_error_null {
     ($error:ident, $err:expr) => {{
         let errmsg = convert_error_to_cstring($err.to_string());
-        unsafe { *$error = errmsg.into_raw(); }
+        unsafe {
+            *$error = errmsg.into_raw();
+        }
         return ptr::null_mut();
-    }}
+    }};
 }
 
 macro_rules! raise_error_int {
     ($error:ident, $err:expr) => {{
         let errmsg = convert_error_to_cstring($err.to_string());
-        unsafe { *$error = errmsg.into_raw(); }
+        unsafe {
+            *$error = errmsg.into_raw();
+        }
         return -1;
-    }}
+    }};
 }
 
@@ -90,7 +94,7 @@ macro_rules! param_not_null {
             $callback_info.send_result(result);
             return;
         }
-    }}
+    }};
 }
 
 /// Returns the text presentation (relative path) for a backup snapshot
@@ -103,9 +107,8 @@ pub extern "C" fn proxmox_backup_snapshot_string(
     backup_type: *const c_char,
     backup_id: *const c_char,
     backup_time: i64,
-    error: * mut * mut c_char,
+    error: *mut *mut c_char,
 ) -> *const c_char {
-
     let snapshot: Result<CString, Error> = try_block!({
         let backup_type: String = tools::utf8_c_string_lossy(backup_type)
             .ok_or_else(|| format_err!("backup_type must not be NULL"))?;
@@ -118,14 +121,11 @@ pub extern "C" fn proxmox_backup_snapshot_string(
     });
 
     match snapshot {
-        Ok(snapshot) => {
-            unsafe { libc::strdup(snapshot.as_ptr()) }
-        }
+        Ok(snapshot) => unsafe { libc::strdup(snapshot.as_ptr()) },
         Err(err) => raise_error_null!(error, err),
     }
 }
 
-
 #[derive(Clone)]
 pub(crate) struct BackupSetup {
     pub host: String,
@@ -150,7 +150,6 @@ struct GotResultCondition {
 }
 
 impl GotResultCondition {
-
     #[allow(clippy::mutex_atomic)]
     pub fn new() -> Self {
         Self {
@@ -186,10 +185,8 @@ impl GotResultCondition {
 
     #[no_mangle]
     #[allow(clippy::not_unsafe_ptr_arg_deref)]
-    extern "C" fn wakeup_callback(
-        callback_data: *mut c_void,
-    ) {
-        let callback_data = unsafe { &mut *( callback_data as * mut GotResultCondition) };
+    extern "C" fn wakeup_callback(callback_data: *mut c_void) {
+        let callback_data = unsafe { &mut *(callback_data as *mut GotResultCondition) };
         #[allow(clippy::mutex_atomic)]
         let mut done = callback_data.lock.lock().unwrap();
         *done = true;
@@ -197,7 +194,6 @@ impl GotResultCondition {
     }
 }
 
-
 /// Create a new instance
 ///
 /// Uses `PROXMOX_BACKUP_DEFAULT_CHUNK_SIZE` if `chunk_size` is zero.
@@ -215,9 +211,8 @@ pub extern "C" fn proxmox_backup_new(
     compress: bool,
     encrypt: bool,
     fingerprint: *const c_char,
-    error: * mut * mut c_char,
+    error: *mut *mut c_char,
 ) -> *mut ProxmoxBackupHandle {
-
     let task: Result<_, Error> = try_block!({
         let repo: BackupRepository = tools::utf8_c_string(repo)?
            .ok_or_else(|| format_err!("repo must not be NULL"))?
@@ -238,7 +233,11 @@ pub extern "C" fn proxmox_backup_new(
         }
 
         let crypt_mode = if keyfile.is_some() {
-            if encrypt { CryptMode::Encrypt } else { CryptMode::SignOnly }
+            if encrypt {
+                CryptMode::Encrypt
+            } else {
+                CryptMode::SignOnly
+            }
         } else {
             CryptMode::None
         };
@@ -248,7 +247,11 @@ pub extern "C" fn proxmox_backup_new(
             port: repo.port(),
             auth_id: repo.auth_id().to_owned(),
             store: repo.store().to_owned(),
-            chunk_size: if chunk_size > 0 { chunk_size } else { PROXMOX_BACKUP_DEFAULT_CHUNK_SIZE },
+            chunk_size: if chunk_size > 0 {
+                chunk_size
+            } else {
+                PROXMOX_BACKUP_DEFAULT_CHUNK_SIZE
+            },
             backup_type: String::from("vm"),
             backup_id,
             password,
@@ -265,14 +268,14 @@ pub extern "C" fn proxmox_backup_new(
     match task {
         Ok(task) => {
             let boxed_task = Box::new(Arc::new(task));
-            Box::into_raw(boxed_task) as * mut ProxmoxBackupHandle
+            Box::into_raw(boxed_task) as *mut ProxmoxBackupHandle
         }
         Err(err) => raise_error_null!(error, err),
     }
 }
 
 fn backup_handle_to_task(handle: *mut ProxmoxBackupHandle) -> Arc<BackupTask> {
-    let task = unsafe { & *(handle as *const Arc<BackupTask>) };
+    let task = unsafe { &*(handle as *const Arc<BackupTask>) };
     // increase reference count while we use it inside rust
     Arc::clone(task)
 }
@@ -324,7 +327,12 @@ pub extern "C" fn proxmox_backup_connect_async(
     error: *mut *mut c_char,
 ) {
     let task = backup_handle_to_task(handle);
-    let callback_info = CallbackPointers { callback, callback_data, error, result };
+    let callback_info = CallbackPointers {
+        callback,
+        callback_data,
+        error,
+        result,
+    };
 
     task.runtime().spawn(async move {
         let result = task.connect().await;
@@ -339,13 +347,10 @@ pub extern "C" fn proxmox_backup_connect_async(
 /// allocated memory.
 #[no_mangle]
 #[allow(clippy::not_unsafe_ptr_arg_deref)]
-pub extern "C" fn proxmox_backup_abort(
-    handle: *mut ProxmoxBackupHandle,
-    reason: *const c_char,
-) {
+pub extern "C" fn proxmox_backup_abort(handle: *mut ProxmoxBackupHandle, reason: *const c_char) {
     let task = backup_handle_to_task(handle);
 
-    let reason = tools::utf8_c_string_lossy(reason)
-        .unwrap_or_else(|| "no reason (NULL)".to_string());
+    let reason =
+        tools::utf8_c_string_lossy(reason).unwrap_or_else(|| "no reason (NULL)".to_string());
     task.abort(reason);
 }
 
@@ -362,11 +367,19 @@ pub extern "C" fn proxmox_backup_check_incremental(
 ) -> c_int {
     let task = backup_handle_to_task(handle);
 
-    if device_name.is_null() { return 0; }
+    if device_name.is_null() {
+        return 0;
+    }
 
     match tools::utf8_c_string_lossy(device_name) {
         None => 0,
-        Some(device_name) => if task.check_incremental(device_name, size) { 1 } else { 0 },
+        Some(device_name) => {
+            if task.check_incremental(device_name, size) {
+                1
+            } else {
+                0
+            }
+        }
     }
 }
 
@@ -378,7 +391,7 @@ pub extern "C" fn proxmox_backup_register_image(
     device_name: *const c_char, // expect utf8 here
     size: u64,
     incremental: bool,
-    error: * mut * mut c_char,
+    error: *mut *mut c_char,
 ) -> c_int {
     let mut result: c_int = -1;
 
@@ -387,7 +400,10 @@ pub extern "C" fn proxmox_backup_register_image(
     let callback_info = got_result_condition.callback_info(&mut result, error);
 
     proxmox_backup_register_image_async(
-        handle, device_name, size, incremental,
+        handle,
+        device_name,
+        size,
+        incremental,
         callback_info.callback,
         callback_info.callback_data,
         callback_info.result,
@@ -414,10 +430,15 @@ pub extern "C" fn proxmox_backup_register_image_async(
     callback: extern "C" fn(*mut c_void),
     callback_data: *mut c_void,
     result: *mut c_int,
-    error: * mut * mut c_char,
+    error: *mut *mut c_char,
 ) {
     let task = backup_handle_to_task(handle);
-    let callback_info = CallbackPointers { callback, callback_data, error, result };
+    let callback_info = CallbackPointers {
+        callback,
+        callback_data,
+        error,
+        result,
+    };
 
     param_not_null!(device_name, callback_info);
 
@@ -437,7 +458,7 @@ pub extern "C" fn proxmox_backup_add_config(
     name: *const c_char, // expect utf8 here
     data: *const u8,
     size: u64,
-    error: * mut * mut c_char,
+    error: *mut *mut c_char,
 ) -> c_int {
     let mut result: c_int = -1;
 
@@ -446,7 +467,10 @@ pub extern "C" fn proxmox_backup_add_config(
     let callback_info = got_result_condition.callback_info(&mut result, error);
 
     proxmox_backup_add_config_async(
-        handle, name, data, size,
+        handle,
+        name,
+        data,
+        size,
         callback_info.callback,
         callback_info.callback_data,
         callback_info.result,
@@ -471,11 +495,16 @@ pub extern "C" fn proxmox_backup_add_config_async(
     callback: extern "C" fn(*mut c_void),
     callback_data: *mut c_void,
     result: *mut c_int,
-    error: * mut * mut c_char,
+    error: *mut *mut c_char,
 ) {
     let task = backup_handle_to_task(handle);
 
-    let callback_info = CallbackPointers { callback, callback_data, error, result };
+    let callback_info = CallbackPointers {
+        callback,
+        callback_data,
+        error,
+        result,
+    };
 
     param_not_null!(name, callback_info);
     let name = unsafe { tools::utf8_c_string_lossy_non_null(name) };
@@ -508,7 +537,7 @@ pub extern "C" fn proxmox_backup_write_data(
     data: *const u8,
     offset: u64,
     size: u64,
-    error: * mut * mut c_char,
+    error: *mut *mut c_char,
 ) -> c_int {
     let mut result: c_int = -1;
 
@@ -517,7 +546,11 @@ pub extern "C" fn proxmox_backup_write_data(
     let callback_info = got_result_condition.callback_info(&mut result, error);
 
     proxmox_backup_write_data_async(
-        handle, dev_id, data, offset, size,
+        handle,
+        dev_id,
+        data,
+        offset,
+        size,
         callback_info.callback,
         callback_info.callback_data,
         callback_info.result,
@@ -554,10 +587,15 @@ pub extern "C" fn proxmox_backup_write_data_async(
     callback: extern "C" fn(*mut c_void),
     callback_data: *mut c_void,
     result: *mut c_int,
-    error: * mut * mut c_char,
+    error: *mut *mut c_char,
 ) {
     let task = backup_handle_to_task(handle);
-    let callback_info = CallbackPointers { callback, callback_data, error, result };
+    let callback_info = CallbackPointers {
+        callback,
+        callback_data,
+        error,
+        result,
+    };
 
     let data = DataPointer(data);
 
@@ -573,7 +611,7 @@ pub extern "C" fn proxmox_backup_write_data_async(
 pub extern "C" fn proxmox_backup_close_image(
     handle: *mut ProxmoxBackupHandle,
     dev_id: u8,
-    error: * mut * mut c_char,
+    error: *mut *mut c_char,
 ) -> c_int {
     let mut result: c_int = -1;
 
@@ -582,7 +620,8 @@ pub extern "C" fn proxmox_backup_close_image(
     let callback_info = got_result_condition.callback_info(&mut result, error);
 
     proxmox_backup_close_image_async(
-        handle, dev_id,
+        handle,
+        dev_id,
         callback_info.callback,
         callback_info.callback_data,
         callback_info.result,
@@ -605,10 +644,15 @@ pub extern "C" fn proxmox_backup_close_image_async(
     callback: extern "C" fn(*mut c_void),
     callback_data: *mut c_void,
     result: *mut c_int,
-    error: * mut * mut c_char,
+    error: *mut *mut c_char,
 ) {
     let task = backup_handle_to_task(handle);
-    let callback_info = CallbackPointers { callback, callback_data, error, result };
+    let callback_info = CallbackPointers {
+        callback,
+        callback_data,
+        error,
+        result,
+    };
 
     task.runtime().spawn(async move {
         let result = task.close_image(dev_id).await;
@@ -621,7 +665,7 @@ pub extern "C" fn proxmox_backup_close_image_async(
 #[allow(clippy::not_unsafe_ptr_arg_deref)]
 pub extern "C" fn proxmox_backup_finish(
     handle: *mut ProxmoxBackupHandle,
-    error: * mut * mut c_char,
+    error: *mut *mut c_char,
 ) -> c_int {
     let mut result: c_int = -1;
 
@@ -653,10 +697,15 @@ pub extern "C" fn proxmox_backup_finish_async(
     callback: extern "C" fn(*mut c_void),
     callback_data: *mut c_void,
     result: *mut c_int,
-    error: * mut * mut c_char,
+    error: *mut *mut c_char,
 ) {
-    let task = unsafe { & *(handle as * const Arc<BackupTask>) };
-    let callback_info = CallbackPointers { callback, callback_data, error, result };
+    let task = unsafe { &*(handle as *const Arc<BackupTask>) };
+    let callback_info = CallbackPointers {
+        callback,
+        callback_data,
+        error,
+        result,
+    };
 
     task.runtime().spawn(async move {
         let result = task.finish().await;
@@ -670,8 +719,7 @@ pub extern "C" fn proxmox_backup_finish_async(
 #[no_mangle]
 #[allow(clippy::not_unsafe_ptr_arg_deref)]
 pub extern "C" fn proxmox_backup_disconnect(handle: *mut ProxmoxBackupHandle) {
-
-    let task = handle as * mut Arc<BackupTask>;
+    let task = handle as *mut Arc<BackupTask>;
 
     unsafe { Box::from_raw(task) }; // take ownership, drop(task)
 }
@@ -681,7 +729,7 @@ pub extern "C" fn proxmox_backup_disconnect(handle: *mut ProxmoxBackupHandle) {
 // currently only implemented for images...
 
 fn restore_handle_to_task(handle: *mut ProxmoxRestoreHandle) -> Arc<RestoreTask> {
-    let restore_task = unsafe { & *(handle as *const Arc<RestoreTask>) };
+    let restore_task = unsafe { &*(handle as *const Arc<RestoreTask>) };
     // increase reference count while we use it inside rust
     Arc::clone(restore_task)
 }
@@ -696,9 +744,8 @@ pub extern "C" fn proxmox_restore_new(
     keyfile: *const c_char,
     key_password: *const c_char,
     fingerprint: *const c_char,
-    error: * mut * mut c_char,
+    error: *mut *mut c_char,
 ) -> *mut ProxmoxRestoreHandle {
-
     let result: Result<_, Error> = try_block!({
         let repo: BackupRepository = tools::utf8_c_string(repo)?
             .ok_or_else(|| format_err!("repo must not be NULL"))?
@@ -740,7 +787,7 @@ pub extern "C" fn proxmox_restore_new(
     match result {
         Ok(restore_task) => {
             let boxed_restore_task = Box::new(Arc::new(restore_task));
-            Box::into_raw(boxed_restore_task) as * mut ProxmoxRestoreHandle
+            Box::into_raw(boxed_restore_task) as *mut ProxmoxRestoreHandle
         }
         Err(err) => raise_error_null!(error, err),
     }
 }
@@ -790,7 +837,12 @@ pub extern "C" fn proxmox_restore_connect_async(
     error: *mut *mut c_char,
 ) {
     let restore_task = restore_handle_to_task(handle);
-    let callback_info = CallbackPointers { callback, callback_data, error, result };
+    let callback_info = CallbackPointers {
+        callback,
+        callback_data,
+        error,
+        result,
+    };
 
     restore_task.runtime().spawn(async move {
         let result = restore_task.connect().await;
@@ -804,8 +856,7 @@ pub extern "C" fn proxmox_restore_connect_async(
 #[no_mangle]
 #[allow(clippy::not_unsafe_ptr_arg_deref)]
 pub extern "C" fn proxmox_restore_disconnect(handle: *mut ProxmoxRestoreHandle) {
-
-    let restore_task = handle as * mut Arc<RestoreTask>;
+    let restore_task = handle as *mut Arc<RestoreTask>;
 
     let restore_task = unsafe { Box::from_raw(restore_task) };
     drop(restore_task);
@@ -823,14 +874,12 @@ pub extern "C" fn proxmox_restore_image(
     archive_name: *const c_char, // expect full name here, i.e. "name.img.fidx"
     callback: extern "C" fn(*mut c_void, u64, *const c_uchar, u64) -> c_int,
     callback_data: *mut c_void,
-    error: * mut * mut c_char,
+    error: *mut *mut c_char,
     verbose: bool,
 ) -> c_int {
-
     let restore_task = restore_handle_to_task(handle);
 
     let result: Result<_, Error> = try_block!({
-
        let archive_name = tools::utf8_c_string(archive_name)?
            .ok_or_else(|| format_err!("archive_name must not be NULL"))?;
@@ -838,13 +887,15 @@ pub extern "C" fn proxmox_restore_image(
             callback(callback_data, offset, data.as_ptr(), data.len() as u64)
         };
 
-        let write_zero_callback = move |offset: u64, len: u64| {
-            callback(callback_data, offset, std::ptr::null(), len)
-        };
+        let write_zero_callback =
+            move |offset: u64, len: u64| callback(callback_data, offset, std::ptr::null(), len);
 
-        proxmox_async::runtime::block_on(
-            restore_task.restore_image(archive_name, write_data_callback, write_zero_callback, verbose)
-        )?;
+        proxmox_async::runtime::block_on(restore_task.restore_image(
+            archive_name,
+            write_data_callback,
+            write_zero_callback,
+            verbose,
+        ))?;
 
         Ok(())
     });
@@ -862,14 +913,15 @@ pub extern "C" fn proxmox_restore_open_image(
     handle: *mut ProxmoxRestoreHandle,
     archive_name: *const c_char,
-    error: * mut * mut c_char,
+    error: *mut *mut c_char,
 ) -> c_int {
     let mut result: c_int = -1;
 
     let mut got_result_condition = GotResultCondition::new();
     let callback_info = got_result_condition.callback_info(&mut result, error);
 
     proxmox_restore_open_image_async(
-        handle, archive_name,
+        handle,
+        archive_name,
         callback_info.callback,
         callback_info.callback_data,
         callback_info.result,
@@ -890,10 +942,15 @@ pub extern "C" fn proxmox_restore_open_image_async(
     callback: extern "C" fn(*mut c_void),
     callback_data: *mut c_void,
     result: *mut c_int,
-    error: * mut * mut c_char,
+    error: *mut *mut c_char,
 ) {
     let restore_task = restore_handle_to_task(handle);
-    let callback_info = CallbackPointers { callback, callback_data, error, result };
+    let callback_info = CallbackPointers {
+        callback,
+        callback_data,
+        error,
+        result,
+    };
 
     param_not_null!(archive_name, callback_info);
     let archive_name = unsafe { tools::utf8_c_string_lossy_non_null(archive_name) };
@@ -901,7 +958,7 @@ pub extern "C" fn proxmox_restore_open_image_async(
     restore_task.runtime().spawn(async move {
         let result = match restore_task.open_image(archive_name).await {
             Ok(res) => Ok(res as i32),
-            Err(err) => Err(err)
+            Err(err) => Err(err),
         };
         callback_info.send_result(result);
     });
@@ -913,7 +970,7 @@ pub extern "C" fn proxmox_restore_get_image_length(
     handle: *mut ProxmoxRestoreHandle,
     aid: u8,
-    error: * mut * mut c_char,
+    error: *mut *mut c_char,
 ) -> c_long {
     let restore_task = restore_handle_to_task(handle);
     let result = restore_task.get_image_length(aid);
@@ -940,7 +997,7 @@ pub extern "C" fn proxmox_restore_read_image_at(
     data: *mut u8,
     offset: u64,
     size: u64,
-    error: * mut * mut c_char,
+    error: *mut *mut c_char,
 ) -> c_int {
     let mut result: c_int = -1;
 
@@ -949,7 +1006,11 @@ pub extern "C" fn proxmox_restore_read_image_at(
     let callback_info = got_result_condition.callback_info(&mut result, error);
 
     proxmox_restore_read_image_at_async(
-        handle, aid, data, offset, size,
+        handle,
+        aid,
+        data,
+        offset,
+        size,
         callback_info.callback,
         callback_info.callback_data,
         callback_info.result,
@@ -983,11 +1044,16 @@ pub extern "C" fn proxmox_restore_read_image_at_async(
     callback: extern "C" fn(*mut c_void),
     callback_data: *mut c_void,
     result: *mut c_int,
-    error: * mut * mut c_char,
+    error: *mut *mut c_char,
 ) {
     let restore_task = restore_handle_to_task(handle);
 
-    let callback_info = CallbackPointers { callback, callback_data, error, result };
+    let callback_info = CallbackPointers {
+        callback,
+        callback_data,
+        error,
+        result,
+    };
 
     param_not_null!(data, callback_info);
     let data = DataPointer(data);
@@ -1007,7 +1073,9 @@ pub extern "C" fn proxmox_restore_read_image_at_async(
 #[allow(clippy::not_unsafe_ptr_arg_deref)]
 pub extern "C" fn proxmox_export_state(buf_size: *mut usize) -> *mut u8 {
     let data = commands::serialize_state().into_boxed_slice();
-    unsafe { *buf_size = data.len(); }
+    unsafe {
+        *buf_size = data.len();
+    }
     Box::leak(data).as_mut_ptr()
 }
 
@@ -1028,6 +1096,8 @@ pub extern "C" fn proxmox_import_state(buf: *const u8, buf_size: usize) {
 #[allow(clippy::not_unsafe_ptr_arg_deref)]
 pub extern "C" fn proxmox_free_state_buf(buf: *mut u8) {
     if !buf.is_null() {
-        unsafe { Box::from_raw(buf); }
+        unsafe {
+            Box::from_raw(buf);
+        }
     }
 }
diff --git a/src/registry.rs b/src/registry.rs
index a84f9db..67fb601 100644
--- a/src/registry.rs
+++ b/src/registry.rs
@@ -9,10 +9,11 @@ pub struct Registry<T> {
 }
 
 impl<T> Registry<T> {
-
     /// Create a new instance
     pub fn new() -> Self {
-        Self { info_list: Vec::new() }
+        Self {
+            info_list: Vec::new(),
+        }
     }
 
     /// Register data, returns associated ID
diff --git a/src/restore.rs b/src/restore.rs
index f70f2ec..f95d6d6 100644
--- a/src/restore.rs
+++ b/src/restore.rs
@@ -1,25 +1,25 @@
-use std::sync::{Arc, Mutex};
 use std::convert::TryInto;
+use std::sync::{Arc, Mutex};
 
-use anyhow::{format_err, bail, Error};
+use anyhow::{bail, format_err, Error};
 use once_cell::sync::OnceCell;
 use tokio::runtime::Runtime;
 
 use proxmox_async::runtime::get_runtime_with_builder;
 
-use pbs_tools::crypt_config::CryptConfig;
+use pbs_client::{BackupReader, HttpClient, HttpClientOptions, RemoteChunkReader};
 use pbs_config::key_config::load_and_decrypt_key;
-use pbs_datastore::BackupManifest;
-use pbs_datastore::index::IndexFile;
 use pbs_datastore::cached_chunk_reader::CachedChunkReader;
-use pbs_datastore::fixed_index::FixedIndexReader;
 use pbs_datastore::data_blob::DataChunkBuilder;
+use pbs_datastore::fixed_index::FixedIndexReader;
+use pbs_datastore::index::IndexFile;
 use pbs_datastore::read_chunk::ReadChunk;
-use pbs_client::{HttpClient, HttpClientOptions, BackupReader, RemoteChunkReader};
+use pbs_datastore::BackupManifest;
+use pbs_tools::crypt_config::CryptConfig;
 
 use super::BackupSetup;
-use crate::registry::Registry;
 use crate::capi_types::DataPointer;
+use crate::registry::Registry;
 use crate::shared_cache::get_shared_chunk_cache;
 
 struct ImageAccessInfo {
@@ -38,21 +38,17 @@ pub(crate) struct RestoreTask {
 }
 
 impl RestoreTask {
-
     /// Create a new instance, using the specified Runtime
     ///
     /// We keep a reference to the runtime - else the runtime can be
    /// dropped and further connections fails.
     pub fn with_runtime(setup: BackupSetup, runtime: Arc<Runtime>) -> Result<Self, Error> {
-
         let crypt_config = match setup.keyfile {
             None => None,
             Some(ref path) => {
-                let (key, _, _) = load_and_decrypt_key(path, & || {
-                    match setup.key_password {
-                        Some(ref key_password) => Ok(key_password.as_bytes().to_vec()),
-                        None => bail!("no key_password specified"),
-                    }
+                let (key, _, _) = load_and_decrypt_key(path, &|| match setup.key_password {
+                    Some(ref key_password) => Ok(key_password.as_bytes().to_vec()),
+                    None => bail!("no key_password specified"),
                 })?;
                 Some(Arc::new(CryptConfig::new(key)?))
             }
@@ -81,13 +77,17 @@ impl RestoreTask {
     }
 
     pub async fn connect(&self) -> Result<c_int, Error> {
-
         let options = HttpClientOptions::new_non_interactive(
             self.setup.password.clone(),
             self.setup.fingerprint.clone(),
         );
 
-        let http = HttpClient::new(&self.setup.host, self.setup.port, &self.setup.auth_id, options)?;
+        let http = HttpClient::new(
+            &self.setup.host,
+            self.setup.port,
+            &self.setup.auth_id,
+            options,
+        )?;
         let client = BackupReader::start(
             http,
             self.crypt_config.clone(),
@@ -95,16 +95,19 @@ impl RestoreTask {
             &self.setup.backup_type,
             &self.setup.backup_id,
             self.setup.backup_time,
-            true
-        ).await?;
+            true,
+        )
+        .await?;
 
         let (manifest, _) = client.download_manifest().await?;
         manifest.check_fingerprint(self.crypt_config.as_ref().map(Arc::as_ref))?;
 
-        self.manifest.set(Arc::new(manifest))
+        self.manifest
+            .set(Arc::new(manifest))
             .map_err(|_| format_err!("already connected!"))?;
 
-        self.client.set(client)
+        self.client
+            .set(client)
             .map_err(|_| format_err!("already connected!"))?;
 
         Ok(0)
@@ -121,7 +124,6 @@ impl RestoreTask {
         write_zero_callback: impl Fn(u64, u64) -> i32,
         verbose: bool,
     ) -> Result<(), Error> {
-
         if verbose {
             eprintln!("download and verify backup index");
         }
@@ -136,7 +138,9 @@ impl RestoreTask {
             None => bail!("no manifest"),
         };
 
-        let index = client.download_fixed_index(&manifest, &archive_name).await?;
+        let index = client
+            .download_fixed_index(&manifest, &archive_name)
+            .await?;
 
         let (_, zero_chunk_digest) = DataChunkBuilder::build_zero_chunk(
             self.crypt_config.as_ref().map(Arc::as_ref),
@@ -163,7 +167,7 @@ impl RestoreTask {
 
         for pos in 0..index.index_count() {
             let digest = index.index_digest(pos).unwrap();
-            let offset = (pos*index.chunk_size) as u64;
+            let offset = (pos * index.chunk_size) as u64;
             if digest == &zero_chunk_digest {
                 let res = write_zero_callback(offset, index.chunk_size as u64);
                 if res < 0 {
@@ -180,12 +184,16 @@ impl RestoreTask {
                 bytes += raw_data.len();
             }
             if verbose {
-                let next_per = ((pos+1)*100)/index.index_count();
+                let next_per = ((pos + 1) * 100) / index.index_count();
                 if per != next_per {
-                    eprintln!("progress {}% (read {} bytes, zeroes = {}% ({} bytes), duration {} sec)",
-                              next_per, bytes,
-                              zeroes*100/bytes, zeroes,
-                              start_time.elapsed().as_secs());
+                    eprintln!(
+                        "progress {}% (read {} bytes, zeroes = {}% ({} bytes), duration {} sec)",
+                        next_per,
+                        bytes,
+                        zeroes * 100 / bytes,
+                        zeroes,
+                        start_time.elapsed().as_secs()
+                    );
                     per = next_per;
                 }
             }
@@ -193,10 +201,11 @@ impl RestoreTask {
 
         let end_time = std::time::Instant::now();
         let elapsed = end_time.duration_since(start_time);
-        eprintln!("restore image complete (bytes={}, duration={:.2}s, speed={:.2}MB/s)",
-                  bytes,
-                  elapsed.as_secs_f64(),
-                  bytes as f64/(1024.0*1024.0*elapsed.as_secs_f64())
+        eprintln!(
+            "restore image complete (bytes={}, duration={:.2}s, speed={:.2}MB/s)",
+            bytes,
+            elapsed.as_secs_f64(),
+            bytes as f64 / (1024.0 * 1024.0 * elapsed.as_secs_f64())
         );
 
         Ok(())
@@ -208,11 +217,7 @@ impl RestoreTask {
         Ok(info.archive_size)
     }
 
-    pub async fn open_image(
-        &self,
-        archive_name: String,
-    ) -> Result<u8, Error> {
-
+    pub async fn open_image(&self, archive_name: String) -> Result<u8, Error> {
         let client = match self.client.get() {
             Some(reader) => Arc::clone(reader),
             None => bail!("not connected"),
@@ -223,7 +228,9 @@ impl RestoreTask {
             None => bail!("no manifest"),
         };
 
-        let index = client.download_fixed_index(&manifest, &archive_name).await?;
+        let index = client
+            .download_fixed_index(&manifest, &archive_name)
+            .await?;
 
         let archive_size = index.index_bytes();
         let most_used = index.find_most_used_chunks(8);
@@ -237,11 +244,16 @@ impl RestoreTask {
         );
 
         let cache = get_shared_chunk_cache();
-        let reader = Arc::new(CachedChunkReader::new_with_cache(chunk_reader, index, cache));
+        let reader = Arc::new(CachedChunkReader::new_with_cache(
+            chunk_reader,
+            index,
+            cache,
+        ));
 
         let info = ImageAccessInfo {
             archive_size,
-            _archive_name: archive_name, /// useful to debug
+            _archive_name: archive_name,
+            /// useful to debug
             reader,
         };
 
@@ -255,7 +267,6 @@ impl RestoreTask {
         offset: u64,
         size: u64,
     ) -> Result<c_int, Error> {
-
         let (reader, image_size) = {
             let mut guard = self.image_registry.lock().unwrap();
             let info = guard.lookup(aid)?;
diff --git a/src/tools.rs b/src/tools.rs
index da5bdf9..a9fa298 100644
--- a/src/tools.rs
+++ b/src/tools.rs
@@ -7,11 +7,7 @@ pub fn utf8_c_string(ptr: *const c_char) -> Result<Option<String>, Error> {
     Ok(if ptr.is_null() {
         None
     } else {
-        Some(unsafe {
-            CStr::from_ptr(ptr)
-                .to_str()?
-                .to_owned()
-        })
+        Some(unsafe { CStr::from_ptr(ptr).to_str()?.to_owned() })
     })
 }
 
diff --git a/src/upload_queue.rs b/src/upload_queue.rs
index fc18af1..941734b 100644
--- a/src/upload_queue.rs
+++ b/src/upload_queue.rs
@@ -1,14 +1,14 @@
 use anyhow::Error;
 use std::collections::HashSet;
-use std::sync::{Mutex, Arc};
+use std::sync::{Arc, Mutex};
 
 use futures::future::Future;
 use serde_json::json;
 use tokio::sync::{mpsc, oneshot};
 
-use pbs_datastore::index::IndexFile;
-use pbs_datastore::fixed_index::FixedIndexReader;
 use pbs_client::*;
+use pbs_datastore::fixed_index::FixedIndexReader;
+use pbs_datastore::index::IndexFile;
 
 pub(crate) struct ChunkUploadInfo {
     pub digest: [u8; 32],
@@ -23,14 +23,16 @@ pub(crate) struct UploadResult {
     pub bytes_written: u64,
 }
 
-pub(crate) type UploadQueueSender = mpsc::Sender<Box<dyn Future<Output = Result<ChunkUploadInfo, Error>> + Send + Unpin>>;
-type UploadQueueReceiver = mpsc::Receiver<Box<dyn Future<Output = Result<ChunkUploadInfo, Error>> + Send + Unpin>>;
+pub(crate) type UploadQueueSender =
+    mpsc::Sender<Box<dyn Future<Output = Result<ChunkUploadInfo, Error>> + Send + Unpin>>;
+type UploadQueueReceiver =
+    mpsc::Receiver<Box<dyn Future<Output = Result<ChunkUploadInfo, Error>> + Send + Unpin>>;
 pub(crate) type UploadResultReceiver = oneshot::Receiver<Result<UploadResult, Error>>;
 type UploadResultSender = oneshot::Sender<Result<UploadResult, Error>>;
 
 pub(crate) fn create_upload_queue(
     client: Arc<BackupWriter>,
-    known_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
+    known_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
     initial_index: Arc<Option<FixedIndexReader>>,
     wid: u64,
     device_size: u64,
@@ -39,18 +41,16 @@ pub(crate) fn create_upload_queue(
     let (upload_queue_tx, upload_queue_rx) = mpsc::channel(100);
     let (upload_result_tx, upload_result_rx) = oneshot::channel();
 
-    tokio::spawn(
-        upload_handler(
-            client,
-            known_chunks,
-            initial_index,
-            wid,
-            device_size,
-            chunk_size,
-            upload_queue_rx,
-            upload_result_tx,
-        )
-    );
+    tokio::spawn(upload_handler(
+        client,
+        known_chunks,
+        initial_index,
+        wid,
+        device_size,
+        chunk_size,
+        upload_queue_rx,
+        upload_result_tx,
+    ));
 
     (upload_queue_tx, upload_result_rx)
 }
@@ -67,7 +67,9 @@ async fn upload_chunk_list(
     digest_list.truncate(0);
     offset_list.truncate(0);
 
-    client.upload_put("fixed_index", None, "application/json", param_data).await?;
+    client
+        .upload_put("fixed_index", None, "application/json", param_data)
+        .await?;
 
     Ok(())
 }
 
@@ -75,7 +77,7 @@ async fn upload_chunk_list(
 #[allow(clippy::too_many_arguments, clippy::needless_range_loop)]
 async fn upload_handler(
     client: Arc<BackupWriter>,
-    known_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
+    known_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
     initial_index: Arc<Option<FixedIndexReader>>,
     wid: u64,
     device_size: u64,
@@ -89,7 +91,7 @@ async fn upload_handler(
     let mut digest_list = Vec::new();
     let mut offset_list = Vec::new();
 
-    let index_size = ((device_size + chunk_size -1)/chunk_size) as usize;
+    let index_size = ((device_size + chunk_size - 1) / chunk_size) as usize;
     let mut index = Vec::with_capacity(index_size);
     index.resize(index_size, [0u8; 32]);
 
@@ -103,17 +105,23 @@ async fn upload_handler(
 
     while let Some(response_future) = upload_queue.recv().await {
         match response_future.await {
-            Ok(ChunkUploadInfo { digest, offset, size, chunk_is_known }) => {
+            Ok(ChunkUploadInfo {
+                digest,
+                offset,
+                size,
+                chunk_is_known,
+            }) => {
                 let digest_str = hex::encode(&digest);
 
                 //println!("upload_handler {:?} {}", digest, offset);
 
-                let pos = (offset/chunk_size) as usize;
+                let pos = (offset / chunk_size) as usize;
                 index[pos] = digest;
                 chunk_count += 1;
                 bytes_written += size;
 
-                if !chunk_is_known { // register chunk as known
+                if !chunk_is_known {
+                    // register chunk as known
                     let mut known_chunks_guard = known_chunks.lock().unwrap();
                     known_chunks_guard.insert(digest);
                 }
@@ -127,12 +135,14 @@ async fn upload_handler(
                         wid,
                         &mut digest_list,
                         &mut offset_list,
-                    ).await {
+                    )
+                    .await
+                    {
                         let _ = upload_result.send(Err(err));
                         return;
                     }
                 }
-             }
+            }
             Err(err) => {
                 let _ = upload_result.send(Err(err));
                 return;
@@ -141,12 +151,9 @@ async fn upload_handler(
     }
 
     if !digest_list.is_empty() {
-        if let Err(err) = upload_chunk_list(
-            Arc::clone(&client),
-            wid,
-            &mut digest_list,
-            &mut offset_list,
-        ).await {
+        if let Err(err) =
+            upload_chunk_list(Arc::clone(&client), wid, &mut digest_list, &mut offset_list).await
+        {
             let _ = upload_result.send(Err(err));
             return;
         }
@@ -158,5 +165,9 @@ async fn upload_handler(
     }
     let csum = csum.finish();
 
-    let _ = upload_result.send(Ok(UploadResult { csum, chunk_count, bytes_written }));
+    let _ = upload_result.send(Ok(UploadResult {
+        csum,
+        chunk_count,
+        bytes_written,
+    }));
 }