Signed-off-by: Wolfgang Bumiller <w.bumiller@proxmox.com>
This commit is contained in:
Wolfgang Bumiller 2022-04-26 13:48:44 +02:00
parent bf18160d8f
commit 8fc821f8fd
9 changed files with 453 additions and 327 deletions

View File

@ -12,11 +12,7 @@ fn main() {
Some(ver) if !ver.is_empty() => ver, Some(ver) if !ver.is_empty() => ver,
_ => "UNKNOWN", _ => "UNKNOWN",
}; };
let version_string = format!( let version_string = format!("{} ({})", crate_ver, git_ver,);
"{} ({})",
crate_ver,
git_ver,
);
cbindgen::Builder::new() cbindgen::Builder::new()
.with_language(cbindgen::Language::C) .with_language(cbindgen::Language::C)

View File

@ -1,25 +1,25 @@
use anyhow::{format_err, bail, Error}; use anyhow::{bail, format_err, Error};
use std::collections::HashSet;
use std::sync::{Mutex, Arc};
use once_cell::sync::OnceCell; use once_cell::sync::OnceCell;
use std::collections::HashSet;
use std::os::raw::c_int; use std::os::raw::c_int;
use std::sync::{Arc, Mutex};
use futures::future::{Future, Either, FutureExt}; use futures::future::{Either, Future, FutureExt};
use tokio::runtime::Runtime; use tokio::runtime::Runtime;
use proxmox_sys::fs::file_get_contents;
use proxmox_async::runtime::get_runtime_with_builder; use proxmox_async::runtime::get_runtime_with_builder;
use proxmox_sys::fs::file_get_contents;
use pbs_api_types::CryptMode; use pbs_api_types::CryptMode;
use pbs_client::{BackupWriter, HttpClient, HttpClientOptions};
use pbs_config::key_config::{load_and_decrypt_key, rsa_encrypt_key_config, KeyConfig};
use pbs_datastore::{BackupDir, BackupManifest};
use pbs_tools::crypt_config::CryptConfig; use pbs_tools::crypt_config::CryptConfig;
use pbs_config::key_config::{KeyConfig, load_and_decrypt_key, rsa_encrypt_key_config};
use pbs_datastore::{BackupDir, BackupManifest,};
use pbs_client::{HttpClient, HttpClientOptions, BackupWriter};
use super::BackupSetup; use super::BackupSetup;
use crate::capi_types::*; use crate::capi_types::*;
use crate::registry::Registry;
use crate::commands::*; use crate::commands::*;
use crate::registry::Registry;
pub(crate) struct BackupTask { pub(crate) struct BackupTask {
setup: BackupSetup, setup: BackupSetup,
@ -32,13 +32,12 @@ pub(crate) struct BackupTask {
last_manifest: OnceCell<Arc<BackupManifest>>, last_manifest: OnceCell<Arc<BackupManifest>>,
manifest: Arc<Mutex<BackupManifest>>, manifest: Arc<Mutex<BackupManifest>>,
registry: Arc<Mutex<Registry<ImageUploadInfo>>>, registry: Arc<Mutex<Registry<ImageUploadInfo>>>,
known_chunks: Arc<Mutex<HashSet<[u8;32]>>>, known_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
abort: tokio::sync::broadcast::Sender<()>, abort: tokio::sync::broadcast::Sender<()>,
aborted: OnceCell<String>, // set on abort, conatins abort reason aborted: OnceCell<String>, // set on abort, conatins abort reason
} }
impl BackupTask { impl BackupTask {
/// Create a new instance, using the specified Runtime /// Create a new instance, using the specified Runtime
/// ///
/// We keep a reference to the runtime - else the runtime can be /// We keep a reference to the runtime - else the runtime can be
@ -47,17 +46,14 @@ impl BackupTask {
setup: BackupSetup, setup: BackupSetup,
compress: bool, compress: bool,
crypt_mode: CryptMode, crypt_mode: CryptMode,
runtime: Arc<Runtime> runtime: Arc<Runtime>,
) -> Result<Self, Error> { ) -> Result<Self, Error> {
let (crypt_config, rsa_encrypted_key) = match setup.keyfile { let (crypt_config, rsa_encrypted_key) = match setup.keyfile {
None => (None, None), None => (None, None),
Some(ref path) => { Some(ref path) => {
let (key, created, _) = load_and_decrypt_key(path, & || { let (key, created, _) = load_and_decrypt_key(path, &|| match setup.key_password {
match setup.key_password { Some(ref key_password) => Ok(key_password.as_bytes().to_vec()),
Some(ref key_password) => Ok(key_password.as_bytes().to_vec()), None => bail!("no key_password specified"),
None => bail!("no key_password specified"),
}
})?; })?;
let rsa_encrypted_key = match setup.master_keyfile { let rsa_encrypted_key = match setup.master_keyfile {
Some(ref master_keyfile) => { Some(ref master_keyfile) => {
@ -68,7 +64,7 @@ impl BackupTask {
key_config.created = created; // keep original value key_config.created = created; // keep original value
Some(rsa_encrypt_key_config(rsa, &key_config)?) Some(rsa_encrypt_key_config(rsa, &key_config)?)
}, }
None => None, None => None,
}; };
(Some(Arc::new(CryptConfig::new(key)?)), rsa_encrypted_key) (Some(Arc::new(CryptConfig::new(key)?)), rsa_encrypted_key)
@ -96,7 +92,7 @@ impl BackupTask {
known_chunks, known_chunks,
writer: OnceCell::new(), writer: OnceCell::new(),
last_manifest: OnceCell::new(), last_manifest: OnceCell::new(),
aborted: OnceCell::new() aborted: OnceCell::new(),
}) })
} }
@ -132,29 +128,43 @@ impl BackupTask {
} }
pub async fn connect(&self) -> Result<c_int, Error> { pub async fn connect(&self) -> Result<c_int, Error> {
self.check_aborted()?; self.check_aborted()?;
let command_future = async { let command_future = async {
let options = HttpClientOptions::new_non_interactive( let options = HttpClientOptions::new_non_interactive(
self.setup.password.clone(), self.setup.password.clone(),
self.setup.fingerprint.clone(), self.setup.fingerprint.clone(),
); );
let http = HttpClient::new(&self.setup.host, self.setup.port, &self.setup.auth_id, options)?; let http = HttpClient::new(
&self.setup.host,
self.setup.port,
&self.setup.auth_id,
options,
)?;
let writer = BackupWriter::start( let writer = BackupWriter::start(
http, self.crypt_config.clone(), &self.setup.store, "vm", &self.setup.backup_id, http,
self.setup.backup_time, false, false).await?; self.crypt_config.clone(),
&self.setup.store,
"vm",
&self.setup.backup_id,
self.setup.backup_time,
false,
false,
)
.await?;
let last_manifest = writer.download_previous_manifest().await; let last_manifest = writer.download_previous_manifest().await;
let mut result = 0; let mut result = 0;
if let Ok(last_manifest) = last_manifest { if let Ok(last_manifest) = last_manifest {
result = 1; result = 1;
self.last_manifest.set(Arc::new(last_manifest)) self.last_manifest
.set(Arc::new(last_manifest))
.map_err(|_| format_err!("already connected!"))?; .map_err(|_| format_err!("already connected!"))?;
} }
self.writer.set(writer) self.writer
.set(writer)
.map_err(|_| format_err!("already connected!"))?; .map_err(|_| format_err!("already connected!"))?;
Ok(result) Ok(result)
@ -171,12 +181,7 @@ impl BackupTask {
.ok_or_else(|| format_err!("not connected")) .ok_or_else(|| format_err!("not connected"))
} }
pub async fn add_config( pub async fn add_config(&self, name: String, data: Vec<u8>) -> Result<c_int, Error> {
&self,
name: String,
data: Vec<u8>,
) -> Result<c_int, Error> {
self.check_aborted()?; self.check_aborted()?;
let command_future = add_config( let command_future = add_config(
@ -199,12 +204,15 @@ impl BackupTask {
offset: u64, offset: u64,
size: u64, size: u64,
) -> Result<c_int, Error> { ) -> Result<c_int, Error> {
self.check_aborted()?; self.check_aborted()?;
let command_future = write_data( let command_future = write_data(
self.need_writer()?, self.need_writer()?,
if self.crypt_mode == CryptMode::Encrypt { self.crypt_config.clone() } else { None }, if self.crypt_mode == CryptMode::Encrypt {
self.crypt_config.clone()
} else {
None
},
Arc::clone(&self.registry), Arc::clone(&self.registry),
Arc::clone(&self.known_chunks), Arc::clone(&self.known_chunks),
dev_id, dev_id,
@ -223,17 +231,17 @@ impl BackupTask {
self.last_manifest.get().map(Arc::clone) self.last_manifest.get().map(Arc::clone)
} }
pub fn check_incremental( pub fn check_incremental(&self, device_name: String, size: u64) -> bool {
&self,
device_name: String,
size: u64,
) -> bool {
match self.last_manifest() { match self.last_manifest() {
Some(ref manifest) => { Some(ref manifest) => {
check_last_incremental_csum(Arc::clone(manifest), &device_name, size) check_last_incremental_csum(Arc::clone(manifest), &device_name, size)
&& check_last_encryption_mode(Arc::clone(manifest), &device_name, self.crypt_mode) && check_last_encryption_mode(
Arc::clone(manifest),
&device_name,
self.crypt_mode,
)
&& check_last_encryption_key(self.crypt_config.clone()) && check_last_encryption_key(self.crypt_config.clone())
}, }
None => false, None => false,
} }
} }
@ -244,7 +252,6 @@ impl BackupTask {
size: u64, size: u64,
incremental: bool, incremental: bool,
) -> Result<c_int, Error> { ) -> Result<c_int, Error> {
self.check_aborted()?; self.check_aborted()?;
let command_future = register_image( let command_future = register_image(
@ -265,7 +272,6 @@ impl BackupTask {
} }
pub async fn close_image(&self, dev_id: u8) -> Result<c_int, Error> { pub async fn close_image(&self, dev_id: u8) -> Result<c_int, Error> {
self.check_aborted()?; self.check_aborted()?;
let command_future = close_image( let command_future = close_image(
@ -281,7 +287,6 @@ impl BackupTask {
} }
pub async fn finish(&self) -> Result<c_int, Error> { pub async fn finish(&self) -> Result<c_int, Error> {
self.check_aborted()?; self.check_aborted()?;
let command_future = finish_backup( let command_future = finish_backup(
@ -294,21 +299,16 @@ impl BackupTask {
let mut abort_rx = self.abort.subscribe(); let mut abort_rx = self.abort.subscribe();
abortable_command(command_future, abort_rx.recv()).await abortable_command(command_future, abort_rx.recv()).await
} }
} }
fn abortable_command<'a, F: 'a + Send + Future<Output=Result<c_int, Error>>>( fn abortable_command<'a, F: 'a + Send + Future<Output = Result<c_int, Error>>>(
command_future: F, command_future: F,
abort_future: impl 'a + Send + Future<Output=Result<(), tokio::sync::broadcast::error::RecvError>>, abort_future: impl 'a + Send + Future<Output = Result<(), tokio::sync::broadcast::error::RecvError>>,
) -> impl 'a + Future<Output = Result<c_int, Error>> { ) -> impl 'a + Future<Output = Result<c_int, Error>> {
futures::future::select(command_future.boxed(), abort_future.boxed()).map(move |either| {
futures::future::select(command_future.boxed(), abort_future.boxed()) match either {
.map(move |either| { Either::Left((result, _)) => result,
match either { Either::Right(_) => bail!("command aborted"),
Either::Left((result, _)) => { }
result })
}
Either::Right(_) => bail!("command aborted"),
}
})
} }

View File

@ -1,6 +1,6 @@
use anyhow::Error; use anyhow::Error;
use std::os::raw::{c_char, c_void, c_int};
use std::ffi::CString; use std::ffi::CString;
use std::os::raw::{c_char, c_int, c_void};
pub(crate) struct CallbackPointers { pub(crate) struct CallbackPointers {
pub callback: extern "C" fn(*mut c_void), pub callback: extern "C" fn(*mut c_void),
@ -11,7 +11,6 @@ pub(crate) struct CallbackPointers {
unsafe impl std::marker::Send for CallbackPointers {} unsafe impl std::marker::Send for CallbackPointers {}
impl CallbackPointers { impl CallbackPointers {
pub fn send_result(self, result: Result<c_int, Error>) { pub fn send_result(self, result: Result<c_int, Error>) {
match result { match result {
Ok(ret) => { Ok(ret) => {
@ -38,7 +37,7 @@ impl CallbackPointers {
} }
} }
pub(crate) struct DataPointer (pub *const u8); pub(crate) struct DataPointer(pub *const u8);
unsafe impl std::marker::Send for DataPointer {} unsafe impl std::marker::Send for DataPointer {}
/// Opaque handle for restore jobs /// Opaque handle for restore jobs

View File

@ -1,25 +1,27 @@
use anyhow::{bail, format_err, Error}; use anyhow::{bail, format_err, Error};
use std::collections::{HashSet, HashMap}; use std::collections::{HashMap, HashSet};
use std::sync::{Mutex, Arc};
use std::os::raw::c_int; use std::os::raw::c_int;
use std::sync::{Arc, Mutex};
use futures::future::{Future, TryFutureExt}; use futures::future::{Future, TryFutureExt};
use serde_json::json; use serde_json::json;
use pbs_api_types::CryptMode; use pbs_api_types::CryptMode;
use pbs_tools::crypt_config::CryptConfig;
use pbs_datastore::index::IndexFile;
use pbs_datastore::data_blob::DataChunkBuilder;
use pbs_datastore::manifest::{BackupManifest, MANIFEST_BLOB_NAME, ENCRYPTED_KEY_BLOB_NAME};
use pbs_client::{BackupWriter, H2Client, UploadOptions}; use pbs_client::{BackupWriter, H2Client, UploadOptions};
use pbs_datastore::data_blob::DataChunkBuilder;
use pbs_datastore::index::IndexFile;
use pbs_datastore::manifest::{BackupManifest, ENCRYPTED_KEY_BLOB_NAME, MANIFEST_BLOB_NAME};
use pbs_tools::crypt_config::CryptConfig;
use crate::registry::Registry;
use crate::capi_types::DataPointer; use crate::capi_types::DataPointer;
use crate::upload_queue::{ChunkUploadInfo, UploadQueueSender, UploadResultReceiver, create_upload_queue}; use crate::registry::Registry;
use crate::upload_queue::{
create_upload_queue, ChunkUploadInfo, UploadQueueSender, UploadResultReceiver,
};
use lazy_static::lazy_static; use lazy_static::lazy_static;
lazy_static!{ lazy_static! {
// Note: Any state stored here that needs to be sent along with migration // Note: Any state stored here that needs to be sent along with migration
// needs to be specified in (de)serialize_state as well! // needs to be specified in (de)serialize_state as well!
@ -41,7 +43,6 @@ pub struct ImageUploadInfo {
upload_result: Option<UploadResultReceiver>, upload_result: Option<UploadResultReceiver>,
} }
pub(crate) fn serialize_state() -> Vec<u8> { pub(crate) fn serialize_state() -> Vec<u8> {
let prev_csums = &*PREVIOUS_CSUMS.lock().unwrap(); let prev_csums = &*PREVIOUS_CSUMS.lock().unwrap();
let prev_key_fingerprint = &*PREVIOUS_KEY_FINGERPRINT.lock().unwrap(); let prev_key_fingerprint = &*PREVIOUS_KEY_FINGERPRINT.lock().unwrap();
@ -57,15 +58,13 @@ pub(crate) fn deserialize_state(data: &[u8]) -> Result<(), Error> {
Ok(()) Ok(())
} }
// Note: We alway register/upload a chunk containing zeros // Note: We alway register/upload a chunk containing zeros
async fn register_zero_chunk( async fn register_zero_chunk(
client: Arc<BackupWriter>, client: Arc<BackupWriter>,
crypt_config: Option<Arc<CryptConfig>>, crypt_config: Option<Arc<CryptConfig>>,
chunk_size: usize, chunk_size: usize,
wid: u64, wid: u64,
) -> Result<[u8;32], Error> { ) -> Result<[u8; 32], Error> {
let (chunk, zero_chunk_digest) = DataChunkBuilder::build_zero_chunk( let (chunk, zero_chunk_digest) = DataChunkBuilder::build_zero_chunk(
crypt_config.as_ref().map(Arc::as_ref), crypt_config.as_ref().map(Arc::as_ref),
chunk_size, chunk_size,
@ -80,7 +79,14 @@ async fn register_zero_chunk(
"encoded-size": chunk_data.len(), "encoded-size": chunk_data.len(),
}); });
client.upload_post("fixed_chunk", Some(param), "application/octet-stream", chunk_data).await?; client
.upload_post(
"fixed_chunk",
Some(param),
"application/octet-stream",
chunk_data,
)
.await?;
Ok(zero_chunk_digest) Ok(zero_chunk_digest)
} }
@ -103,7 +109,9 @@ pub(crate) async fn add_config(
..UploadOptions::default() ..UploadOptions::default()
}; };
let stats = client.upload_blob_from_data(data, &blob_name, options).await?; let stats = client
.upload_blob_from_data(data, &blob_name, options)
.await?;
let mut guard = manifest.lock().unwrap(); let mut guard = manifest.lock().unwrap();
guard.add_file(blob_name, stats.size, stats.csum, crypt_mode)?; guard.add_file(blob_name, stats.size, stats.csum, crypt_mode)?;
@ -115,14 +123,13 @@ fn archive_name(device_name: &str) -> String {
format!("{}.img.fidx", device_name) format!("{}.img.fidx", device_name)
} }
const CRYPT_CONFIG_HASH_INPUT:&[u8] = b"this is just a static string to protect against key changes"; const CRYPT_CONFIG_HASH_INPUT: &[u8] =
b"this is just a static string to protect against key changes";
/// Create an identifying digest for the crypt config /// Create an identifying digest for the crypt config
/// legacy version for VMs freshly migrated from old version /// legacy version for VMs freshly migrated from old version
/// TODO: remove in PVE 7.0 /// TODO: remove in PVE 7.0
pub(crate) fn crypt_config_digest( pub(crate) fn crypt_config_digest(config: Arc<CryptConfig>) -> [u8; 32] {
config: Arc<CryptConfig>,
) -> [u8;32] {
config.compute_digest(CRYPT_CONFIG_HASH_INPUT) config.compute_digest(CRYPT_CONFIG_HASH_INPUT)
} }
@ -131,9 +138,10 @@ pub(crate) fn check_last_incremental_csum(
device_name: &str, device_name: &str,
device_size: u64, device_size: u64,
) -> bool { ) -> bool {
match PREVIOUS_CSUMS.lock().unwrap().get(device_name) { match PREVIOUS_CSUMS.lock().unwrap().get(device_name) {
Some(csum) => manifest.verify_file(&archive_name(device_name), csum, device_size).is_ok(), Some(csum) => manifest
.verify_file(&archive_name(device_name), csum, device_size)
.is_ok(),
None => false, None => false,
} }
} }
@ -144,29 +152,25 @@ pub(crate) fn check_last_encryption_mode(
crypt_mode: CryptMode, crypt_mode: CryptMode,
) -> bool { ) -> bool {
match manifest.lookup_file_info(&archive_name(device_name)) { match manifest.lookup_file_info(&archive_name(device_name)) {
Ok(file) => { Ok(file) => match (file.crypt_mode, crypt_mode) {
match (file.crypt_mode, crypt_mode) { (CryptMode::Encrypt, CryptMode::Encrypt) => true,
(CryptMode::Encrypt, CryptMode::Encrypt) => true, (CryptMode::Encrypt, _) => false,
(CryptMode::Encrypt, _) => false, (CryptMode::SignOnly, CryptMode::Encrypt) => false,
(CryptMode::SignOnly, CryptMode::Encrypt) => false, (CryptMode::SignOnly, _) => true,
(CryptMode::SignOnly, _) => true, (CryptMode::None, CryptMode::Encrypt) => false,
(CryptMode::None, CryptMode::Encrypt) => false, (CryptMode::None, _) => true,
(CryptMode::None, _) => true,
}
}, },
_ => false, _ => false,
} }
} }
pub(crate) fn check_last_encryption_key( pub(crate) fn check_last_encryption_key(config: Option<Arc<CryptConfig>>) -> bool {
config: Option<Arc<CryptConfig>>,
) -> bool {
let fingerprint_guard = PREVIOUS_KEY_FINGERPRINT.lock().unwrap(); let fingerprint_guard = PREVIOUS_KEY_FINGERPRINT.lock().unwrap();
match (*fingerprint_guard, config) { match (*fingerprint_guard, config) {
(Some(last_fingerprint), Some(current_config)) => { (Some(last_fingerprint), Some(current_config)) => {
current_config.fingerprint() == last_fingerprint current_config.fingerprint() == last_fingerprint
|| crypt_config_digest(current_config) == last_fingerprint || crypt_config_digest(current_config) == last_fingerprint
}, }
(None, None) => true, (None, None) => true,
_ => false, _ => false,
} }
@ -179,24 +183,26 @@ pub(crate) async fn register_image(
crypt_mode: CryptMode, crypt_mode: CryptMode,
manifest: Option<Arc<BackupManifest>>, manifest: Option<Arc<BackupManifest>>,
registry: Arc<Mutex<Registry<ImageUploadInfo>>>, registry: Arc<Mutex<Registry<ImageUploadInfo>>>,
known_chunks: Arc<Mutex<HashSet<[u8;32]>>>, known_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
device_name: String, device_name: String,
device_size: u64, device_size: u64,
chunk_size: u64, chunk_size: u64,
incremental: bool, incremental: bool,
) -> Result<c_int, Error> { ) -> Result<c_int, Error> {
let archive_name = archive_name(&device_name); let archive_name = archive_name(&device_name);
let index = match manifest { let index = match manifest {
Some(manifest) => { Some(manifest) => {
match client.download_previous_fixed_index(&archive_name, &manifest, Arc::clone(&known_chunks)).await { match client
.download_previous_fixed_index(&archive_name, &manifest, Arc::clone(&known_chunks))
.await
{
Ok(index) => Some(index), Ok(index) => Some(index),
// not having a previous index is not fatal, so ignore errors // not having a previous index is not fatal, so ignore errors
Err(_) => None Err(_) => None,
} }
}, }
None => None None => None,
}; };
let mut param = json!({ "archive-name": archive_name , "size": device_size }); let mut param = json!({ "archive-name": archive_name , "size": device_size });
@ -210,7 +216,7 @@ pub(crate) async fn register_image(
match index { match index {
Some(index) => { Some(index) => {
let index_size = ((device_size + chunk_size -1)/chunk_size) as usize; let index_size = ((device_size + chunk_size - 1) / chunk_size) as usize;
if index_size != index.index_count() { if index_size != index.index_count() {
bail!("previous backup has different size than current state, cannot do incremental backup (drive: {})", archive_name); bail!("previous backup has different size than current state, cannot do incremental backup (drive: {})", archive_name);
} }
@ -219,23 +225,31 @@ pub(crate) async fn register_image(
} }
initial_index = Arc::new(Some(index)); initial_index = Arc::new(Some(index));
}, }
None => bail!("no previous backup found, cannot do incremental backup") None => bail!("no previous backup found, cannot do incremental backup"),
} }
} else { } else {
bail!("no previous backups in this session, cannot do incremental backup"); bail!("no previous backups in this session, cannot do incremental backup");
} }
} }
let wid = client.post("fixed_index", Some(param)).await?.as_u64().unwrap(); let wid = client
.post("fixed_index", Some(param))
.await?
.as_u64()
.unwrap();
let zero_chunk_digest = register_zero_chunk( let zero_chunk_digest = register_zero_chunk(
Arc::clone(&client), Arc::clone(&client),
if crypt_mode == CryptMode::Encrypt { crypt_config } else { None }, if crypt_mode == CryptMode::Encrypt {
crypt_config
} else {
None
},
chunk_size as usize, chunk_size as usize,
wid, wid,
).await?; )
.await?;
let (upload_queue, upload_result) = create_upload_queue( let (upload_queue, upload_result) = create_upload_queue(
Arc::clone(&client), Arc::clone(&client),
@ -253,7 +267,7 @@ pub(crate) async fn register_image(
device_size, device_size,
upload_queue: Some(upload_queue), upload_queue: Some(upload_queue),
upload_result: Some(upload_result), upload_result: Some(upload_result),
}; };
let mut guard = registry.lock().unwrap(); let mut guard = registry.lock().unwrap();
let dev_id = guard.register(info)?; let dev_id = guard.register(info)?;
@ -268,7 +282,6 @@ pub(crate) async fn close_image(
dev_id: u8, dev_id: u8,
crypt_mode: CryptMode, crypt_mode: CryptMode,
) -> Result<c_int, Error> { ) -> Result<c_int, Error> {
//println!("close image {}", dev_id); //println!("close image {}", dev_id);
let (wid, upload_result, device_name, device_size) = { let (wid, upload_result, device_name, device_size) = {
@ -277,17 +290,22 @@ pub(crate) async fn close_image(
info.upload_queue.take(); // close info.upload_queue.take(); // close
(info.wid, info.upload_result.take(), info.device_name.clone(), info.device_size) (
info.wid,
info.upload_result.take(),
info.device_name.clone(),
info.device_size,
)
}; };
let upload_result = match upload_result { let upload_result = match upload_result {
Some(upload_result) => { Some(upload_result) => match upload_result.await? {
match upload_result.await? { Ok(res) => res,
Ok(res) => res, Err(err) => bail!("close_image: upload error: {}", err),
Err(err) => bail!("close_image: upload error: {}", err), },
} None => {
bail!("close_image: unknown error because upload result channel was already closed")
} }
None => bail!("close_image: unknown error because upload result channel was already closed"),
}; };
let csum = hex::encode(&upload_result.csum); let csum = hex::encode(&upload_result.csum);
@ -306,9 +324,13 @@ pub(crate) async fn close_image(
let mut prev_csum_guard = PREVIOUS_CSUMS.lock().unwrap(); let mut prev_csum_guard = PREVIOUS_CSUMS.lock().unwrap();
prev_csum_guard.insert(info.device_name.clone(), upload_result.csum); prev_csum_guard.insert(info.device_name.clone(), upload_result.csum);
let mut guard = manifest.lock().unwrap(); let mut guard = manifest.lock().unwrap();
guard.add_file(format!("{}.img.fidx", device_name), device_size, upload_result.csum, crypt_mode)?; guard.add_file(
format!("{}.img.fidx", device_name),
device_size,
upload_result.csum,
crypt_mode,
)?;
Ok(0) Ok(0)
} }
@ -318,22 +340,20 @@ pub(crate) async fn write_data(
client: Arc<BackupWriter>, client: Arc<BackupWriter>,
crypt_config: Option<Arc<CryptConfig>>, crypt_config: Option<Arc<CryptConfig>>,
registry: Arc<Mutex<Registry<ImageUploadInfo>>>, registry: Arc<Mutex<Registry<ImageUploadInfo>>>,
known_chunks: Arc<Mutex<HashSet<[u8;32]>>>, known_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
dev_id: u8, dev_id: u8,
data: DataPointer, data: DataPointer,
offset: u64, offset: u64,
size: u64, // actual data size size: u64, // actual data size
chunk_size: u64, // expected data size chunk_size: u64, // expected data size
compress: bool, compress: bool,
) -> Result<c_int, Error> { ) -> Result<c_int, Error> {
//println!("dev {}: write {} {}", dev_id, offset, size); //println!("dev {}: write {} {}", dev_id, offset, size);
let (wid, mut upload_queue, zero_chunk_digest) = { let (wid, mut upload_queue, zero_chunk_digest) = {
let mut guard = registry.lock().unwrap(); let mut guard = registry.lock().unwrap();
let info = guard.lookup(dev_id)?; let info = guard.lookup(dev_id)?;
(info.wid, info.upload_queue.clone(), info.zero_chunk_digest) (info.wid, info.upload_queue.clone(), info.zero_chunk_digest)
}; };
@ -344,7 +364,12 @@ pub(crate) async fn write_data(
if size != chunk_size { if size != chunk_size {
bail!("write_data: got invalid null chunk"); bail!("write_data: got invalid null chunk");
} }
let upload_info = ChunkUploadInfo { digest: zero_chunk_digest, offset, size, chunk_is_known: true }; let upload_info = ChunkUploadInfo {
digest: zero_chunk_digest,
offset,
size,
chunk_is_known: true,
};
reused = true; reused = true;
Box::new(futures::future::ok(upload_info)) Box::new(futures::future::ok(upload_info))
} else { } else {
@ -364,10 +389,15 @@ pub(crate) async fn write_data(
}; };
if chunk_is_known { if chunk_is_known {
let upload_info = ChunkUploadInfo { digest: *digest, offset, size, chunk_is_known: true }; let upload_info = ChunkUploadInfo {
digest: *digest,
offset,
size,
chunk_is_known: true,
};
reused = true; reused = true;
Box::new(futures::future::ok(upload_info)) Box::new(futures::future::ok(upload_info))
} else { } else {
let (chunk, digest) = chunk_builder.build()?; let (chunk, digest) = chunk_builder.build()?;
let digest_str = hex::encode(&digest); let digest_str = hex::encode(&digest);
let chunk_data = chunk.into_inner(); let chunk_data = chunk.into_inner();
@ -380,20 +410,25 @@ pub(crate) async fn write_data(
}); });
// Phase 1: send data // Phase 1: send data
let response_future = client.send_upload_request( let response_future = client
"POST", .send_upload_request(
"fixed_chunk", "POST",
Some(param), "fixed_chunk",
"application/octet-stream", Some(param),
chunk_data, "application/octet-stream",
).await?; chunk_data,
)
.await?;
// create response future (run that in other task) // create response future (run that in other task)
let upload_future = response_future let upload_future = response_future
.map_err(Error::from) .map_err(Error::from)
.and_then(H2Client::h2api_response) .and_then(H2Client::h2api_response)
.map_ok(move |_| { .map_ok(move |_| ChunkUploadInfo {
ChunkUploadInfo { digest, offset, size, chunk_is_known: false } digest,
offset,
size,
chunk_is_known: false,
}) })
.map_err(|err| format_err!("pipelined request failed: {}", err)); .map_err(|err| format_err!("pipelined request failed: {}", err));
@ -441,28 +476,35 @@ pub(crate) async fn finish_backup(
) -> Result<c_int, Error> { ) -> Result<c_int, Error> {
if let Some(rsa_encrypted_key) = rsa_encrypted_key { if let Some(rsa_encrypted_key) = rsa_encrypted_key {
let target = ENCRYPTED_KEY_BLOB_NAME; let target = ENCRYPTED_KEY_BLOB_NAME;
let options = UploadOptions { compress: false, encrypt: false, ..UploadOptions::default() }; let options = UploadOptions {
compress: false,
encrypt: false,
..UploadOptions::default()
};
let stats = client let stats = client
.upload_blob_from_data(rsa_encrypted_key, target, options) .upload_blob_from_data(rsa_encrypted_key, target, options)
.await?; .await?;
manifest.lock().unwrap().add_file(target.to_string(), stats.size, stats.csum, CryptMode::Encrypt)?; manifest.lock().unwrap().add_file(
target.to_string(),
stats.size,
stats.csum,
CryptMode::Encrypt,
)?;
}; };
let manifest = { let manifest = {
let guard = manifest.lock().unwrap(); let guard = manifest.lock().unwrap();
guard.to_string(crypt_config.as_ref().map(Arc::as_ref)) guard
.to_string(crypt_config.as_ref().map(Arc::as_ref))
.map_err(|err| format_err!("unable to format manifest - {}", err))? .map_err(|err| format_err!("unable to format manifest - {}", err))?
}; };
{ {
let key_fingerprint = match crypt_config { let key_fingerprint = match crypt_config {
Some(current_config) => { Some(current_config) => {
let fp = current_config let fp = current_config.fingerprint().to_owned();
.fingerprint()
.to_owned();
Some(fp) Some(fp)
}, }
None => None, None => None,
}; };

View File

@ -2,22 +2,22 @@
use anyhow::{format_err, Error}; use anyhow::{format_err, Error};
use std::ffi::CString; use std::ffi::CString;
use std::os::raw::{c_char, c_int, c_long, c_uchar, c_void};
use std::ptr; use std::ptr;
use std::os::raw::{c_uchar, c_char, c_int, c_void, c_long}; use std::sync::{Arc, Condvar, Mutex};
use std::sync::{Arc, Mutex, Condvar};
use proxmox_lang::try_block; use proxmox_lang::try_block;
use pbs_api_types::{Authid, CryptMode}; use pbs_api_types::{Authid, CryptMode};
use pbs_datastore::BackupDir;
use pbs_client::BackupRepository; use pbs_client::BackupRepository;
use pbs_datastore::BackupDir;
mod capi_types; mod capi_types;
use capi_types::*; use capi_types::*;
mod commands;
mod registry; mod registry;
mod upload_queue; mod upload_queue;
mod commands;
mod backup; mod backup;
use backup::*; use backup::*;
@ -25,16 +25,14 @@ use backup::*;
mod restore; mod restore;
use restore::*; use restore::*;
mod tools;
mod shared_cache; mod shared_cache;
mod tools;
pub const PROXMOX_BACKUP_DEFAULT_CHUNK_SIZE: u64 = 1024*1024*4; pub const PROXMOX_BACKUP_DEFAULT_CHUNK_SIZE: u64 = 1024 * 1024 * 4;
use lazy_static::lazy_static; use lazy_static::lazy_static;
lazy_static!{ lazy_static! {
static ref VERSION_CSTR: CString = { static ref VERSION_CSTR: CString = { CString::new(env!("PBS_LIB_VERSION")).unwrap() };
CString::new(env!("PBS_LIB_VERSION")).unwrap()
};
} }
/// Return a read-only pointer to a string containing the version of the library. /// Return a read-only pointer to a string containing the version of the library.
@ -50,9 +48,11 @@ pub extern "C" fn proxmox_backup_qemu_version() -> *const c_char {
/// and free the allocated memory. /// and free the allocated memory.
#[no_mangle] #[no_mangle]
#[allow(clippy::not_unsafe_ptr_arg_deref)] #[allow(clippy::not_unsafe_ptr_arg_deref)]
pub extern "C" fn proxmox_backup_free_error(ptr: * mut c_char) { pub extern "C" fn proxmox_backup_free_error(ptr: *mut c_char) {
if !ptr.is_null() { if !ptr.is_null() {
unsafe { CString::from_raw(ptr); } unsafe {
CString::from_raw(ptr);
}
} }
} }
@ -63,24 +63,28 @@ fn convert_error_to_cstring(err: String) -> CString {
Err(err) => { Err(err) => {
eprintln!("got error containung 0 bytes: {}", err); eprintln!("got error containung 0 bytes: {}", err);
CString::new("failed to convert error message containing 0 bytes").unwrap() CString::new("failed to convert error message containing 0 bytes").unwrap()
}, }
} }
} }
macro_rules! raise_error_null { macro_rules! raise_error_null {
($error:ident, $err:expr) => {{ ($error:ident, $err:expr) => {{
let errmsg = convert_error_to_cstring($err.to_string()); let errmsg = convert_error_to_cstring($err.to_string());
unsafe { *$error = errmsg.into_raw(); } unsafe {
*$error = errmsg.into_raw();
}
return ptr::null_mut(); return ptr::null_mut();
}} }};
} }
macro_rules! raise_error_int { macro_rules! raise_error_int {
($error:ident, $err:expr) => {{ ($error:ident, $err:expr) => {{
let errmsg = convert_error_to_cstring($err.to_string()); let errmsg = convert_error_to_cstring($err.to_string());
unsafe { *$error = errmsg.into_raw(); } unsafe {
*$error = errmsg.into_raw();
}
return -1; return -1;
}} }};
} }
macro_rules! param_not_null { macro_rules! param_not_null {
@ -90,7 +94,7 @@ macro_rules! param_not_null {
$callback_info.send_result(result); $callback_info.send_result(result);
return; return;
} }
}} }};
} }
/// Returns the text presentation (relative path) for a backup snapshot /// Returns the text presentation (relative path) for a backup snapshot
@ -103,9 +107,8 @@ pub extern "C" fn proxmox_backup_snapshot_string(
backup_type: *const c_char, backup_type: *const c_char,
backup_id: *const c_char, backup_id: *const c_char,
backup_time: i64, backup_time: i64,
error: * mut * mut c_char, error: *mut *mut c_char,
) -> *const c_char { ) -> *const c_char {
let snapshot: Result<CString, Error> = try_block!({ let snapshot: Result<CString, Error> = try_block!({
let backup_type: String = tools::utf8_c_string_lossy(backup_type) let backup_type: String = tools::utf8_c_string_lossy(backup_type)
.ok_or_else(|| format_err!("backup_type must not be NULL"))?; .ok_or_else(|| format_err!("backup_type must not be NULL"))?;
@ -118,14 +121,11 @@ pub extern "C" fn proxmox_backup_snapshot_string(
}); });
match snapshot { match snapshot {
Ok(snapshot) => { Ok(snapshot) => unsafe { libc::strdup(snapshot.as_ptr()) },
unsafe { libc::strdup(snapshot.as_ptr()) }
}
Err(err) => raise_error_null!(error, err), Err(err) => raise_error_null!(error, err),
} }
} }
#[derive(Clone)] #[derive(Clone)]
pub(crate) struct BackupSetup { pub(crate) struct BackupSetup {
pub host: String, pub host: String,
@ -150,7 +150,6 @@ struct GotResultCondition {
} }
impl GotResultCondition { impl GotResultCondition {
#[allow(clippy::mutex_atomic)] #[allow(clippy::mutex_atomic)]
pub fn new() -> Self { pub fn new() -> Self {
Self { Self {
@ -186,10 +185,8 @@ impl GotResultCondition {
#[no_mangle] #[no_mangle]
#[allow(clippy::not_unsafe_ptr_arg_deref)] #[allow(clippy::not_unsafe_ptr_arg_deref)]
extern "C" fn wakeup_callback( extern "C" fn wakeup_callback(callback_data: *mut c_void) {
callback_data: *mut c_void, let callback_data = unsafe { &mut *(callback_data as *mut GotResultCondition) };
) {
let callback_data = unsafe { &mut *( callback_data as * mut GotResultCondition) };
#[allow(clippy::mutex_atomic)] #[allow(clippy::mutex_atomic)]
let mut done = callback_data.lock.lock().unwrap(); let mut done = callback_data.lock.lock().unwrap();
*done = true; *done = true;
@ -197,7 +194,6 @@ impl GotResultCondition {
} }
} }
/// Create a new instance /// Create a new instance
/// ///
/// Uses `PROXMOX_BACKUP_DEFAULT_CHUNK_SIZE` if `chunk_size` is zero. /// Uses `PROXMOX_BACKUP_DEFAULT_CHUNK_SIZE` if `chunk_size` is zero.
@ -215,9 +211,8 @@ pub extern "C" fn proxmox_backup_new(
compress: bool, compress: bool,
encrypt: bool, encrypt: bool,
fingerprint: *const c_char, fingerprint: *const c_char,
error: * mut * mut c_char, error: *mut *mut c_char,
) -> *mut ProxmoxBackupHandle { ) -> *mut ProxmoxBackupHandle {
let task: Result<_, Error> = try_block!({ let task: Result<_, Error> = try_block!({
let repo: BackupRepository = tools::utf8_c_string(repo)? let repo: BackupRepository = tools::utf8_c_string(repo)?
.ok_or_else(|| format_err!("repo must not be NULL"))? .ok_or_else(|| format_err!("repo must not be NULL"))?
@ -238,7 +233,11 @@ pub extern "C" fn proxmox_backup_new(
} }
let crypt_mode = if keyfile.is_some() { let crypt_mode = if keyfile.is_some() {
if encrypt { CryptMode::Encrypt } else { CryptMode::SignOnly } if encrypt {
CryptMode::Encrypt
} else {
CryptMode::SignOnly
}
} else { } else {
CryptMode::None CryptMode::None
}; };
@ -248,7 +247,11 @@ pub extern "C" fn proxmox_backup_new(
port: repo.port(), port: repo.port(),
auth_id: repo.auth_id().to_owned(), auth_id: repo.auth_id().to_owned(),
store: repo.store().to_owned(), store: repo.store().to_owned(),
chunk_size: if chunk_size > 0 { chunk_size } else { PROXMOX_BACKUP_DEFAULT_CHUNK_SIZE }, chunk_size: if chunk_size > 0 {
chunk_size
} else {
PROXMOX_BACKUP_DEFAULT_CHUNK_SIZE
},
backup_type: String::from("vm"), backup_type: String::from("vm"),
backup_id, backup_id,
password, password,
@ -265,14 +268,14 @@ pub extern "C" fn proxmox_backup_new(
match task { match task {
Ok(task) => { Ok(task) => {
let boxed_task = Box::new(Arc::new(task)); let boxed_task = Box::new(Arc::new(task));
Box::into_raw(boxed_task) as * mut ProxmoxBackupHandle Box::into_raw(boxed_task) as *mut ProxmoxBackupHandle
} }
Err(err) => raise_error_null!(error, err), Err(err) => raise_error_null!(error, err),
} }
} }
fn backup_handle_to_task(handle: *mut ProxmoxBackupHandle) -> Arc<BackupTask> { fn backup_handle_to_task(handle: *mut ProxmoxBackupHandle) -> Arc<BackupTask> {
let task = unsafe { & *(handle as *const Arc<BackupTask>) }; let task = unsafe { &*(handle as *const Arc<BackupTask>) };
// increase reference count while we use it inside rust // increase reference count while we use it inside rust
Arc::clone(task) Arc::clone(task)
} }
@ -324,7 +327,12 @@ pub extern "C" fn proxmox_backup_connect_async(
error: *mut *mut c_char, error: *mut *mut c_char,
) { ) {
let task = backup_handle_to_task(handle); let task = backup_handle_to_task(handle);
let callback_info = CallbackPointers { callback, callback_data, error, result }; let callback_info = CallbackPointers {
callback,
callback_data,
error,
result,
};
task.runtime().spawn(async move { task.runtime().spawn(async move {
let result = task.connect().await; let result = task.connect().await;
@ -339,13 +347,10 @@ pub extern "C" fn proxmox_backup_connect_async(
/// allocated memory. /// allocated memory.
#[no_mangle] #[no_mangle]
#[allow(clippy::not_unsafe_ptr_arg_deref)] #[allow(clippy::not_unsafe_ptr_arg_deref)]
pub extern "C" fn proxmox_backup_abort( pub extern "C" fn proxmox_backup_abort(handle: *mut ProxmoxBackupHandle, reason: *const c_char) {
handle: *mut ProxmoxBackupHandle,
reason: *const c_char,
) {
let task = backup_handle_to_task(handle); let task = backup_handle_to_task(handle);
let reason = tools::utf8_c_string_lossy(reason) let reason =
.unwrap_or_else(|| "no reason (NULL)".to_string()); tools::utf8_c_string_lossy(reason).unwrap_or_else(|| "no reason (NULL)".to_string());
task.abort(reason); task.abort(reason);
} }
@ -362,11 +367,19 @@ pub extern "C" fn proxmox_backup_check_incremental(
) -> c_int { ) -> c_int {
let task = backup_handle_to_task(handle); let task = backup_handle_to_task(handle);
if device_name.is_null() { return 0; } if device_name.is_null() {
return 0;
}
match tools::utf8_c_string_lossy(device_name) { match tools::utf8_c_string_lossy(device_name) {
None => 0, None => 0,
Some(device_name) => if task.check_incremental(device_name, size) { 1 } else { 0 }, Some(device_name) => {
if task.check_incremental(device_name, size) {
1
} else {
0
}
}
} }
} }
@ -378,7 +391,7 @@ pub extern "C" fn proxmox_backup_register_image(
device_name: *const c_char, // expect utf8 here device_name: *const c_char, // expect utf8 here
size: u64, size: u64,
incremental: bool, incremental: bool,
error: * mut * mut c_char, error: *mut *mut c_char,
) -> c_int { ) -> c_int {
let mut result: c_int = -1; let mut result: c_int = -1;
@ -387,7 +400,10 @@ pub extern "C" fn proxmox_backup_register_image(
let callback_info = got_result_condition.callback_info(&mut result, error); let callback_info = got_result_condition.callback_info(&mut result, error);
proxmox_backup_register_image_async( proxmox_backup_register_image_async(
handle, device_name, size, incremental, handle,
device_name,
size,
incremental,
callback_info.callback, callback_info.callback,
callback_info.callback_data, callback_info.callback_data,
callback_info.result, callback_info.result,
@ -414,10 +430,15 @@ pub extern "C" fn proxmox_backup_register_image_async(
callback: extern "C" fn(*mut c_void), callback: extern "C" fn(*mut c_void),
callback_data: *mut c_void, callback_data: *mut c_void,
result: *mut c_int, result: *mut c_int,
error: * mut * mut c_char, error: *mut *mut c_char,
) { ) {
let task = backup_handle_to_task(handle); let task = backup_handle_to_task(handle);
let callback_info = CallbackPointers { callback, callback_data, error, result }; let callback_info = CallbackPointers {
callback,
callback_data,
error,
result,
};
param_not_null!(device_name, callback_info); param_not_null!(device_name, callback_info);
@ -437,7 +458,7 @@ pub extern "C" fn proxmox_backup_add_config(
name: *const c_char, // expect utf8 here name: *const c_char, // expect utf8 here
data: *const u8, data: *const u8,
size: u64, size: u64,
error: * mut * mut c_char, error: *mut *mut c_char,
) -> c_int { ) -> c_int {
let mut result: c_int = -1; let mut result: c_int = -1;
@ -446,7 +467,10 @@ pub extern "C" fn proxmox_backup_add_config(
let callback_info = got_result_condition.callback_info(&mut result, error); let callback_info = got_result_condition.callback_info(&mut result, error);
proxmox_backup_add_config_async( proxmox_backup_add_config_async(
handle, name, data, size, handle,
name,
data,
size,
callback_info.callback, callback_info.callback,
callback_info.callback_data, callback_info.callback_data,
callback_info.result, callback_info.result,
@ -471,11 +495,16 @@ pub extern "C" fn proxmox_backup_add_config_async(
callback: extern "C" fn(*mut c_void), callback: extern "C" fn(*mut c_void),
callback_data: *mut c_void, callback_data: *mut c_void,
result: *mut c_int, result: *mut c_int,
error: * mut * mut c_char, error: *mut *mut c_char,
) { ) {
let task = backup_handle_to_task(handle); let task = backup_handle_to_task(handle);
let callback_info = CallbackPointers { callback, callback_data, error, result }; let callback_info = CallbackPointers {
callback,
callback_data,
error,
result,
};
param_not_null!(name, callback_info); param_not_null!(name, callback_info);
let name = unsafe { tools::utf8_c_string_lossy_non_null(name) }; let name = unsafe { tools::utf8_c_string_lossy_non_null(name) };
@ -508,7 +537,7 @@ pub extern "C" fn proxmox_backup_write_data(
data: *const u8, data: *const u8,
offset: u64, offset: u64,
size: u64, size: u64,
error: * mut * mut c_char, error: *mut *mut c_char,
) -> c_int { ) -> c_int {
let mut result: c_int = -1; let mut result: c_int = -1;
@ -517,7 +546,11 @@ pub extern "C" fn proxmox_backup_write_data(
let callback_info = got_result_condition.callback_info(&mut result, error); let callback_info = got_result_condition.callback_info(&mut result, error);
proxmox_backup_write_data_async( proxmox_backup_write_data_async(
handle, dev_id, data, offset, size, handle,
dev_id,
data,
offset,
size,
callback_info.callback, callback_info.callback,
callback_info.callback_data, callback_info.callback_data,
callback_info.result, callback_info.result,
@ -554,10 +587,15 @@ pub extern "C" fn proxmox_backup_write_data_async(
callback: extern "C" fn(*mut c_void), callback: extern "C" fn(*mut c_void),
callback_data: *mut c_void, callback_data: *mut c_void,
result: *mut c_int, result: *mut c_int,
error: * mut * mut c_char, error: *mut *mut c_char,
) { ) {
let task = backup_handle_to_task(handle); let task = backup_handle_to_task(handle);
let callback_info = CallbackPointers { callback, callback_data, error, result }; let callback_info = CallbackPointers {
callback,
callback_data,
error,
result,
};
let data = DataPointer(data); let data = DataPointer(data);
@ -573,7 +611,7 @@ pub extern "C" fn proxmox_backup_write_data_async(
pub extern "C" fn proxmox_backup_close_image( pub extern "C" fn proxmox_backup_close_image(
handle: *mut ProxmoxBackupHandle, handle: *mut ProxmoxBackupHandle,
dev_id: u8, dev_id: u8,
error: * mut * mut c_char, error: *mut *mut c_char,
) -> c_int { ) -> c_int {
let mut result: c_int = -1; let mut result: c_int = -1;
@ -582,7 +620,8 @@ pub extern "C" fn proxmox_backup_close_image(
let callback_info = got_result_condition.callback_info(&mut result, error); let callback_info = got_result_condition.callback_info(&mut result, error);
proxmox_backup_close_image_async( proxmox_backup_close_image_async(
handle, dev_id, handle,
dev_id,
callback_info.callback, callback_info.callback,
callback_info.callback_data, callback_info.callback_data,
callback_info.result, callback_info.result,
@ -605,10 +644,15 @@ pub extern "C" fn proxmox_backup_close_image_async(
callback: extern "C" fn(*mut c_void), callback: extern "C" fn(*mut c_void),
callback_data: *mut c_void, callback_data: *mut c_void,
result: *mut c_int, result: *mut c_int,
error: * mut * mut c_char, error: *mut *mut c_char,
) { ) {
let task = backup_handle_to_task(handle); let task = backup_handle_to_task(handle);
let callback_info = CallbackPointers { callback, callback_data, error, result }; let callback_info = CallbackPointers {
callback,
callback_data,
error,
result,
};
task.runtime().spawn(async move { task.runtime().spawn(async move {
let result = task.close_image(dev_id).await; let result = task.close_image(dev_id).await;
@ -621,7 +665,7 @@ pub extern "C" fn proxmox_backup_close_image_async(
#[allow(clippy::not_unsafe_ptr_arg_deref)] #[allow(clippy::not_unsafe_ptr_arg_deref)]
pub extern "C" fn proxmox_backup_finish( pub extern "C" fn proxmox_backup_finish(
handle: *mut ProxmoxBackupHandle, handle: *mut ProxmoxBackupHandle,
error: * mut * mut c_char, error: *mut *mut c_char,
) -> c_int { ) -> c_int {
let mut result: c_int = -1; let mut result: c_int = -1;
@ -653,10 +697,15 @@ pub extern "C" fn proxmox_backup_finish_async(
callback: extern "C" fn(*mut c_void), callback: extern "C" fn(*mut c_void),
callback_data: *mut c_void, callback_data: *mut c_void,
result: *mut c_int, result: *mut c_int,
error: * mut * mut c_char, error: *mut *mut c_char,
) { ) {
let task = unsafe { & *(handle as * const Arc<BackupTask>) }; let task = unsafe { &*(handle as *const Arc<BackupTask>) };
let callback_info = CallbackPointers { callback, callback_data, error, result }; let callback_info = CallbackPointers {
callback,
callback_data,
error,
result,
};
task.runtime().spawn(async move { task.runtime().spawn(async move {
let result = task.finish().await; let result = task.finish().await;
@ -670,8 +719,7 @@ pub extern "C" fn proxmox_backup_finish_async(
#[no_mangle] #[no_mangle]
#[allow(clippy::not_unsafe_ptr_arg_deref)] #[allow(clippy::not_unsafe_ptr_arg_deref)]
pub extern "C" fn proxmox_backup_disconnect(handle: *mut ProxmoxBackupHandle) { pub extern "C" fn proxmox_backup_disconnect(handle: *mut ProxmoxBackupHandle) {
let task = handle as *mut Arc<BackupTask>;
let task = handle as * mut Arc<BackupTask>;
unsafe { Box::from_raw(task) }; // take ownership, drop(task) unsafe { Box::from_raw(task) }; // take ownership, drop(task)
} }
@ -681,7 +729,7 @@ pub extern "C" fn proxmox_backup_disconnect(handle: *mut ProxmoxBackupHandle) {
// currently only implemented for images... // currently only implemented for images...
fn restore_handle_to_task(handle: *mut ProxmoxRestoreHandle) -> Arc<RestoreTask> { fn restore_handle_to_task(handle: *mut ProxmoxRestoreHandle) -> Arc<RestoreTask> {
let restore_task = unsafe { & *(handle as *const Arc<RestoreTask>) }; let restore_task = unsafe { &*(handle as *const Arc<RestoreTask>) };
// increase reference count while we use it inside rust // increase reference count while we use it inside rust
Arc::clone(restore_task) Arc::clone(restore_task)
} }
@ -696,9 +744,8 @@ pub extern "C" fn proxmox_restore_new(
keyfile: *const c_char, keyfile: *const c_char,
key_password: *const c_char, key_password: *const c_char,
fingerprint: *const c_char, fingerprint: *const c_char,
error: * mut * mut c_char, error: *mut *mut c_char,
) -> *mut ProxmoxRestoreHandle { ) -> *mut ProxmoxRestoreHandle {
let result: Result<_, Error> = try_block!({ let result: Result<_, Error> = try_block!({
let repo: BackupRepository = tools::utf8_c_string(repo)? let repo: BackupRepository = tools::utf8_c_string(repo)?
.ok_or_else(|| format_err!("repo must not be NULL"))? .ok_or_else(|| format_err!("repo must not be NULL"))?
@ -740,7 +787,7 @@ pub extern "C" fn proxmox_restore_new(
match result { match result {
Ok(restore_task) => { Ok(restore_task) => {
let boxed_restore_task = Box::new(Arc::new(restore_task)); let boxed_restore_task = Box::new(Arc::new(restore_task));
Box::into_raw(boxed_restore_task) as * mut ProxmoxRestoreHandle Box::into_raw(boxed_restore_task) as *mut ProxmoxRestoreHandle
} }
Err(err) => raise_error_null!(error, err), Err(err) => raise_error_null!(error, err),
} }
@ -790,7 +837,12 @@ pub extern "C" fn proxmox_restore_connect_async(
error: *mut *mut c_char, error: *mut *mut c_char,
) { ) {
let restore_task = restore_handle_to_task(handle); let restore_task = restore_handle_to_task(handle);
let callback_info = CallbackPointers { callback, callback_data, error, result }; let callback_info = CallbackPointers {
callback,
callback_data,
error,
result,
};
restore_task.runtime().spawn(async move { restore_task.runtime().spawn(async move {
let result = restore_task.connect().await; let result = restore_task.connect().await;
@ -804,8 +856,7 @@ pub extern "C" fn proxmox_restore_connect_async(
#[no_mangle] #[no_mangle]
#[allow(clippy::not_unsafe_ptr_arg_deref)] #[allow(clippy::not_unsafe_ptr_arg_deref)]
pub extern "C" fn proxmox_restore_disconnect(handle: *mut ProxmoxRestoreHandle) { pub extern "C" fn proxmox_restore_disconnect(handle: *mut ProxmoxRestoreHandle) {
let restore_task = handle as *mut Arc<RestoreTask>;
let restore_task = handle as * mut Arc<RestoreTask>;
let restore_task = unsafe { Box::from_raw(restore_task) }; let restore_task = unsafe { Box::from_raw(restore_task) };
drop(restore_task); drop(restore_task);
@ -823,14 +874,12 @@ pub extern "C" fn proxmox_restore_image(
archive_name: *const c_char, // expect full name here, i.e. "name.img.fidx" archive_name: *const c_char, // expect full name here, i.e. "name.img.fidx"
callback: extern "C" fn(*mut c_void, u64, *const c_uchar, u64) -> c_int, callback: extern "C" fn(*mut c_void, u64, *const c_uchar, u64) -> c_int,
callback_data: *mut c_void, callback_data: *mut c_void,
error: * mut * mut c_char, error: *mut *mut c_char,
verbose: bool, verbose: bool,
) -> c_int { ) -> c_int {
let restore_task = restore_handle_to_task(handle); let restore_task = restore_handle_to_task(handle);
let result: Result<_, Error> = try_block!({ let result: Result<_, Error> = try_block!({
let archive_name = tools::utf8_c_string(archive_name)? let archive_name = tools::utf8_c_string(archive_name)?
.ok_or_else(|| format_err!("archive_name must not be NULL"))?; .ok_or_else(|| format_err!("archive_name must not be NULL"))?;
@ -838,13 +887,15 @@ pub extern "C" fn proxmox_restore_image(
callback(callback_data, offset, data.as_ptr(), data.len() as u64) callback(callback_data, offset, data.as_ptr(), data.len() as u64)
}; };
let write_zero_callback = move |offset: u64, len: u64| { let write_zero_callback =
callback(callback_data, offset, std::ptr::null(), len) move |offset: u64, len: u64| callback(callback_data, offset, std::ptr::null(), len);
};
proxmox_async::runtime::block_on( proxmox_async::runtime::block_on(restore_task.restore_image(
restore_task.restore_image(archive_name, write_data_callback, write_zero_callback, verbose) archive_name,
)?; write_data_callback,
write_zero_callback,
verbose,
))?;
Ok(()) Ok(())
}); });
@ -862,14 +913,15 @@ pub extern "C" fn proxmox_restore_image(
pub extern "C" fn proxmox_restore_open_image( pub extern "C" fn proxmox_restore_open_image(
handle: *mut ProxmoxRestoreHandle, handle: *mut ProxmoxRestoreHandle,
archive_name: *const c_char, archive_name: *const c_char,
error: * mut * mut c_char, error: *mut *mut c_char,
) -> c_int { ) -> c_int {
let mut result: c_int = -1; let mut result: c_int = -1;
let mut got_result_condition = GotResultCondition::new(); let mut got_result_condition = GotResultCondition::new();
let callback_info = got_result_condition.callback_info(&mut result, error); let callback_info = got_result_condition.callback_info(&mut result, error);
proxmox_restore_open_image_async( proxmox_restore_open_image_async(
handle, archive_name, handle,
archive_name,
callback_info.callback, callback_info.callback,
callback_info.callback_data, callback_info.callback_data,
callback_info.result, callback_info.result,
@ -890,10 +942,15 @@ pub extern "C" fn proxmox_restore_open_image_async(
callback: extern "C" fn(*mut c_void), callback: extern "C" fn(*mut c_void),
callback_data: *mut c_void, callback_data: *mut c_void,
result: *mut c_int, result: *mut c_int,
error: * mut * mut c_char, error: *mut *mut c_char,
) { ) {
let restore_task = restore_handle_to_task(handle); let restore_task = restore_handle_to_task(handle);
let callback_info = CallbackPointers { callback, callback_data, error, result }; let callback_info = CallbackPointers {
callback,
callback_data,
error,
result,
};
param_not_null!(archive_name, callback_info); param_not_null!(archive_name, callback_info);
let archive_name = unsafe { tools::utf8_c_string_lossy_non_null(archive_name) }; let archive_name = unsafe { tools::utf8_c_string_lossy_non_null(archive_name) };
@ -901,7 +958,7 @@ pub extern "C" fn proxmox_restore_open_image_async(
restore_task.runtime().spawn(async move { restore_task.runtime().spawn(async move {
let result = match restore_task.open_image(archive_name).await { let result = match restore_task.open_image(archive_name).await {
Ok(res) => Ok(res as i32), Ok(res) => Ok(res as i32),
Err(err) => Err(err) Err(err) => Err(err),
}; };
callback_info.send_result(result); callback_info.send_result(result);
}); });
@ -913,7 +970,7 @@ pub extern "C" fn proxmox_restore_open_image_async(
pub extern "C" fn proxmox_restore_get_image_length( pub extern "C" fn proxmox_restore_get_image_length(
handle: *mut ProxmoxRestoreHandle, handle: *mut ProxmoxRestoreHandle,
aid: u8, aid: u8,
error: * mut * mut c_char, error: *mut *mut c_char,
) -> c_long { ) -> c_long {
let restore_task = restore_handle_to_task(handle); let restore_task = restore_handle_to_task(handle);
let result = restore_task.get_image_length(aid); let result = restore_task.get_image_length(aid);
@ -940,7 +997,7 @@ pub extern "C" fn proxmox_restore_read_image_at(
data: *mut u8, data: *mut u8,
offset: u64, offset: u64,
size: u64, size: u64,
error: * mut * mut c_char, error: *mut *mut c_char,
) -> c_int { ) -> c_int {
let mut result: c_int = -1; let mut result: c_int = -1;
@ -949,7 +1006,11 @@ pub extern "C" fn proxmox_restore_read_image_at(
let callback_info = got_result_condition.callback_info(&mut result, error); let callback_info = got_result_condition.callback_info(&mut result, error);
proxmox_restore_read_image_at_async( proxmox_restore_read_image_at_async(
handle, aid, data, offset, size, handle,
aid,
data,
offset,
size,
callback_info.callback, callback_info.callback,
callback_info.callback_data, callback_info.callback_data,
callback_info.result, callback_info.result,
@ -983,11 +1044,16 @@ pub extern "C" fn proxmox_restore_read_image_at_async(
callback: extern "C" fn(*mut c_void), callback: extern "C" fn(*mut c_void),
callback_data: *mut c_void, callback_data: *mut c_void,
result: *mut c_int, result: *mut c_int,
error: * mut * mut c_char, error: *mut *mut c_char,
) { ) {
let restore_task = restore_handle_to_task(handle); let restore_task = restore_handle_to_task(handle);
let callback_info = CallbackPointers { callback, callback_data, error, result }; let callback_info = CallbackPointers {
callback,
callback_data,
error,
result,
};
param_not_null!(data, callback_info); param_not_null!(data, callback_info);
let data = DataPointer(data); let data = DataPointer(data);
@ -1007,7 +1073,9 @@ pub extern "C" fn proxmox_restore_read_image_at_async(
#[allow(clippy::not_unsafe_ptr_arg_deref)] #[allow(clippy::not_unsafe_ptr_arg_deref)]
pub extern "C" fn proxmox_export_state(buf_size: *mut usize) -> *mut u8 { pub extern "C" fn proxmox_export_state(buf_size: *mut usize) -> *mut u8 {
let data = commands::serialize_state().into_boxed_slice(); let data = commands::serialize_state().into_boxed_slice();
unsafe { *buf_size = data.len(); } unsafe {
*buf_size = data.len();
}
Box::leak(data).as_mut_ptr() Box::leak(data).as_mut_ptr()
} }
@ -1028,6 +1096,8 @@ pub extern "C" fn proxmox_import_state(buf: *const u8, buf_size: usize) {
#[allow(clippy::not_unsafe_ptr_arg_deref)] #[allow(clippy::not_unsafe_ptr_arg_deref)]
pub extern "C" fn proxmox_free_state_buf(buf: *mut u8) { pub extern "C" fn proxmox_free_state_buf(buf: *mut u8) {
if !buf.is_null() { if !buf.is_null() {
unsafe { Box::from_raw(buf); } unsafe {
Box::from_raw(buf);
}
} }
} }

View File

@ -9,10 +9,11 @@ pub struct Registry<T> {
} }
impl<T> Registry<T> { impl<T> Registry<T> {
/// Create a new instance /// Create a new instance
pub fn new() -> Self { pub fn new() -> Self {
Self { info_list: Vec::new() } Self {
info_list: Vec::new(),
}
} }
/// Register data, returns associated ID /// Register data, returns associated ID

View File

@ -1,25 +1,25 @@
use std::sync::{Arc, Mutex};
use std::convert::TryInto; use std::convert::TryInto;
use std::sync::{Arc, Mutex};
use anyhow::{format_err, bail, Error}; use anyhow::{bail, format_err, Error};
use once_cell::sync::OnceCell; use once_cell::sync::OnceCell;
use tokio::runtime::Runtime; use tokio::runtime::Runtime;
use proxmox_async::runtime::get_runtime_with_builder; use proxmox_async::runtime::get_runtime_with_builder;
use pbs_tools::crypt_config::CryptConfig; use pbs_client::{BackupReader, HttpClient, HttpClientOptions, RemoteChunkReader};
use pbs_config::key_config::load_and_decrypt_key; use pbs_config::key_config::load_and_decrypt_key;
use pbs_datastore::BackupManifest;
use pbs_datastore::index::IndexFile;
use pbs_datastore::cached_chunk_reader::CachedChunkReader; use pbs_datastore::cached_chunk_reader::CachedChunkReader;
use pbs_datastore::fixed_index::FixedIndexReader;
use pbs_datastore::data_blob::DataChunkBuilder; use pbs_datastore::data_blob::DataChunkBuilder;
use pbs_datastore::fixed_index::FixedIndexReader;
use pbs_datastore::index::IndexFile;
use pbs_datastore::read_chunk::ReadChunk; use pbs_datastore::read_chunk::ReadChunk;
use pbs_client::{HttpClient, HttpClientOptions, BackupReader, RemoteChunkReader}; use pbs_datastore::BackupManifest;
use pbs_tools::crypt_config::CryptConfig;
use super::BackupSetup; use super::BackupSetup;
use crate::registry::Registry;
use crate::capi_types::DataPointer; use crate::capi_types::DataPointer;
use crate::registry::Registry;
use crate::shared_cache::get_shared_chunk_cache; use crate::shared_cache::get_shared_chunk_cache;
struct ImageAccessInfo { struct ImageAccessInfo {
@ -38,21 +38,17 @@ pub(crate) struct RestoreTask {
} }
impl RestoreTask { impl RestoreTask {
/// Create a new instance, using the specified Runtime /// Create a new instance, using the specified Runtime
/// ///
/// We keep a reference to the runtime - else the runtime can be /// We keep a reference to the runtime - else the runtime can be
/// dropped and further connections fails. /// dropped and further connections fails.
pub fn with_runtime(setup: BackupSetup, runtime: Arc<Runtime>) -> Result<Self, Error> { pub fn with_runtime(setup: BackupSetup, runtime: Arc<Runtime>) -> Result<Self, Error> {
let crypt_config = match setup.keyfile { let crypt_config = match setup.keyfile {
None => None, None => None,
Some(ref path) => { Some(ref path) => {
let (key, _, _) = load_and_decrypt_key(path, & || { let (key, _, _) = load_and_decrypt_key(path, &|| match setup.key_password {
match setup.key_password { Some(ref key_password) => Ok(key_password.as_bytes().to_vec()),
Some(ref key_password) => Ok(key_password.as_bytes().to_vec()), None => bail!("no key_password specified"),
None => bail!("no key_password specified"),
}
})?; })?;
Some(Arc::new(CryptConfig::new(key)?)) Some(Arc::new(CryptConfig::new(key)?))
} }
@ -81,13 +77,17 @@ impl RestoreTask {
} }
pub async fn connect(&self) -> Result<libc::c_int, Error> { pub async fn connect(&self) -> Result<libc::c_int, Error> {
let options = HttpClientOptions::new_non_interactive( let options = HttpClientOptions::new_non_interactive(
self.setup.password.clone(), self.setup.password.clone(),
self.setup.fingerprint.clone(), self.setup.fingerprint.clone(),
); );
let http = HttpClient::new(&self.setup.host, self.setup.port, &self.setup.auth_id, options)?; let http = HttpClient::new(
&self.setup.host,
self.setup.port,
&self.setup.auth_id,
options,
)?;
let client = BackupReader::start( let client = BackupReader::start(
http, http,
self.crypt_config.clone(), self.crypt_config.clone(),
@ -95,16 +95,19 @@ impl RestoreTask {
&self.setup.backup_type, &self.setup.backup_type,
&self.setup.backup_id, &self.setup.backup_id,
self.setup.backup_time, self.setup.backup_time,
true true,
).await?; )
.await?;
let (manifest, _) = client.download_manifest().await?; let (manifest, _) = client.download_manifest().await?;
manifest.check_fingerprint(self.crypt_config.as_ref().map(Arc::as_ref))?; manifest.check_fingerprint(self.crypt_config.as_ref().map(Arc::as_ref))?;
self.manifest.set(Arc::new(manifest)) self.manifest
.set(Arc::new(manifest))
.map_err(|_| format_err!("already connected!"))?; .map_err(|_| format_err!("already connected!"))?;
self.client.set(client) self.client
.set(client)
.map_err(|_| format_err!("already connected!"))?; .map_err(|_| format_err!("already connected!"))?;
Ok(0) Ok(0)
@ -121,7 +124,6 @@ impl RestoreTask {
write_zero_callback: impl Fn(u64, u64) -> i32, write_zero_callback: impl Fn(u64, u64) -> i32,
verbose: bool, verbose: bool,
) -> Result<(), Error> { ) -> Result<(), Error> {
if verbose { if verbose {
eprintln!("download and verify backup index"); eprintln!("download and verify backup index");
} }
@ -136,7 +138,9 @@ impl RestoreTask {
None => bail!("no manifest"), None => bail!("no manifest"),
}; };
let index = client.download_fixed_index(&manifest, &archive_name).await?; let index = client
.download_fixed_index(&manifest, &archive_name)
.await?;
let (_, zero_chunk_digest) = DataChunkBuilder::build_zero_chunk( let (_, zero_chunk_digest) = DataChunkBuilder::build_zero_chunk(
self.crypt_config.as_ref().map(Arc::as_ref), self.crypt_config.as_ref().map(Arc::as_ref),
@ -163,7 +167,7 @@ impl RestoreTask {
for pos in 0..index.index_count() { for pos in 0..index.index_count() {
let digest = index.index_digest(pos).unwrap(); let digest = index.index_digest(pos).unwrap();
let offset = (pos*index.chunk_size) as u64; let offset = (pos * index.chunk_size) as u64;
if digest == &zero_chunk_digest { if digest == &zero_chunk_digest {
let res = write_zero_callback(offset, index.chunk_size as u64); let res = write_zero_callback(offset, index.chunk_size as u64);
if res < 0 { if res < 0 {
@ -180,12 +184,16 @@ impl RestoreTask {
bytes += raw_data.len(); bytes += raw_data.len();
} }
if verbose { if verbose {
let next_per = ((pos+1)*100)/index.index_count(); let next_per = ((pos + 1) * 100) / index.index_count();
if per != next_per { if per != next_per {
eprintln!("progress {}% (read {} bytes, zeroes = {}% ({} bytes), duration {} sec)", eprintln!(
next_per, bytes, "progress {}% (read {} bytes, zeroes = {}% ({} bytes), duration {} sec)",
zeroes*100/bytes, zeroes, next_per,
start_time.elapsed().as_secs()); bytes,
zeroes * 100 / bytes,
zeroes,
start_time.elapsed().as_secs()
);
per = next_per; per = next_per;
} }
} }
@ -193,10 +201,11 @@ impl RestoreTask {
let end_time = std::time::Instant::now(); let end_time = std::time::Instant::now();
let elapsed = end_time.duration_since(start_time); let elapsed = end_time.duration_since(start_time);
eprintln!("restore image complete (bytes={}, duration={:.2}s, speed={:.2}MB/s)", eprintln!(
bytes, "restore image complete (bytes={}, duration={:.2}s, speed={:.2}MB/s)",
elapsed.as_secs_f64(), bytes,
bytes as f64/(1024.0*1024.0*elapsed.as_secs_f64()) elapsed.as_secs_f64(),
bytes as f64 / (1024.0 * 1024.0 * elapsed.as_secs_f64())
); );
Ok(()) Ok(())
@ -208,11 +217,7 @@ impl RestoreTask {
Ok(info.archive_size) Ok(info.archive_size)
} }
pub async fn open_image( pub async fn open_image(&self, archive_name: String) -> Result<u8, Error> {
&self,
archive_name: String,
) -> Result<u8, Error> {
let client = match self.client.get() { let client = match self.client.get() {
Some(reader) => Arc::clone(reader), Some(reader) => Arc::clone(reader),
None => bail!("not connected"), None => bail!("not connected"),
@ -223,7 +228,9 @@ impl RestoreTask {
None => bail!("no manifest"), None => bail!("no manifest"),
}; };
let index = client.download_fixed_index(&manifest, &archive_name).await?; let index = client
.download_fixed_index(&manifest, &archive_name)
.await?;
let archive_size = index.index_bytes(); let archive_size = index.index_bytes();
let most_used = index.find_most_used_chunks(8); let most_used = index.find_most_used_chunks(8);
@ -237,11 +244,16 @@ impl RestoreTask {
); );
let cache = get_shared_chunk_cache(); let cache = get_shared_chunk_cache();
let reader = Arc::new(CachedChunkReader::new_with_cache(chunk_reader, index, cache)); let reader = Arc::new(CachedChunkReader::new_with_cache(
chunk_reader,
index,
cache,
));
let info = ImageAccessInfo { let info = ImageAccessInfo {
archive_size, archive_size,
_archive_name: archive_name, /// useful to debug _archive_name: archive_name,
/// useful to debug
reader, reader,
}; };
@ -255,7 +267,6 @@ impl RestoreTask {
offset: u64, offset: u64,
size: u64, size: u64,
) -> Result<libc::c_int, Error> { ) -> Result<libc::c_int, Error> {
let (reader, image_size) = { let (reader, image_size) = {
let mut guard = self.image_registry.lock().unwrap(); let mut guard = self.image_registry.lock().unwrap();
let info = guard.lookup(aid)?; let info = guard.lookup(aid)?;

View File

@ -7,11 +7,7 @@ pub fn utf8_c_string(ptr: *const c_char) -> Result<Option<String>, Error> {
Ok(if ptr.is_null() { Ok(if ptr.is_null() {
None None
} else { } else {
Some(unsafe { Some(unsafe { CStr::from_ptr(ptr).to_str()?.to_owned() })
CStr::from_ptr(ptr)
.to_str()?
.to_owned()
})
}) })
} }

View File

@ -1,14 +1,14 @@
use anyhow::Error; use anyhow::Error;
use std::collections::HashSet; use std::collections::HashSet;
use std::sync::{Mutex, Arc}; use std::sync::{Arc, Mutex};
use futures::future::Future; use futures::future::Future;
use serde_json::json; use serde_json::json;
use tokio::sync::{mpsc, oneshot}; use tokio::sync::{mpsc, oneshot};
use pbs_datastore::index::IndexFile;
use pbs_datastore::fixed_index::FixedIndexReader;
use pbs_client::*; use pbs_client::*;
use pbs_datastore::fixed_index::FixedIndexReader;
use pbs_datastore::index::IndexFile;
pub(crate) struct ChunkUploadInfo { pub(crate) struct ChunkUploadInfo {
pub digest: [u8; 32], pub digest: [u8; 32],
@ -23,14 +23,16 @@ pub(crate) struct UploadResult {
pub bytes_written: u64, pub bytes_written: u64,
} }
pub(crate) type UploadQueueSender = mpsc::Sender<Box<dyn Future<Output = Result<ChunkUploadInfo, Error>> + Send + Unpin>>; pub(crate) type UploadQueueSender =
type UploadQueueReceiver = mpsc::Receiver<Box<dyn Future<Output = Result<ChunkUploadInfo, Error>> + Send + Unpin>>; mpsc::Sender<Box<dyn Future<Output = Result<ChunkUploadInfo, Error>> + Send + Unpin>>;
type UploadQueueReceiver =
mpsc::Receiver<Box<dyn Future<Output = Result<ChunkUploadInfo, Error>> + Send + Unpin>>;
pub(crate) type UploadResultReceiver = oneshot::Receiver<Result<UploadResult, Error>>; pub(crate) type UploadResultReceiver = oneshot::Receiver<Result<UploadResult, Error>>;
type UploadResultSender = oneshot::Sender<Result<UploadResult, Error>>; type UploadResultSender = oneshot::Sender<Result<UploadResult, Error>>;
pub(crate) fn create_upload_queue( pub(crate) fn create_upload_queue(
client: Arc<BackupWriter>, client: Arc<BackupWriter>,
known_chunks: Arc<Mutex<HashSet<[u8;32]>>>, known_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
initial_index: Arc<Option<FixedIndexReader>>, initial_index: Arc<Option<FixedIndexReader>>,
wid: u64, wid: u64,
device_size: u64, device_size: u64,
@ -39,18 +41,16 @@ pub(crate) fn create_upload_queue(
let (upload_queue_tx, upload_queue_rx) = mpsc::channel(100); let (upload_queue_tx, upload_queue_rx) = mpsc::channel(100);
let (upload_result_tx, upload_result_rx) = oneshot::channel(); let (upload_result_tx, upload_result_rx) = oneshot::channel();
tokio::spawn( tokio::spawn(upload_handler(
upload_handler( client,
client, known_chunks,
known_chunks, initial_index,
initial_index, wid,
wid, device_size,
device_size, chunk_size,
chunk_size, upload_queue_rx,
upload_queue_rx, upload_result_tx,
upload_result_tx, ));
)
);
(upload_queue_tx, upload_result_rx) (upload_queue_tx, upload_result_rx)
} }
@ -67,7 +67,9 @@ async fn upload_chunk_list(
digest_list.truncate(0); digest_list.truncate(0);
offset_list.truncate(0); offset_list.truncate(0);
client.upload_put("fixed_index", None, "application/json", param_data).await?; client
.upload_put("fixed_index", None, "application/json", param_data)
.await?;
Ok(()) Ok(())
} }
@ -75,7 +77,7 @@ async fn upload_chunk_list(
#[allow(clippy::too_many_arguments, clippy::needless_range_loop)] #[allow(clippy::too_many_arguments, clippy::needless_range_loop)]
async fn upload_handler( async fn upload_handler(
client: Arc<BackupWriter>, client: Arc<BackupWriter>,
known_chunks: Arc<Mutex<HashSet<[u8;32]>>>, known_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
initial_index: Arc<Option<FixedIndexReader>>, initial_index: Arc<Option<FixedIndexReader>>,
wid: u64, wid: u64,
device_size: u64, device_size: u64,
@ -89,7 +91,7 @@ async fn upload_handler(
let mut digest_list = Vec::new(); let mut digest_list = Vec::new();
let mut offset_list = Vec::new(); let mut offset_list = Vec::new();
let index_size = ((device_size + chunk_size -1)/chunk_size) as usize; let index_size = ((device_size + chunk_size - 1) / chunk_size) as usize;
let mut index = Vec::with_capacity(index_size); let mut index = Vec::with_capacity(index_size);
index.resize(index_size, [0u8; 32]); index.resize(index_size, [0u8; 32]);
@ -103,17 +105,23 @@ async fn upload_handler(
while let Some(response_future) = upload_queue.recv().await { while let Some(response_future) = upload_queue.recv().await {
match response_future.await { match response_future.await {
Ok(ChunkUploadInfo { digest, offset, size, chunk_is_known }) => { Ok(ChunkUploadInfo {
digest,
offset,
size,
chunk_is_known,
}) => {
let digest_str = hex::encode(&digest); let digest_str = hex::encode(&digest);
//println!("upload_handler {:?} {}", digest, offset); //println!("upload_handler {:?} {}", digest, offset);
let pos = (offset/chunk_size) as usize; let pos = (offset / chunk_size) as usize;
index[pos] = digest; index[pos] = digest;
chunk_count += 1; chunk_count += 1;
bytes_written += size; bytes_written += size;
if !chunk_is_known { // register chunk as known if !chunk_is_known {
// register chunk as known
let mut known_chunks_guard = known_chunks.lock().unwrap(); let mut known_chunks_guard = known_chunks.lock().unwrap();
known_chunks_guard.insert(digest); known_chunks_guard.insert(digest);
} }
@ -127,12 +135,14 @@ async fn upload_handler(
wid, wid,
&mut digest_list, &mut digest_list,
&mut offset_list, &mut offset_list,
).await { )
.await
{
let _ = upload_result.send(Err(err)); let _ = upload_result.send(Err(err));
return; return;
} }
} }
} }
Err(err) => { Err(err) => {
let _ = upload_result.send(Err(err)); let _ = upload_result.send(Err(err));
return; return;
@ -141,12 +151,9 @@ async fn upload_handler(
} }
if !digest_list.is_empty() { if !digest_list.is_empty() {
if let Err(err) = upload_chunk_list( if let Err(err) =
Arc::clone(&client), upload_chunk_list(Arc::clone(&client), wid, &mut digest_list, &mut offset_list).await
wid, {
&mut digest_list,
&mut offset_list,
).await {
let _ = upload_result.send(Err(err)); let _ = upload_result.send(Err(err));
return; return;
} }
@ -158,5 +165,9 @@ async fn upload_handler(
} }
let csum = csum.finish(); let csum = csum.finish();
let _ = upload_result.send(Ok(UploadResult { csum, chunk_count, bytes_written })); let _ = upload_result.send(Ok(UploadResult {
csum,
chunk_count,
bytes_written,
}));
} }