diff --git a/proxmox-async/src/blocking/std_channel_stream.rs b/proxmox-async/src/blocking/std_channel_stream.rs index cc27a4b3..83fcf493 100644 --- a/proxmox-async/src/blocking/std_channel_stream.rs +++ b/proxmox-async/src/blocking/std_channel_stream.rs @@ -1,6 +1,6 @@ use std::pin::Pin; -use std::task::{Context, Poll}; use std::sync::mpsc::Receiver; +use std::task::{Context, Poll}; use futures::stream::Stream; @@ -15,7 +15,7 @@ impl Stream for StdChannelStream { fn poll_next(self: Pin<&mut Self>, _cx: &mut Context) -> Poll> { match block_in_place(|| self.0.recv()) { Ok(data) => Poll::Ready(Some(data)), - Err(_) => Poll::Ready(None),// channel closed + Err(_) => Poll::Ready(None), // channel closed } } } diff --git a/proxmox-async/src/blocking/wrapped_reader_stream.rs b/proxmox-async/src/blocking/wrapped_reader_stream.rs index 426c0ef8..3a13f5f5 100644 --- a/proxmox-async/src/blocking/wrapped_reader_stream.rs +++ b/proxmox-async/src/blocking/wrapped_reader_stream.rs @@ -12,11 +12,12 @@ pub struct WrappedReaderStream { buffer: Vec, } -impl WrappedReaderStream { - +impl WrappedReaderStream { pub fn new(reader: R) -> Self { - let mut buffer = Vec::with_capacity(64*1024); - unsafe { buffer.set_len(buffer.capacity()); } + let mut buffer = Vec::with_capacity(64 * 1024); + unsafe { + buffer.set_len(buffer.capacity()); + } Self { reader, buffer } } } @@ -49,9 +50,7 @@ mod test { #[test] fn test_wrapped_stream_reader() -> Result<(), Error> { - crate::runtime::main(async { - run_wrapped_stream_reader_test().await - }) + crate::runtime::main(async { run_wrapped_stream_reader_test().await }) } struct DummyReader(usize); diff --git a/proxmox-async/src/broadcast_future.rs b/proxmox-async/src/broadcast_future.rs index 6046fd6c..838204d3 100644 --- a/proxmox-async/src/broadcast_future.rs +++ b/proxmox-async/src/broadcast_future.rs @@ -15,8 +15,7 @@ pub struct BroadcastData { listeners: Vec>>, } -impl BroadcastData { - +impl BroadcastData { pub fn new() -> Self { Self { result: None, @@ -25,16 +24,19 @@ impl BroadcastData { } pub fn notify_listeners(&mut self, result: Result) { - self.result = Some(result.clone()); loop { match self.listeners.pop() { - None => { break; }, - Some(ch) => { - match &result { - Ok(result) => { let _ = ch.send(Ok(result.clone())); }, - Err(err) => { let _ = ch.send(Err(format_err!("{}", err))); }, + None => { + break; + } + Some(ch) => match &result { + Ok(result) => { + let _ = ch.send(Ok(result.clone())); + } + Err(err) => { + let _ = ch.send(Err(format_err!("{}", err))); } }, } @@ -45,7 +47,7 @@ impl BroadcastData { use futures::future::{ok, Either}; match &self.result { - None => {}, + None => {} Some(Ok(result)) => return Either::Left(ok(result.clone())), Some(Err(err)) => return Either::Left(futures::future::err(format_err!("{}", err))), } @@ -54,13 +56,11 @@ impl BroadcastData { self.listeners.push(tx); - Either::Right(rx - .map(|res| match res { - Ok(Ok(t)) => Ok(t), - Ok(Err(e)) => Err(e), - Err(e) => Err(Error::from(e)), - }) - ) + Either::Right(rx.map(|res| match res { + Ok(Ok(t)) => Ok(t), + Ok(Err(e)) => Err(e), + Err(e) => Err(Error::from(e)), + })) } } @@ -85,40 +85,35 @@ impl BroadcastFuture { broadcast: BroadcastData::new(), future: Some(Pin::from(source)), }; - Self { inner: Arc::new(Mutex::new(inner)) } + Self { + inner: Arc::new(Mutex::new(inner)), + } } /// Creates a new instance with a oneshot channel as trigger pub fn new_oneshot() -> (Self, oneshot::Sender>) { - let (tx, rx) = oneshot::channel::>(); - let rx = rx - .map_err(Error::from) - .and_then(futures::future::ready); + let rx = rx.map_err(Error::from).and_then(futures::future::ready); (Self::new(Box::new(rx)), tx) } - fn notify_listeners( - inner: Arc>>, - result: Result, - ) { + fn notify_listeners(inner: Arc>>, result: Result) { let mut data = inner.lock().unwrap(); data.broadcast.notify_listeners(result); } - fn spawn(inner: Arc>>) -> impl Future> { + fn spawn( + inner: Arc>>, + ) -> impl Future> { let mut data = inner.lock().unwrap(); if let Some(source) = data.future.take() { - let inner1 = inner.clone(); - let task = source.map(move |value| { - match value { - Ok(value) => Self::notify_listeners(inner1, Ok(value)), - Err(err) => Self::notify_listeners(inner1, Err(err.to_string())), - } + let task = source.map(move |value| match value { + Ok(value) => Self::notify_listeners(inner1, Ok(value)), + Err(err) => Self::notify_listeners(inner1, Err(err.to_string())), }); tokio::spawn(task); } @@ -137,22 +132,28 @@ impl BroadcastFuture { fn test_broadcast_future() { use std::sync::atomic::{AtomicUsize, Ordering}; - static CHECKSUM: AtomicUsize = AtomicUsize::new(0); + static CHECKSUM: AtomicUsize = AtomicUsize::new(0); let (sender, trigger) = BroadcastFuture::new_oneshot(); - let receiver1 = sender.listen() + let receiver1 = sender + .listen() .map_ok(|res| { CHECKSUM.fetch_add(res, Ordering::SeqCst); }) - .map_err(|err| { panic!("got error {}", err); }) + .map_err(|err| { + panic!("got error {}", err); + }) .map(|_| ()); - let receiver2 = sender.listen() + let receiver2 = sender + .listen() .map_ok(|res| { - CHECKSUM.fetch_add(res*2, Ordering::SeqCst); + CHECKSUM.fetch_add(res * 2, Ordering::SeqCst); + }) + .map_err(|err| { + panic!("got error {}", err); }) - .map_err(|err| { panic!("got error {}", err); }) .map(|_| ()); let rt = tokio::runtime::Runtime::new().unwrap(); @@ -170,12 +171,17 @@ fn test_broadcast_future() { assert_eq!(result, 3); // the result stays available until the BroadcastFuture is dropped - rt.block_on(sender.listen() - .map_ok(|res| { - CHECKSUM.fetch_add(res*4, Ordering::SeqCst); - }) - .map_err(|err| { panic!("got error {}", err); }) - .map(|_| ())); + rt.block_on( + sender + .listen() + .map_ok(|res| { + CHECKSUM.fetch_add(res * 4, Ordering::SeqCst); + }) + .map_err(|err| { + panic!("got error {}", err); + }) + .map(|_| ()), + ); let result = CHECKSUM.load(Ordering::SeqCst); assert_eq!(result, 7); diff --git a/proxmox-async/src/compression.rs b/proxmox-async/src/compression.rs index 68b73d59..b36f2916 100644 --- a/proxmox-async/src/compression.rs +++ b/proxmox-async/src/compression.rs @@ -9,8 +9,8 @@ use futures::ready; use futures::stream::Stream; use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; -use proxmox_sys::io_format_err; use proxmox_io::ByteBuffer; +use proxmox_sys::io_format_err; const BUFFER_SIZE: usize = 8192; @@ -98,7 +98,8 @@ impl DeflateEncoder> { let mut buffer = Vec::with_capacity(size_hint); reader.read_to_end(&mut buffer).await?; self.inner.reserve(size_hint); // should be enough since we want smalller files - self.compressor.compress_vec(&buffer[..], &mut self.inner, FlushCompress::Finish)?; + self.compressor + .compress_vec(&buffer[..], &mut self.inner, FlushCompress::Finish)?; Ok(()) } } @@ -144,7 +145,7 @@ impl DeflateEncoder { impl Stream for DeflateEncoder where T: Stream> + Unpin, - O: Into + O: Into, { type Item = Result; diff --git a/proxmox-async/src/io/async_channel_writer.rs b/proxmox-async/src/io/async_channel_writer.rs index f483d8c3..f63648a5 100644 --- a/proxmox-async/src/io/async_channel_writer.rs +++ b/proxmox-async/src/io/async_channel_writer.rs @@ -1,19 +1,19 @@ //! Wrappers between async readers and streams. -use std::io; use std::future::Future; +use std::io; use std::pin::Pin; use std::task::{Context, Poll}; use anyhow::{Error, Result}; -use tokio::io::{AsyncWrite}; -use tokio::sync::mpsc::Sender; -use futures::ready; use futures::future::FutureExt; +use futures::ready; +use tokio::io::AsyncWrite; +use tokio::sync::mpsc::Sender; -use proxmox_sys::io_format_err; -use proxmox_sys::error::io_err_other; use proxmox_io::ByteBuffer; +use proxmox_sys::error::io_err_other; +use proxmox_sys::io_format_err; /// Wrapper around tokio::sync::mpsc::Sender, which implements Write pub struct AsyncChannelWriter { diff --git a/proxmox-async/src/stream/async_reader_stream.rs b/proxmox-async/src/stream/async_reader_stream.rs index 5f94dce6..caa6736a 100644 --- a/proxmox-async/src/stream/async_reader_stream.rs +++ b/proxmox-async/src/stream/async_reader_stream.rs @@ -4,9 +4,9 @@ use std::io; use std::pin::Pin; use std::task::{Context, Poll}; -use tokio::io::{AsyncRead, ReadBuf}; use futures::ready; use futures::stream::Stream; +use tokio::io::{AsyncRead, ReadBuf}; /// Wrapper struct to convert an [AsyncRead] into a [Stream] pub struct AsyncReaderStream { @@ -14,17 +14,20 @@ pub struct AsyncReaderStream { buffer: Vec, } -impl AsyncReaderStream { - +impl AsyncReaderStream { pub fn new(reader: R) -> Self { - let mut buffer = Vec::with_capacity(64*1024); - unsafe { buffer.set_len(buffer.capacity()); } + let mut buffer = Vec::with_capacity(64 * 1024); + unsafe { + buffer.set_len(buffer.capacity()); + } Self { reader, buffer } } pub fn with_buffer_size(reader: R, buffer_size: usize) -> Self { let mut buffer = Vec::with_capacity(buffer_size); - unsafe { buffer.set_len(buffer.capacity()); } + unsafe { + buffer.set_len(buffer.capacity()); + } Self { reader, buffer } } } diff --git a/proxmox-async/src/zip.rs b/proxmox-async/src/zip.rs index d1df7139..04bd4e0a 100644 --- a/proxmox-async/src/zip.rs +++ b/proxmox-async/src/zip.rs @@ -625,8 +625,8 @@ pub async fn zip_directory(target: W, source: &Path) -> Result<(), Error> where W: AsyncWrite + Unpin + Send, { - use walkdir::WalkDir; use std::os::unix::fs::MetadataExt; + use walkdir::WalkDir; let base_path = source.parent().unwrap_or_else(|| Path::new("/")); let mut encoder = ZipEncoder::new(target); @@ -640,28 +640,22 @@ where if let Err(err) = async move { let entry_path_no_base = entry.path().strip_prefix(base_path)?; let metadata = entry.metadata()?; - let mtime = match metadata.modified().unwrap_or_else(|_| SystemTime::now()).duration_since(SystemTime::UNIX_EPOCH) { + let mtime = match metadata + .modified() + .unwrap_or_else(|_| SystemTime::now()) + .duration_since(SystemTime::UNIX_EPOCH) + { Ok(dur) => dur.as_secs() as i64, - Err(time_error) => -(time_error.duration().as_secs() as i64) + Err(time_error) => -(time_error.duration().as_secs() as i64), }; let mode = metadata.mode() as u16; if entry.file_type().is_file() { let file = tokio::fs::File::open(entry.path()).await?; - let ze = ZipEntry::new( - &entry_path_no_base, - mtime, - mode, - true, - ); + let ze = ZipEntry::new(&entry_path_no_base, mtime, mode, true); encoder.add_entry(ze, Some(file)).await?; } else if entry.file_type().is_dir() { - let ze = ZipEntry::new( - &entry_path_no_base, - mtime, - mode, - false, - ); + let ze = ZipEntry::new(&entry_path_no_base, mtime, mode, false); let content: Option = None; encoder.add_entry(ze, content).await?; }