From 762200557401c274631c44137468f42c39f82f7b Mon Sep 17 00:00:00 2001
From: Wolfgang Bumiller <w.bumiller@proxmox.com>
Date: Fri, 23 Aug 2019 13:44:13 +0200
Subject: [PATCH] src/api2/backup/upload_chunk.rs: switch to async

Signed-off-by: Wolfgang Bumiller <w.bumiller@proxmox.com>
---
 src/api2/backup/upload_chunk.rs | 107 +++++++++++++++++---------------
 1 file changed, 58 insertions(+), 49 deletions(-)

diff --git a/src/api2/backup/upload_chunk.rs b/src/api2/backup/upload_chunk.rs
index b72d8de2..1361fc19 100644
--- a/src/api2/backup/upload_chunk.rs
+++ b/src/api2/backup/upload_chunk.rs
@@ -1,16 +1,18 @@
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
 use failure::*;
 use futures::*;
-use std::sync::Arc;
-
-use hyper::http::request::Parts;
 use hyper::Body;
+use hyper::http::request::Parts;
 use serde_json::{json, Value};
 
-use crate::tools;
-use crate::backup::*;
+use crate::api2::types::*;
 use crate::api_schema::*;
 use crate::api_schema::router::*;
-use crate::api2::types::*;
+use crate::backup::*;
+use crate::tools;
 
 use super::environment::*;
 
@@ -24,51 +26,58 @@ pub struct UploadChunk {
 }
 
 impl UploadChunk {
-
     pub fn new(stream: Body, store: Arc<DataStore>, digest: [u8; 32], size: u32, encoded_size: u32) -> Self {
         Self { stream, store, size, encoded_size, raw_data: Some(vec![]), digest }
     }
 }
 
 impl Future for UploadChunk {
-    type Item = ([u8; 32], u32, u32, bool);
-    type Error = failure::Error;
+    type Output = Result<([u8; 32], u32, u32, bool), Error>;
 
-    fn poll(&mut self) -> Poll<([u8; 32], u32, u32, bool), failure::Error> {
-        loop {
-            match try_ready!(self.stream.poll()) {
-                Some(input) => {
-                    if let Some(ref mut raw_data) = self.raw_data {
-                        if (raw_data.len() + input.len()) > (self.encoded_size as usize) {
-                            bail!("uploaded chunk is larger than announced.");
+    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
+        let this = self.get_mut();
+
+        let err: Error = loop {
+            match ready!(Pin::new(&mut this.stream).poll_next(cx)) {
+                Some(Err(err)) => return Poll::Ready(Err(Error::from(err))),
+                Some(Ok(input)) => {
+                    if let Some(ref mut raw_data) = this.raw_data {
+                        if (raw_data.len() + input.len()) > (this.encoded_size as usize) {
+                            break format_err!("uploaded chunk is larger than announced.");
                         }
                         raw_data.extend_from_slice(&input);
                     } else {
-                        bail!("poll upload chunk stream failed - already finished.");
+                        break format_err!("poll upload chunk stream failed - already finished.");
                     }
                 }
                 None => {
-                    if let Some(raw_data) = self.raw_data.take() {
-                        if raw_data.len() != (self.encoded_size as usize) {
-                            bail!("uploaded chunk has unexpected size.");
+                    if let Some(raw_data) = this.raw_data.take() {
+                        if raw_data.len() != (this.encoded_size as usize) {
+                            break format_err!("uploaded chunk has unexpected size.");
                        }
 
-                        let mut chunk = DataChunk::from_raw(raw_data, self.digest)?;
+                        let (is_duplicate, compressed_size) = match proxmox::tools::try_block! {
+                            let mut chunk = DataChunk::from_raw(raw_data, this.digest)?;
 
-                        chunk.verify_unencrypted(self.size as usize)?;
+                            chunk.verify_unencrypted(this.size as usize)?;
 
-                        // always comput CRC at server side
-                        chunk.set_crc(chunk.compute_crc());
+                            // always comput CRC at server side
+                            chunk.set_crc(chunk.compute_crc());
 
-                        let (is_duplicate, compressed_size) = self.store.insert_chunk(&chunk)?;
+                            this.store.insert_chunk(&chunk)
+                        } {
+                            Ok(res) => res,
+                            Err(err) => break err,
+                        };
 
-                        return Ok(Async::Ready((self.digest, self.size, compressed_size as u32, is_duplicate)))
+                        return Poll::Ready(Ok((this.digest, this.size, compressed_size as u32, is_duplicate)))
                     } else {
-                        bail!("poll upload chunk stream failed - already finished.");
+                        break format_err!("poll upload chunk stream failed - already finished.");
                     }
                 }
             }
-        }
+        };
+        Poll::Ready(Err(err))
     }
 }
 
@@ -115,14 +124,14 @@ fn upload_fixed_chunk(
         .then(move |result| {
             let env: &BackupEnvironment = rpcenv.as_ref();
 
-            let result = result.and_then(|(digest, size, compressed_size, is_duplicate)| {
-                env.register_fixed_chunk(wid, digest, size, compressed_size, is_duplicate)?;
-                let digest_str = proxmox::tools::digest_to_hex(&digest);
-                env.debug(format!("upload_chunk done: {} bytes, {}", size, digest_str));
-                Ok(json!(digest_str))
-            });
+            let result = result.and_then(|(digest, size, compressed_size, is_duplicate)| {
+                env.register_fixed_chunk(wid, digest, size, compressed_size, is_duplicate)?;
+                let digest_str = proxmox::tools::digest_to_hex(&digest);
+                env.debug(format!("upload_chunk done: {} bytes, {}", size, digest_str));
+                Ok(json!(digest_str))
+            });
 
-            Ok(env.format_response(result))
+            future::ok(env.format_response(result))
         });
 
     Ok(Box::new(resp))
@@ -171,14 +180,14 @@ fn upload_dynamic_chunk(
         .then(move |result| {
             let env: &BackupEnvironment = rpcenv.as_ref();
 
-            let result = result.and_then(|(digest, size, compressed_size, is_duplicate)| {
-                env.register_dynamic_chunk(wid, digest, size, compressed_size, is_duplicate)?;
-                let digest_str = proxmox::tools::digest_to_hex(&digest);
-                env.debug(format!("upload_chunk done: {} bytes, {}", size, digest_str));
-                Ok(json!(digest_str))
-            });
+            let result = result.and_then(|(digest, size, compressed_size, is_duplicate)| {
+                env.register_dynamic_chunk(wid, digest, size, compressed_size, is_duplicate)?;
+                let digest_str = proxmox::tools::digest_to_hex(&digest);
+                env.debug(format!("upload_chunk done: {} bytes, {}", size, digest_str));
+                Ok(json!(digest_str))
+            });
 
-            Ok(env.format_response(result))
+            future::ok(env.format_response(result))
         });
 
     Ok(Box::new(resp))
@@ -201,10 +210,10 @@ fn upload_speedtest(
 
     let resp = req_body
         .map_err(Error::from)
-        .fold(0, |size: usize, chunk| -> Result<usize, Error> {
+        .try_fold(0, |size: usize, chunk| {
             let sum = size + chunk.len();
             //println!("UPLOAD {} bytes, sum {}", chunk.len(), sum);
-            Ok(sum)
+            future::ok::<usize, Error>(sum)
         })
         .then(move |result| {
             match result {
@@ -216,7 +225,7 @@
                 }
             }
             let env: &BackupEnvironment = rpcenv.as_ref();
-            Ok(env.format_response(Ok(Value::Null)))
+            future::ok(env.format_response(Ok(Value::Null)))
         });
 
     Ok(Box::new(resp))
@@ -257,11 +266,11 @@ fn upload_blob(
 
     let resp = req_body
         .map_err(Error::from)
-        .fold(Vec::new(), |mut acc, chunk| {
+        .try_fold(Vec::new(), |mut acc, chunk| {
             acc.extend_from_slice(&*chunk);
-            Ok::<_, Error>(acc)
+            future::ok::<_, Error>(acc)
        })
-        .and_then(move |data| {
+        .and_then(move |data| async move {
            if encoded_size != data.len() {
                bail!("got blob with unexpected length ({} != {})", encoded_size, data.len());
            }
@@ -271,7 +280,7 @@ fn upload_blob(
             Ok(())
        })
        .and_then(move |_| {
-            Ok(env3.format_response(Ok(Value::Null)))
+            future::ok(env3.format_response(Ok(Value::Null)))
        })
        ;
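
A note on the pattern used in the UploadChunk hunk above (illustration, not part of the patch): the hand-written Future moves from the futures 0.1 pair of Item/Error associated types to the single Output = Result<...> of std::future::Future, and poll() now takes Pin<&mut Self> plus a Context, with ready! replacing try_ready!. A minimal standalone sketch of the same conversion on a made-up ByteCounter future, assuming the futures 0.3 Stream API used in the patch:

    use std::pin::Pin;
    use std::task::{Context, Poll};

    use failure::Error;
    use futures::ready;
    use futures::stream::Stream;

    /// Illustration only: sums the length of all chunks of a byte stream.
    struct ByteCounter<S> {
        stream: S,
        count: usize,
    }

    impl<S> std::future::Future for ByteCounter<S>
    where
        S: Stream<Item = Result<Vec<u8>, Error>> + Unpin,
    {
        // futures 0.1: `type Item = usize; type Error = Error;`
        // std::future: a single Output type carrying the Result.
        type Output = Result<usize, Error>;

        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            let this = self.get_mut();
            loop {
                // ready! replaces try_ready!: it only short-circuits on
                // Poll::Pending, errors now arrive as ordinary stream items.
                match ready!(Pin::new(&mut this.stream).poll_next(cx)) {
                    Some(Ok(chunk)) => this.count += chunk.len(),
                    Some(Err(err)) => return Poll::Ready(Err(err)),
                    None => return Poll::Ready(Ok(this.count)),
                }
            }
        }
    }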
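The handler hunks follow the matching stream change: a futures 0.1 fold whose closure returned Result<_, Error> becomes try_fold with a closure returning a future, which is why the bodies now use future::ok(...). Sketched in isolation with a hypothetical collect_body helper (not part of the patch):

    use failure::Error;
    use futures::future;
    use futures::stream::{TryStream, TryStreamExt};

    /// Illustration only: gather a chunked body into one buffer, futures 0.3 style.
    async fn collect_body<S>(body: S) -> Result<Vec<u8>, Error>
    where
        S: TryStream<Ok = Vec<u8>, Error = Error>,
    {
        // futures 0.1: .fold(Vec::new(), |mut acc, chunk| -> Result<_, Error> { ...; Ok(acc) })
        // futures 0.3: the closure itself returns a future, hence future::ok(...)
        body.try_fold(Vec::new(), |mut acc, chunk| {
            acc.extend_from_slice(&chunk);
            future::ok(acc)
        })
        .await
    }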
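Likewise, upload_blob keeps its combinator chain but wraps the fallible body in an `async move` block, since bail! and ? need a Result-returning scope; TryFutureExt::and_then accepts any closure returning a TryFuture with a matching error type. A standalone sketch of that shape (the verify_length function and its arguments are made up for illustration):

    use failure::{bail, Error};
    use futures::future::{self, TryFutureExt};

    /// Illustration only: an and_then() closure that returns an `async move` block.
    fn verify_length(data: Vec<u8>, expected: usize) -> impl future::TryFuture<Ok = (), Error = Error> {
        future::ok::<_, Error>(data).and_then(move |data| async move {
            if data.len() != expected {
                bail!("got blob with unexpected length ({} != {})", expected, data.len());
            }
            Ok(())
        })
    }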