diff --git a/src/worker_task.rs b/src/worker_task.rs index e13b58c..feb1153 100644 --- a/src/worker_task.rs +++ b/src/worker_task.rs @@ -142,16 +142,15 @@ fn backup_worker_task( BackupMessage::Connect { callback_info } => { let client = client.clone(); let setup = setup.clone(); - let abort_future = abort.listen(); - tokio::spawn(async move { - let command = async move { - let writer = setup.connect().await?; - let mut guard = client.lock().unwrap(); - *guard = Some(writer); - Ok(0) - }; - handle_async_command(command, abort_future, callback_info).await; - }); + + let command_future = async move { + let writer = setup.connect().await?; + let mut guard = client.lock().unwrap(); + *guard = Some(writer); + Ok(0) + }; + + tokio::spawn(handle_async_command(command_future, abort.listen(), callback_info)); } BackupMessage::Abort => { println!("worker got abort mesage"); @@ -166,136 +165,94 @@ fn backup_worker_task( } BackupMessage::AddConfig { name, data, size, callback_info } => { let client = (*(client.lock().unwrap())).clone(); - let abort_future = abort.listen(); - let registry = registry.clone(); - let crypt_config = crypt_config.clone(); - tokio::spawn(async move { - match client { - Some(client) => { - handle_async_command( - add_config( - client, - crypt_config, - registry, - name, - data, - size, - ), - abort_future, - callback_info, - ).await; - } - None => { - callback_info.send_result(Err(format_err!("not connected"))); - } + match client { + Some(client) => { + let command_future = add_config( + client, + crypt_config.clone(), + registry.clone(), + name, + data, + size, + ); + tokio::spawn(handle_async_command(command_future, abort.listen(), callback_info)); } - }); + None => { + callback_info.send_result(Err(format_err!("not connected"))); + } + } } BackupMessage::RegisterImage { device_name, size, callback_info} => { let client = (*(client.lock().unwrap())).clone(); - let abort_future = abort.listen(); - let registry = registry.clone(); - let crypt_config = crypt_config.clone(); - let known_chunks = known_chunks.clone(); - tokio::spawn(async move { - match client { - Some(client) => { - handle_async_command( - register_image( - client, - crypt_config, - registry, - known_chunks, - device_name, - size, - chunk_size, - ), - abort_future, - callback_info, - ).await; - } - None => { - callback_info.send_result(Err(format_err!("not connected"))); - } + match client { + Some(client) => { + let command_future = register_image( + client, + crypt_config.clone(), + registry.clone(), + known_chunks.clone(), + device_name, + size, + chunk_size, + ); + tokio::spawn(handle_async_command(command_future, abort.listen(), callback_info)); } - }); + None => { + callback_info.send_result(Err(format_err!("not connected"))); + } + } } BackupMessage::CloseImage { dev_id, callback_info } => { let client = (*(client.lock().unwrap())).clone(); - let abort_future = abort.listen(); - let registry = registry.clone(); - tokio::spawn(async move { - match client { - Some(client) => { - handle_async_command( - close_image(client, registry, dev_id), - abort_future, - callback_info, - ).await; - } - None => { - callback_info.send_result(Err(format_err!("not connected"))); - } - } - }); + match client { + Some(client) => { + let command_future = close_image(client, registry.clone(), dev_id); + tokio::spawn(handle_async_command(command_future, abort.listen(), callback_info)); + } + None => { + callback_info.send_result(Err(format_err!("not connected"))); + } + } } BackupMessage::WriteData { dev_id, data, offset, size, callback_info } => { let client = (*(client.lock().unwrap())).clone(); - let abort_future = abort.listen(); - let registry = registry.clone(); - let crypt_config = crypt_config.clone(); - let known_chunks = known_chunks.clone(); - let written_bytes2 = written_bytes2.clone(); - tokio::spawn(async move { - match client { - Some(client) => { - written_bytes2.fetch_add(size, Ordering::SeqCst); - handle_async_command( - write_data( - client, - crypt_config, - registry, - known_chunks, - dev_id, data, - offset, - size, - chunk_size, - ), - abort_future, - callback_info, - ).await; - } - None => { - callback_info.send_result(Err(format_err!("not connected"))); - } + match client { + Some(client) => { + written_bytes2.fetch_add(size, Ordering::SeqCst); + + let command_future = write_data( + client, + crypt_config.clone(), + registry.clone(), + known_chunks.clone(), + dev_id, data, + offset, + size, + chunk_size, + ); + tokio::spawn(handle_async_command(command_future, abort.listen(), callback_info)); } - }); + None => { + callback_info.send_result(Err(format_err!("not connected"))); + } + } } BackupMessage::Finish { callback_info } => { let client = (*(client.lock().unwrap())).clone(); - let abort_future = abort.listen(); - let registry = registry.clone(); - let crypt_config = crypt_config.clone(); - let setup = setup.clone(); - tokio::spawn(async move { - match client { - Some(client) => { - handle_async_command( - finish_backup( - client, - crypt_config, - registry, - setup, - ), - abort_future, - callback_info, - ).await; - } - None => { - callback_info.send_result(Err(format_err!("not connected"))); - } + match client { + Some(client) => { + let command_future = finish_backup( + client, + crypt_config.clone(), + registry.clone(), + setup.clone(), + ); + tokio::spawn(handle_async_command(command_future, abort.listen(), callback_info)); } - }); + None => { + callback_info.send_result(Err(format_err!("not connected"))); + } + } } } }