use failure::*; use std::collections::HashSet; use std::thread::JoinHandle; use std::sync::{Mutex, Arc}; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::mpsc::{channel, Sender, Receiver}; use std::os::raw::c_int; use futures::future::{Future, Either, FutureExt}; use proxmox_backup::tools::BroadcastFuture; use proxmox_backup::backup::{CryptConfig, load_and_decrtypt_key}; use crate::capi_types::*; use crate::commands::*; pub(crate) struct BackupTask { pub worker: JoinHandle>, pub command_tx: Sender, pub aborted: Option, // set on abort, conatins abort reason } #[derive(Debug)] pub(crate) struct BackupTaskStats { written_bytes: u64, } impl BackupTask { pub fn new(setup: BackupSetup) -> Result { let crypt_config = match setup.keyfile { None => None, Some(ref path) => { let (key, _) = load_and_decrtypt_key(path, & || { match setup.key_password { Some(ref key_password) => Ok(key_password.as_bytes().to_vec()), None => bail!("no key_password specified"), } })?; Some(Arc::new(CryptConfig::new(key)?)) } }; let (connect_tx, connect_rx) = channel(); // sync initial server connect let (command_tx, command_rx) = channel(); let worker = std::thread::spawn(move || { backup_worker_task(setup, crypt_config, connect_tx, command_rx) }); let _worker_start_result = connect_rx.recv()??; Ok(BackupTask { worker, command_tx, aborted: None, }) } } fn handle_async_command>>( command_future: F, abort_future: impl 'static + Send + Future>, callback_info: CallbackPointers, ) -> impl Future { futures::future::select(command_future.boxed(), abort_future.boxed()) .map(move |either| { match either { Either::Left((result, _)) => { callback_info.send_result(result); } Either::Right(_) => { // aborted callback_info.send_result(Err(format_err!("worker aborted"))); } } }) } fn backup_worker_task( setup: BackupSetup, crypt_config: Option>, connect_tx: Sender>, command_rx: Receiver, ) -> Result { let mut builder = tokio::runtime::Builder::new(); builder.blocking_threads(1); builder.core_threads(4); builder.name_prefix("proxmox-backup-qemu-"); let runtime = match builder.build() { Ok(runtime) => runtime, Err(err) => { connect_tx.send(Err(format_err!("create runtime failed: {}", err)))?; bail!("create runtime failed"); } }; connect_tx.send(Ok(()))?; drop(connect_tx); // no longer needed let (mut abort_tx, mut abort_rx) = tokio::sync::mpsc::channel(1); let abort_rx = async move { match abort_rx.recv().await { Some(()) => Ok(()), None => bail!("abort future canceled"), } }; let abort = BroadcastFuture::new(Box::new(abort_rx)); let written_bytes = Arc::new(AtomicU64::new(0)); let written_bytes2 = written_bytes.clone(); let known_chunks = Arc::new(Mutex::new(HashSet::new())); let chunk_size = setup.chunk_size; let client = Arc::new(Mutex::new(None)); runtime.spawn(async move { let registry = Arc::new(Mutex::new(ImageRegistry::new())); loop { // Note: command_rx.recv() may block one thread, because there are // still enough threads to do the work let msg = command_rx.recv(); if let Err(_) = msg { // sender closed channel, try to abort and then end the loop let _ = abort_tx.send(()).await; break; }; let msg = msg.unwrap(); match msg { BackupMessage::Connect { callback_info } => { let client = client.clone(); let setup = setup.clone(); let abort_future = abort.listen(); tokio::spawn(async move { let command = async move { let writer = setup.connect().await?; let mut guard = client.lock().unwrap(); *guard = Some(writer); Ok(0) }; handle_async_command(command, abort_future, callback_info).await; }); } BackupMessage::Abort => { println!("worker got abort mesage"); let res = abort_tx.send(()).await; if let Err(_err) = res { println!("sending abort failed"); } } BackupMessage::End => { println!("worker got end mesage"); break; } BackupMessage::AddConfig { name, data, size, callback_info } => { let client = (*(client.lock().unwrap())).clone(); let abort_future = abort.listen(); let registry = registry.clone(); let crypt_config = crypt_config.clone(); tokio::spawn(async move { match client { Some(client) => { handle_async_command( add_config( client, crypt_config, registry, name, data, size, ), abort_future, callback_info, ).await; } None => { callback_info.send_result(Err(format_err!("not connected"))); } } }); } BackupMessage::RegisterImage { device_name, size, callback_info} => { let client = (*(client.lock().unwrap())).clone(); let abort_future = abort.listen(); let registry = registry.clone(); let crypt_config = crypt_config.clone(); let known_chunks = known_chunks.clone(); tokio::spawn(async move { match client { Some(client) => { handle_async_command( register_image( client, crypt_config, registry, known_chunks, device_name, size, chunk_size, ), abort_future, callback_info, ).await; } None => { callback_info.send_result(Err(format_err!("not connected"))); } } }); } BackupMessage::CloseImage { dev_id, callback_info } => { let client = (*(client.lock().unwrap())).clone(); let abort_future = abort.listen(); let registry = registry.clone(); tokio::spawn(async move { match client { Some(client) => { handle_async_command( close_image(client, registry, dev_id), abort_future, callback_info, ).await; } None => { callback_info.send_result(Err(format_err!("not connected"))); } } }); } BackupMessage::WriteData { dev_id, data, offset, size, callback_info } => { let client = (*(client.lock().unwrap())).clone(); let abort_future = abort.listen(); let registry = registry.clone(); let crypt_config = crypt_config.clone(); let known_chunks = known_chunks.clone(); let written_bytes2 = written_bytes2.clone(); tokio::spawn(async move { match client { Some(client) => { written_bytes2.fetch_add(size, Ordering::SeqCst); handle_async_command( write_data( client, crypt_config, registry, known_chunks, dev_id, data, offset, size, chunk_size, ), abort_future, callback_info, ).await; } None => { callback_info.send_result(Err(format_err!("not connected"))); } } }); } BackupMessage::Finish { callback_info } => { let client = (*(client.lock().unwrap())).clone(); let abort_future = abort.listen(); let registry = registry.clone(); let crypt_config = crypt_config.clone(); let setup = setup.clone(); tokio::spawn(async move { match client { Some(client) => { handle_async_command( finish_backup( client, crypt_config, registry, setup, ), abort_future, callback_info, ).await; } None => { callback_info.send_result(Err(format_err!("not connected"))); } } }); } } } println!("worker end loop"); }); runtime.shutdown_on_idle(); let stats = BackupTaskStats { written_bytes: written_bytes.fetch_add(0, Ordering::SeqCst) }; Ok(stats) }