diff --git a/src/bin/proxmox-backup-client.rs b/src/bin/proxmox-backup-client.rs index 7e2fcc2e..7c4a3257 100644 --- a/src/bin/proxmox-backup-client.rs +++ b/src/bin/proxmox-backup-client.rs @@ -25,6 +25,7 @@ use xdg::BaseDirectories; use lazy_static::lazy_static; use futures::*; +use tokio::sync::mpsc; lazy_static! { static ref BACKUPSPEC_REGEX: Regex = Regex::new(r"^([a-zA-Z0-9_-]+\.(?:pxar|raw)):(.+)$").unwrap(); @@ -106,37 +107,34 @@ fn complete_repository(_arg: &str, _param: &HashMap) -> Vec>( - client: &mut HttpClient, - repo: &BackupRepository, + client: &BackupClient, dir_path: P, archive_name: &str, - backup_id: &str, - backup_time: DateTime, chunk_size: Option, all_file_systems: bool, verbose: bool, ) -> Result<(), Error> { - let mut param = json!({ - "archive-name": archive_name, - "backup-type": "host", - "backup-id": backup_id, - "backup-time": backup_time.timestamp(), - }); - - if let Some(size) = chunk_size { - param["chunk-size"] = size.into(); + if let Some(_size) = chunk_size { + unimplemented!(); } - let query = tools::json_object_to_query(param)?; + let pxar_stream = PxarBackupStream::open(dir_path.as_ref(), all_file_systems, verbose)?; + let chunk_stream = ChunkStream::new(pxar_stream); - let path = format!("api2/json/admin/datastore/{}/pxar?{}", repo.store(), query); + let (tx, rx) = mpsc::channel(10); // allow to buffer 10 chunks - let stream = PxarBackupStream::open(dir_path.as_ref(), all_file_systems, verbose)?; + let stream = rx + .map_err(Error::from) + .and_then(|x| x); // flatten - let body = Body::wrap_stream(stream); + // spawn chunker inside a separate task so that it can run parallel + tokio::spawn( + tx.send_all(chunk_stream.then(|r| Ok(r))) + .map_err(|e| {}).map(|_| ()) + ); - client.upload("application/x-proxmox-backup-pxar", body, &path).wait()?; + client.upload_stream(archive_name, stream, "dynamic", None).wait()?; Ok(()) } @@ -440,20 +438,22 @@ fn create_backup( let backup_time = Local.timestamp(Local::now().timestamp(), 0); - let mut client = HttpClient::new(repo.host(), repo.user())?; - + let client = HttpClient::new(repo.host(), repo.user())?; record_repository(&repo); println!("Starting backup"); println!("Client name: {}", tools::nodename()); println!("Start Time: {}", backup_time.to_rfc3339()); + let client = client.start_backup(repo.store(), "host", &backup_id).wait()?; + for (filename, target) in upload_list { println!("Upload '{}' to '{:?}' as {}", filename, repo, target); - backup_directory(&mut client, &repo, &filename, &target, backup_id, backup_time, - chunk_size_opt, all_file_systems, verbose)?; + backup_directory(&client, &filename, &target, chunk_size_opt, all_file_systems, verbose)?; } + client.finish().wait()?; + let end_time = Local.timestamp(Local::now().timestamp(), 0); let elapsed = end_time.signed_duration_since(backup_time); println!("Duration: {}", elapsed);