From ca246b4590ccc449afe082e7d06898f2767b3a06 Mon Sep 17 00:00:00 2001 From: Dietmar Maurer Date: Fri, 25 Oct 2019 11:57:33 +0200 Subject: [PATCH] run all commands async with tokio::spawn() --- src/worker_task.rs | 240 +++++++++++++++++++++++++++------------------ 1 file changed, 142 insertions(+), 98 deletions(-) diff --git a/src/worker_task.rs b/src/worker_task.rs index bb96fae..e13b58c 100644 --- a/src/worker_task.rs +++ b/src/worker_task.rs @@ -103,8 +103,6 @@ fn backup_worker_task( connect_tx.send(Ok(()))?; drop(connect_tx); // no longer needed - let mut client = None; - let (mut abort_tx, mut abort_rx) = tokio::sync::mpsc::channel(1); let abort_rx = async move { match abort_rx.recv().await { @@ -122,25 +120,38 @@ fn backup_worker_task( let chunk_size = setup.chunk_size; + let client = Arc::new(Mutex::new(None)); + runtime.spawn(async move { let registry = Arc::new(Mutex::new(ImageRegistry::new())); loop { - let msg = command_rx.recv().unwrap(); // todo: should be blocking + // Note: command_rx.recv() may block one thread, because there are + // still enough threads to do the work + let msg = command_rx.recv(); + if let Err(_) = msg { + // sender closed channel, try to abort and then end the loop + let _ = abort_tx.send(()).await; + break; + }; + + let msg = msg.unwrap(); match msg { BackupMessage::Connect { callback_info } => { - client = match setup.connect().await { - Ok(client) => { - callback_info.send_result(Ok(0)); - Some(client) - } - Err(err) => { - callback_info.send_result(Err(err)); - None - } - }; + let client = client.clone(); + let setup = setup.clone(); + let abort_future = abort.listen(); + tokio::spawn(async move { + let command = async move { + let writer = setup.connect().await?; + let mut guard = client.lock().unwrap(); + *guard = Some(writer); + Ok(0) + }; + handle_async_command(command, abort_future, callback_info).await; + }); } BackupMessage::Abort => { println!("worker got abort mesage"); @@ -154,104 +165,137 @@ fn backup_worker_task( break; } BackupMessage::AddConfig { name, data, size, callback_info } => { - match client { - Some(ref client) => { - handle_async_command( - add_config( - client.clone(), - crypt_config.clone(), - registry.clone(), - name, - data, - size, - ), - abort.listen(), - callback_info, - ).await; + let client = (*(client.lock().unwrap())).clone(); + let abort_future = abort.listen(); + let registry = registry.clone(); + let crypt_config = crypt_config.clone(); + tokio::spawn(async move { + match client { + Some(client) => { + handle_async_command( + add_config( + client, + crypt_config, + registry, + name, + data, + size, + ), + abort_future, + callback_info, + ).await; + } + None => { + callback_info.send_result(Err(format_err!("not connected"))); + } } - None => { - callback_info.send_result(Err(format_err!("not connected"))); - } - } + }); } BackupMessage::RegisterImage { device_name, size, callback_info} => { - match client { - Some(ref client) => { - handle_async_command( - register_image( - client.clone(), - crypt_config.clone(), - registry.clone(), - known_chunks.clone(), - device_name, - size, - chunk_size, - ), - abort.listen(), - callback_info, - ).await; + let client = (*(client.lock().unwrap())).clone(); + let abort_future = abort.listen(); + let registry = registry.clone(); + let crypt_config = crypt_config.clone(); + let known_chunks = known_chunks.clone(); + tokio::spawn(async move { + match client { + Some(client) => { + handle_async_command( + register_image( + client, + crypt_config, + registry, + known_chunks, + device_name, + size, + chunk_size, + ), + abort_future, + callback_info, + ).await; + } + None => { + callback_info.send_result(Err(format_err!("not connected"))); + } } - None => { - callback_info.send_result(Err(format_err!("not connected"))); - } - } + }); } BackupMessage::CloseImage { dev_id, callback_info } => { - match client { - Some(ref client) => { - handle_async_command( - close_image(client.clone(), registry.clone(), dev_id), - abort.listen(), - callback_info, - ).await; + let client = (*(client.lock().unwrap())).clone(); + let abort_future = abort.listen(); + let registry = registry.clone(); + tokio::spawn(async move { + match client { + Some(client) => { + handle_async_command( + close_image(client, registry, dev_id), + abort_future, + callback_info, + ).await; + } + None => { + callback_info.send_result(Err(format_err!("not connected"))); + } } - None => { - callback_info.send_result(Err(format_err!("not connected"))); - } - } + }); } BackupMessage::WriteData { dev_id, data, offset, size, callback_info } => { - match client { - Some(ref client) => { - written_bytes2.fetch_add(size, Ordering::SeqCst); - handle_async_command( - write_data( - client.clone(), - crypt_config.clone(), - registry.clone(), - known_chunks.clone(), - dev_id, data, - offset, - size, - chunk_size, - ), - abort.listen(), - callback_info, - ).await; + let client = (*(client.lock().unwrap())).clone(); + let abort_future = abort.listen(); + let registry = registry.clone(); + let crypt_config = crypt_config.clone(); + let known_chunks = known_chunks.clone(); + let written_bytes2 = written_bytes2.clone(); + tokio::spawn(async move { + match client { + Some(client) => { + written_bytes2.fetch_add(size, Ordering::SeqCst); + handle_async_command( + write_data( + client, + crypt_config, + registry, + known_chunks, + dev_id, data, + offset, + size, + chunk_size, + ), + abort_future, + callback_info, + ).await; + } + None => { + callback_info.send_result(Err(format_err!("not connected"))); + } } - None => { - callback_info.send_result(Err(format_err!("not connected"))); - } - } + }); } BackupMessage::Finish { callback_info } => { - match client { - Some(ref client) => { - handle_async_command( - finish_backup( - client.clone(), - crypt_config.clone(), - registry.clone(), - setup.clone(), - ), - abort.listen(), - callback_info, - ).await; + let client = (*(client.lock().unwrap())).clone(); + let abort_future = abort.listen(); + let registry = registry.clone(); + let crypt_config = crypt_config.clone(); + let setup = setup.clone(); + tokio::spawn(async move { + match client { + Some(client) => { + handle_async_command( + finish_backup( + client, + crypt_config, + registry, + setup, + ), + abort_future, + callback_info, + ).await; + } + None => { + callback_info.send_result(Err(format_err!("not connected"))); + } } - None => { - callback_info.send_result(Err(format_err!("not connected"))); - } - } + }); } } }