From 0eeb0dd17ccf0886c0cb683186f334cc1c591a29 Mon Sep 17 00:00:00 2001 From: Thomas Lamprecht Date: Sun, 10 Apr 2022 12:41:21 +0200 Subject: [PATCH] http: rustfmt Signed-off-by: Thomas Lamprecht --- proxmox-http/src/client/connector.rs | 22 ++----- .../src/client/rate_limited_stream.rs | 62 +++++++++---------- proxmox-http/src/client/rate_limiter.rs | 46 +++++++++----- 3 files changed, 64 insertions(+), 66 deletions(-) diff --git a/proxmox-http/src/client/connector.rs b/proxmox-http/src/client/connector.rs index e01eea42..ddf77410 100644 --- a/proxmox-http/src/client/connector.rs +++ b/proxmox-http/src/client/connector.rs @@ -148,7 +148,6 @@ impl hyper::service::Service for HttpsConnector { let read_limiter = self.read_limiter.clone(); let write_limiter = self.write_limiter.clone(); - if let Some(ref proxy) = self.proxy { let use_connect = is_https || proxy.force_connect; @@ -177,11 +176,8 @@ impl hyper::service::Service for HttpsConnector { let _ = set_tcp_keepalive(tcp_stream.as_raw_fd(), keepalive); - let mut tcp_stream = RateLimitedStream::with_limiter( - tcp_stream, - read_limiter, - write_limiter, - ); + let mut tcp_stream = + RateLimitedStream::with_limiter(tcp_stream, read_limiter, write_limiter); let mut connect_request = format!("CONNECT {0}:{1} HTTP/1.1\r\n", host, port); if let Some(authorization) = authorization { @@ -210,11 +206,8 @@ impl hyper::service::Service for HttpsConnector { let _ = set_tcp_keepalive(tcp_stream.as_raw_fd(), keepalive); - let tcp_stream = RateLimitedStream::with_limiter( - tcp_stream, - read_limiter, - write_limiter, - ); + let tcp_stream = + RateLimitedStream::with_limiter(tcp_stream, read_limiter, write_limiter); Ok(MaybeTlsStream::Proxied(tcp_stream)) } @@ -230,11 +223,8 @@ impl hyper::service::Service for HttpsConnector { let _ = set_tcp_keepalive(tcp_stream.as_raw_fd(), keepalive); - let tcp_stream = RateLimitedStream::with_limiter( - tcp_stream, - read_limiter, - write_limiter, - ); + let tcp_stream = + RateLimitedStream::with_limiter(tcp_stream, read_limiter, write_limiter); if is_https { Self::secure_stream(tcp_stream, &ssl_connector, &host).await diff --git a/proxmox-http/src/client/rate_limited_stream.rs b/proxmox-http/src/client/rate_limited_stream.rs index 18355f5f..8fd07a34 100644 --- a/proxmox-http/src/client/rate_limited_stream.rs +++ b/proxmox-http/src/client/rate_limited_stream.rs @@ -1,17 +1,17 @@ -use std::pin::Pin; +use std::io::IoSlice; use std::marker::Unpin; +use std::pin::Pin; use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant}; -use std::io::IoSlice; use futures::Future; -use tokio::io::{ReadBuf, AsyncRead, AsyncWrite}; +use hyper::client::connect::{Connected, Connection}; +use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; use tokio::time::Sleep; -use hyper::client::connect::{Connection, Connected}; use std::task::{Context, Poll}; -use super::{ShareableRateLimit, RateLimiter}; +use super::{RateLimiter, ShareableRateLimit}; type SharedRateLimit = Arc; @@ -21,7 +21,8 @@ pub struct RateLimitedStream { read_delay: Option>>, write_limiter: Option, write_delay: Option>>, - update_limiter_cb: Option (Option, Option) + Send>>, + update_limiter_cb: + Option (Option, Option) + Send>>, last_limiter_update: Instant, stream: S, } @@ -32,8 +33,7 @@ impl RateLimitedStream { } } -impl RateLimitedStream { - +impl RateLimitedStream { /// Creates a new instance with reads and writes limited to the same `rate`. pub fn new(stream: S, rate: u64, bucket_size: u64) -> Self { let now = Instant::now(); @@ -50,7 +50,7 @@ impl RateLimitedStream { read_limiter: Option, write_limiter: Option, ) -> Self { - Self { + Self { read_limiter, read_delay: None, write_limiter, @@ -67,7 +67,9 @@ impl RateLimitedStream { /// /// Note: This function is called within an async context, so it /// should be fast and must not block. - pub fn with_limiter_update_cb (Option, Option) + Send + 'static>( + pub fn with_limiter_update_cb< + F: Fn() -> (Option, Option) + Send + 'static, + >( stream: S, update_limiter_cb: F, ) -> Self { @@ -95,11 +97,7 @@ impl RateLimitedStream { } } -fn register_traffic( - limiter: &(dyn ShareableRateLimit), - count: usize, -) -> Option>>{ - +fn register_traffic(limiter: &(dyn ShareableRateLimit), count: usize) -> Option>> { const MIN_DELAY: Duration = Duration::from_millis(10); let now = Instant::now(); @@ -114,25 +112,24 @@ fn register_traffic( fn delay_is_ready(delay: &mut Option>>, ctx: &mut Context<'_>) -> bool { match delay { - Some(ref mut future) => { - future.as_mut().poll(ctx).is_ready() - } + Some(ref mut future) => future.as_mut().poll(ctx).is_ready(), None => true, } } -impl AsyncWrite for RateLimitedStream { - +impl AsyncWrite for RateLimitedStream { fn poll_write( self: Pin<&mut Self>, ctx: &mut Context<'_>, - buf: &[u8] + buf: &[u8], ) -> Poll> { let this = self.get_mut(); let is_ready = delay_is_ready(&mut this.write_delay, ctx); - if !is_ready { return Poll::Pending; } + if !is_ready { + return Poll::Pending; + } this.write_delay = None; @@ -156,13 +153,15 @@ impl AsyncWrite for RateLimitedStream { fn poll_write_vectored( self: Pin<&mut Self>, ctx: &mut Context<'_>, - bufs: &[IoSlice<'_>] + bufs: &[IoSlice<'_>], ) -> Poll> { let this = self.get_mut(); let is_ready = delay_is_ready(&mut this.write_delay, ctx); - if !is_ready { return Poll::Pending; } + if !is_ready { + return Poll::Pending; + } this.write_delay = None; @@ -179,25 +178,21 @@ impl AsyncWrite for RateLimitedStream { result } - fn poll_flush( - self: Pin<&mut Self>, - ctx: &mut Context<'_> - ) -> Poll> { + fn poll_flush(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll> { let this = self.get_mut(); Pin::new(&mut this.stream).poll_flush(ctx) } fn poll_shutdown( self: Pin<&mut Self>, - ctx: &mut Context<'_> + ctx: &mut Context<'_>, ) -> Poll> { let this = self.get_mut(); Pin::new(&mut this.stream).poll_shutdown(ctx) } } -impl AsyncRead for RateLimitedStream { - +impl AsyncRead for RateLimitedStream { fn poll_read( self: Pin<&mut Self>, ctx: &mut Context<'_>, @@ -207,7 +202,9 @@ impl AsyncRead for RateLimitedStream { let is_ready = delay_is_ready(&mut this.read_delay, ctx); - if !is_ready { return Poll::Pending; } + if !is_ready { + return Poll::Pending; + } this.read_delay = None; @@ -225,7 +222,6 @@ impl AsyncRead for RateLimitedStream { result } - } // we need this for the hyper http client diff --git a/proxmox-http/src/client/rate_limiter.rs b/proxmox-http/src/client/rate_limiter.rs index 4f6b790b..290cf2ba 100644 --- a/proxmox-http/src/client/rate_limiter.rs +++ b/proxmox-http/src/client/rate_limiter.rs @@ -1,5 +1,5 @@ -use std::time::{Duration, Instant}; use std::convert::TryInto; +use std::time::{Duration, Instant}; use anyhow::{bail, Error}; @@ -36,7 +36,6 @@ struct TbfState { } impl TbfState { - const NO_DELAY: Duration = Duration::from_millis(0); fn refill_bucket(&mut self, rate: u64, current_time: Instant) { @@ -45,12 +44,15 @@ impl TbfState { None => return, }; - if time_diff == 0 { return; } + if time_diff == 0 { + return; + } self.last_update = current_time; let allowed_traffic = ((time_diff.saturating_mul(rate as u128)) / 1_000_000_000) - .try_into().unwrap_or(u64::MAX); + .try_into() + .unwrap_or(u64::MAX); self.consumed_tokens = self.consumed_tokens.saturating_sub(allowed_traffic); } @@ -70,7 +72,9 @@ impl TbfState { if self.consumed_tokens <= bucket_size { return Self::NO_DELAY; } - Duration::from_nanos((self.consumed_tokens - bucket_size).saturating_mul(1_000_000_000)/rate) + Duration::from_nanos( + (self.consumed_tokens - bucket_size).saturating_mul(1_000_000_000) / rate, + ) } } @@ -80,13 +84,12 @@ impl TbfState { /// change/modify the layout (do not add fields) #[repr(C)] pub struct RateLimiter { - rate: u64, // tokens/second + rate: u64, // tokens/second bucket_size: u64, // TBF bucket size state: TbfState, } impl RateLimiter { - /// Creates a new instance, using [Instant::now] as start time. pub fn new(rate: u64, bucket_size: u64) -> Self { let start_time = Instant::now(); @@ -109,7 +112,6 @@ impl RateLimiter { } impl RateLimit for RateLimiter { - fn update_rate(&mut self, rate: u64, bucket_size: u64) { self.rate = rate; @@ -125,12 +127,12 @@ impl RateLimit for RateLimiter { } fn register_traffic(&mut self, current_time: Instant, data_len: u64) -> Duration { - self.state.register_traffic(self.rate, self.bucket_size, current_time, data_len) + self.state + .register_traffic(self.rate, self.bucket_size, current_time, data_len) } } -impl ShareableRateLimit for std::sync::Mutex { - +impl ShareableRateLimit for std::sync::Mutex { fn update_rate(&self, rate: u64, bucket_size: u64) { self.lock().unwrap().update_rate(rate, bucket_size); } @@ -140,22 +142,22 @@ impl ShareableRateLimit for std::sync::Mutex { } fn register_traffic(&self, current_time: Instant, data_len: u64) -> Duration { - self.lock().unwrap().register_traffic(current_time, data_len) + self.lock() + .unwrap() + .register_traffic(current_time, data_len) } } - /// Array of rate limiters. /// /// A group of rate limiters with same configuration. pub struct RateLimiterVec { - rate: u64, // tokens/second + rate: u64, // tokens/second bucket_size: u64, // TBF bucket size state: Vec, } impl RateLimiterVec { - /// Creates a new instance, using [Instant::now] as start time. pub fn new(group_size: usize, rate: u64, bucket_size: u64) -> Self { let start_time = Instant::now(); @@ -163,7 +165,12 @@ impl RateLimiterVec { } /// Creates a new instance with specified `rate`, `bucket_size` and `start_time`. - pub fn with_start_time(group_size: usize, rate: u64, bucket_size: u64, start_time: Instant) -> Self { + pub fn with_start_time( + group_size: usize, + rate: u64, + bucket_size: u64, + start_time: Instant, + ) -> Self { let state = TbfState { traffic: 0, last_update: start_time, @@ -191,7 +198,12 @@ impl RateLimiterVec { } /// Register traffic at the specified index - pub fn register_traffic(&mut self, index: usize, current_time: Instant, data_len: u64) -> Result { + pub fn register_traffic( + &mut self, + index: usize, + current_time: Instant, + data_len: u64, + ) -> Result { if index >= self.state.len() { bail!("RateLimiterVec::register_traffic - index out of range"); }