add incremental backup support

Tell the server we're doing an incremental backup if QEMU notifies us
with "is_incremental" in register_image. We do this by using the
'reuse-csum' parameter when registering each archive's index, thus
switching the server to incremental mode (where it only expects changed
chunks and verifies the previous backup's checksum).

We use the newly changed API download_previous_fixed_index() to replace
known_chunks generation and also give us the verified index file to
calculate the new checksum with. The manifest for verification is
downloaded during "Connect".

To initialize the session cache for checksums, lazy_static is used.

Signed-off-by: Stefan Reiter <s.reiter@proxmox.com>
This commit is contained in:
Stefan Reiter 2020-06-25 12:23:32 +02:00 committed by Dietmar Maurer
parent 22c6bca881
commit cefb437a05
6 changed files with 111 additions and 22 deletions

View File

@ -17,8 +17,8 @@ cbindgen = "0.14.2"
[dependencies]
libc = "0.2"
bytes = "0.5"
proxmox = { version = "0.1.38", features = [ "sortable-macro", "api-macro" ] }
proxmox-backup = { git = "git://git.proxmox.com/git/proxmox-backup.git", tag = "v0.3.0" }
proxmox = { version = "0.1.41", features = [ "sortable-macro", "api-macro" ] }
proxmox-backup = { git = "git://git.proxmox.com/git/proxmox-backup.git", tag = "v0.4.0" }
#proxmox-backup = { path = "../proxmox-backup" }
chrono = "0.4" # Date and time library for Rust
anyhow = "1.0"
@ -27,4 +27,5 @@ serde_json = "1.0"
tokio = { version = "0.2.9", features = [ "blocking", "fs", "io-util", "macros", "rt-threaded", "signal", "stream", "tcp", "time", "uds" ] }
openssl = "0.10"
h2 = { version = "0.2", features = ["stream"] }
lazy_static = "1.4"

View File

@ -64,6 +64,7 @@ pub(crate) enum BackupMessage {
RegisterImage {
device_name: String,
size: u64,
incremental: bool,
callback_info: CallbackPointers,
},
CloseImage {

View File

@ -1,5 +1,5 @@
use anyhow::{bail, format_err, Error};
use std::collections::HashSet;
use std::collections::{HashSet, HashMap};
use std::sync::{Mutex, Arc};
use std::os::raw::c_int;
@ -13,6 +13,14 @@ use super::BackupSetup;
use crate::capi_types::*;
use crate::upload_queue::*;
use lazy_static::lazy_static;
// Session-wide cache mapping a device name to the checksum of its most
// recently completed fixed-index archive. Written after each successful
// close_image; read by register_image to pass 'reuse-csum' to the server
// and to verify the previous backup's index before going incremental.
lazy_static!{
static ref PREVIOUS_CSUMS: Mutex<HashMap<String, [u8;32]>> = {
Mutex::new(HashMap::new())
};
}
struct ImageUploadInfo {
wid: u64,
device_name: String,
@ -82,7 +90,6 @@ async fn register_zero_chunk(
pub(crate) async fn add_config(
client: Arc<BackupWriter>,
crypt_config: Option<Arc<CryptConfig>>,
registry: Arc<Mutex<ImageRegistry>>,
name: String,
data: DataPointer,
@ -95,7 +102,7 @@ pub(crate) async fn add_config(
let data: &[u8] = unsafe { std::slice::from_raw_parts(data.0, size as usize) };
let data = data.to_vec();
let stats = client.upload_blob_from_data(data, &blob_name, crypt_config, true, false).await?;
let stats = client.upload_blob_from_data(data, &blob_name, true, Some(false)).await?;
let mut guard = registry.lock().unwrap();
guard.file_list.push(json!({
@ -110,28 +117,75 @@ pub(crate) async fn add_config(
pub(crate) async fn register_image(
client: Arc<BackupWriter>,
crypt_config: Option<Arc<CryptConfig>>,
manifest: Arc<Mutex<Option<Arc<BackupManifest>>>>,
registry: Arc<Mutex<ImageRegistry>>,
known_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
device_name: String,
device_size: u64,
chunk_size: u64,
incremental: bool,
) -> Result<c_int, Error> {
//println!("register image {} size {}", device_name, device_size);
let archive_name = format!("{}.img.fidx", device_name);
client.download_chunk_list("fixed_index", &archive_name, known_chunks.clone()).await?;
//println!("register image download chunk list OK");
let manifest = {
let guard = manifest.lock().unwrap();
match &*guard {
Some(manifest) => Some(manifest.clone()),
None => None
}
};
let index = match manifest {
Some(manifest) => {
Some(client.download_previous_fixed_index(&archive_name, &manifest, known_chunks.clone()).await?)
},
None => None
};
let mut param = json!({ "archive-name": archive_name , "size": device_size });
let mut initial_index = Arc::new(None);
if incremental {
let csum = {
let map = PREVIOUS_CSUMS.lock().unwrap();
match map.get(&device_name) {
Some(c) => Some(*c),
None => None
}
};
if let Some(csum) = csum {
param.as_object_mut().unwrap().insert("reuse-csum".to_owned(), json!(proxmox::tools::digest_to_hex(&csum)));
match index {
Some(index) => {
let index_size = ((device_size + chunk_size -1)/chunk_size) as usize;
if index_size != index.index_count() {
bail!("previous backup has different size than current state, cannot do incremental backup (drive: {})", archive_name);
}
if index.compute_csum().0 != csum {
bail!("previous backup checksum doesn't match session cache, incremental backup would be out of sync (drive: {})", archive_name);
}
initial_index = Arc::new(Some(index));
},
None => bail!("no previous backup found, cannot do incremental backup")
}
} else {
bail!("no previous backups in this session, cannot do incremental one");
}
}
let param = json!({ "archive-name": archive_name , "size": device_size});
let wid = client.post("fixed_index", Some(param)).await?.as_u64().unwrap();
let zero_chunk_digest =
register_zero_chunk(client.clone(), crypt_config, chunk_size as usize, wid).await?;
let (upload_queue, upload_result) = create_upload_queue(
let (upload_queue, upload_result) = create_upload_queue(
client.clone(),
known_chunks.clone(),
initial_index.clone(),
wid,
device_size,
chunk_size,
@ -179,20 +233,30 @@ pub(crate) async fn close_image(
None => bail!("close_image: unknown error because upload result channel was already closed"),
};
let csum = proxmox::tools::digest_to_hex(&upload_result.csum);
let param = json!({
"wid": wid ,
"chunk-count": upload_result.chunk_count,
"size": upload_result.bytes_written,
"csum": proxmox::tools::digest_to_hex(&upload_result.csum),
"csum": csum.clone(),
});
let _value = client.post("fixed_close", Some(param)).await?;
{
let mut reg_guard = registry.lock().unwrap();
let info = reg_guard.lookup(dev_id)?;
let mut prev_csum_guard = PREVIOUS_CSUMS.lock().unwrap();
prev_csum_guard.insert(info.device_name.clone(), proxmox::tools::hex_to_digest(&csum).unwrap());
}
let mut guard = registry.lock().unwrap();
guard.file_list.push(json!({
"filename": format!("{}.img.fidx", device_name),
"size": device_size,
"csum": proxmox::tools::digest_to_hex(&upload_result.csum),
"csum": csum.clone(),
}));
Ok(0)
@ -315,7 +379,6 @@ pub(crate) async fn write_data(
pub(crate) async fn finish_backup(
client: Arc<BackupWriter>,
crypt_config: Option<Arc<CryptConfig>>,
registry: Arc<Mutex<ImageRegistry>>,
setup: BackupSetup,
) -> Result<c_int, Error> {
@ -337,7 +400,7 @@ pub(crate) async fn finish_backup(
};
client
.upload_blob_from_data(index_data, "index.json.blob", crypt_config, true, true)
.upload_blob_from_data(index_data, "index.json.blob", true, Some(true))
.await?;
client.finish().await?;

View File

@ -285,6 +285,7 @@ pub extern "C" fn proxmox_backup_register_image(
handle: *mut ProxmoxBackupHandle,
device_name: *const c_char, // expect utf8 here
size: u64,
is_incremental: i32,
error: * mut * mut c_char,
) -> c_int {
let task = unsafe { &mut *(handle as * mut BackupTask) };
@ -301,7 +302,7 @@ pub extern "C" fn proxmox_backup_register_image(
let device_name = unsafe { CStr::from_ptr(device_name).to_string_lossy().to_string() };
let msg = BackupMessage::RegisterImage { device_name, size, callback_info };
let msg = BackupMessage::RegisterImage { device_name, size, callback_info, incremental: is_incremental != 0 };
if let Err(_) = task.command_tx.send(msg) {
raise_error_int!(error, format_err!("task already aborted (send command failed)"));
@ -325,6 +326,7 @@ pub extern "C" fn proxmox_backup_register_image_async(
handle: *mut ProxmoxBackupHandle,
device_name: *const c_char, // expect utf8 here
size: u64,
is_incremental: i32,
callback: extern "C" fn(*mut c_void),
callback_data: *mut c_void,
result: *mut c_int,
@ -341,7 +343,7 @@ pub extern "C" fn proxmox_backup_register_image_async(
let device_name = unsafe { CStr::from_ptr(device_name).to_string_lossy().to_string() };
let msg = BackupMessage::RegisterImage { device_name, size, callback_info };
let msg = BackupMessage::RegisterImage { device_name, size, callback_info, incremental: is_incremental != 0 };
if let Err(_) = task.command_tx.send(msg) {
callback_info2.send_result(Err(format_err!("task already aborted")));

View File

@ -5,6 +5,7 @@ use std::sync::{Mutex, Arc};
use futures::future::Future;
use serde_json::json;
use tokio::sync::{mpsc, oneshot};
use proxmox_backup::backup::*;
use proxmox_backup::client::*;
pub(crate) struct ChunkUploadInfo {
@ -28,6 +29,7 @@ type UploadResultSender = oneshot::Sender<Result<UploadResult, Error>>;
pub(crate) fn create_upload_queue(
client: Arc<BackupWriter>,
known_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
initial_index: Arc<Option<FixedIndexReader>>,
wid: u64,
device_size: u64,
chunk_size: u64,
@ -39,6 +41,7 @@ pub(crate) fn create_upload_queue(
upload_handler(
client,
known_chunks,
initial_index,
wid,
device_size,
chunk_size,
@ -70,6 +73,7 @@ async fn upload_chunk_list(
async fn upload_handler(
client: Arc<BackupWriter>,
known_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
initial_index: Arc<Option<FixedIndexReader>>,
wid: u64,
device_size: u64,
chunk_size: u64,
@ -86,6 +90,14 @@ async fn upload_handler(
let mut index = Vec::with_capacity(index_size);
index.resize(index_size, [0u8; 32]);
// for incremental, initialize with data from previous backup
// caller ensures initial_index length is index_size
if let Some(init) = initial_index.as_ref() {
for i in 0..index_size {
index[i] = *init.index_digest(i).unwrap();
}
}
while let Some(response_future) = upload_queue.recv().await {
match response_future.await {
Ok(ChunkUploadInfo { digest, offset, size, chunk_is_known }) => {

View File

@ -120,6 +120,7 @@ fn backup_worker_task(
let written_bytes2 = written_bytes.clone();
let known_chunks = Arc::new(Mutex::new(HashSet::new()));
let manifest = Arc::new(Mutex::new(None));
let chunk_size = setup.chunk_size;
@ -145,6 +146,8 @@ fn backup_worker_task(
BackupMessage::Connect { callback_info } => {
let setup = setup.clone();
let client = client.clone();
let crypt_config = crypt_config.clone();
let manifest = manifest.clone();
let command_future = async move {
let options = HttpClientOptions::new()
@ -152,10 +155,17 @@ fn backup_worker_task(
.password(setup.password.clone());
let http = HttpClient::new(&setup.host, &setup.user, options)?;
let writer = BackupWriter::start(http, &setup.store, "vm", &setup.backup_id, setup.backup_time, false).await?;
let writer = BackupWriter::start(http, crypt_config.clone(), &setup.store, "vm", &setup.backup_id, setup.backup_time, false).await?;
let mut guard = client.lock().unwrap();
*guard = Some(writer);
let last_manifest = writer.download_previous_manifest().await;
let mut manifest_guard = manifest.lock().unwrap();
*manifest_guard = match last_manifest {
Ok(last_manifest) => Some(Arc::new(last_manifest)),
Err(_) => None
};
let mut client_guard = client.lock().unwrap();
*client_guard = Some(writer);
Ok(0)
};
@ -176,7 +186,6 @@ fn backup_worker_task(
Some(client) => {
let command_future = add_config(
client,
crypt_config.clone(),
registry.clone(),
name,
data,
@ -189,18 +198,20 @@ fn backup_worker_task(
}
}
}
BackupMessage::RegisterImage { device_name, size, callback_info} => {
BackupMessage::RegisterImage { device_name, size, incremental, callback_info } => {
let client = (*(client.lock().unwrap())).clone();
match client {
Some(client) => {
let command_future = register_image(
client,
crypt_config.clone(),
manifest.clone(),
registry.clone(),
known_chunks.clone(),
device_name,
size,
chunk_size,
incremental,
);
tokio::spawn(handle_async_command(command_future, abort.listen(), callback_info));
}
@ -250,7 +261,6 @@ fn backup_worker_task(
Some(client) => {
let command_future = finish_backup(
client,
crypt_config.clone(),
registry.clone(),
setup.clone(),
);