diff --git a/proxmox-http/src/client/connector.rs b/proxmox-http/src/client/connector.rs index 71704d56..1bcee7a4 100644 --- a/proxmox-http/src/client/connector.rs +++ b/proxmox-http/src/client/connector.rs @@ -1,7 +1,7 @@ use anyhow::{bail, format_err, Error}; use std::os::unix::io::AsRawFd; use std::pin::Pin; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; use std::task::{Context, Poll}; use futures::*; @@ -18,7 +18,9 @@ use crate::proxy_config::ProxyConfig; use crate::tls::MaybeTlsStream; use crate::uri::build_authority; -use super::{RateLimiter, RateLimitedStream}; +use super::{RateLimitedStream, ShareableRateLimit}; + +type SharedRateLimit = Arc; #[derive(Clone)] pub struct HttpsConnector { @@ -26,8 +28,8 @@ pub struct HttpsConnector { ssl_connector: Arc, proxy: Option, tcp_keepalive: u32, - read_limiter: Option>>, - write_limiter: Option>>, + read_limiter: Option, + write_limiter: Option, } impl HttpsConnector { @@ -51,11 +53,11 @@ impl HttpsConnector { self.proxy = Some(proxy); } - pub fn set_read_limiter(&mut self, limiter: Option>>) { + pub fn set_read_limiter(&mut self, limiter: Option) { self.read_limiter = limiter; } - pub fn set_write_limiter(&mut self, limiter: Option>>) { + pub fn set_write_limiter(&mut self, limiter: Option) { self.write_limiter = limiter; } diff --git a/proxmox-http/src/client/mod.rs b/proxmox-http/src/client/mod.rs index 5ef81000..fa57408e 100644 --- a/proxmox-http/src/client/mod.rs +++ b/proxmox-http/src/client/mod.rs @@ -3,7 +3,7 @@ //! Contains a lightweight wrapper around `hyper` with support for TLS connections. mod rate_limiter; -pub use rate_limiter::{RateLimit, RateLimiter}; +pub use rate_limiter::{RateLimit, RateLimiter, ShareableRateLimit}; mod rate_limited_stream; pub use rate_limited_stream::RateLimitedStream; diff --git a/proxmox-http/src/client/rate_limited_stream.rs b/proxmox-http/src/client/rate_limited_stream.rs index c288849a..3a0eabd8 100644 --- a/proxmox-http/src/client/rate_limited_stream.rs +++ b/proxmox-http/src/client/rate_limited_stream.rs @@ -11,15 +11,17 @@ use hyper::client::connect::{Connection, Connected}; use std::task::{Context, Poll}; -use super::{RateLimit, RateLimiter}; +use super::{ShareableRateLimit, RateLimiter}; + +type SharedRateLimit = Arc; /// A rate limited stream using [RateLimiter] pub struct RateLimitedStream { - read_limiter: Option>>, + read_limiter: Option, read_delay: Option>>, - write_limiter: Option>>, + write_limiter: Option, write_delay: Option>>, - update_limiter_cb: Option (Option>>, Option>>) + Send>>, + update_limiter_cb: Option (Option, Option) + Send>>, last_limiter_update: Instant, stream: S, } @@ -35,18 +37,20 @@ impl RateLimitedStream { /// Creates a new instance with reads and writes limited to the same `rate`. pub fn new(stream: S, rate: u64, bucket_size: u64) -> Self { let now = Instant::now(); - let read_limiter = Arc::new(Mutex::new(RateLimiter::with_start_time(rate, bucket_size, now))); - let write_limiter = Arc::new(Mutex::new(RateLimiter::with_start_time(rate, bucket_size, now))); + let read_limiter = RateLimiter::with_start_time(rate, bucket_size, now); + let read_limiter: SharedRateLimit = Arc::new(Mutex::new(read_limiter)); + let write_limiter = RateLimiter::with_start_time(rate, bucket_size, now); + let write_limiter: SharedRateLimit = Arc::new(Mutex::new(write_limiter)); Self::with_limiter(stream, Some(read_limiter), Some(write_limiter)) } /// Creates a new instance with specified [RateLimiters] for reads and writes. pub fn with_limiter( stream: S, - read_limiter: Option>>, - write_limiter: Option>>, + read_limiter: Option, + write_limiter: Option, ) -> Self { - Self { + Self { read_limiter, read_delay: None, write_limiter, @@ -63,7 +67,7 @@ impl RateLimitedStream { /// /// Note: This function is called within an async context, so it /// should be fast and must not block. - pub fn with_limiter_update_cb (Option>>, Option>>) + Send + 'static>( + pub fn with_limiter_update_cb (Option, Option) + Send + 'static>( stream: S, update_limiter_cb: F, ) -> Self { @@ -92,15 +96,14 @@ impl RateLimitedStream { } fn register_traffic( - limiter: &Mutex, + limiter: &(dyn ShareableRateLimit), count: usize, ) -> Option>>{ const MIN_DELAY: Duration = Duration::from_millis(10); let now = Instant::now(); - let delay = limiter.lock().unwrap() - .register_traffic(now, count as u64); + let delay = limiter.register_traffic(now, count as u64); if delay >= MIN_DELAY { let sleep = tokio::time::sleep(delay); Some(Box::pin(sleep)) @@ -137,9 +140,9 @@ impl AsyncWrite for RateLimitedStream { let result = Pin::new(&mut this.stream).poll_write(ctx, buf); - if let Some(ref limiter) = this.write_limiter { + if let Some(ref mut limiter) = this.write_limiter { if let Poll::Ready(Ok(count)) = result { - this.write_delay = register_traffic(limiter, count); + this.write_delay = register_traffic(limiter.as_ref(), count); } } @@ -169,7 +172,7 @@ impl AsyncWrite for RateLimitedStream { if let Some(ref limiter) = this.write_limiter { if let Poll::Ready(Ok(count)) = result { - this.write_delay = register_traffic(limiter, count); + this.write_delay = register_traffic(limiter.as_ref(), count); } } @@ -216,7 +219,7 @@ impl AsyncRead for RateLimitedStream { if let Some(ref read_limiter) = this.read_limiter { if let Poll::Ready(Ok(())) = &result { let count = buf.filled().len() - filled_len; - this.read_delay = register_traffic(read_limiter, count); + this.read_delay = register_traffic(read_limiter.as_ref(), count); } } diff --git a/proxmox-http/src/client/rate_limiter.rs b/proxmox-http/src/client/rate_limiter.rs index 72605ca2..84856b19 100644 --- a/proxmox-http/src/client/rate_limiter.rs +++ b/proxmox-http/src/client/rate_limiter.rs @@ -14,6 +14,15 @@ pub trait RateLimit { fn register_traffic(&mut self, current_time: Instant, data_len: u64) -> Duration; } +/// Like [RateLimit], but does not require self to be mutable. +/// +/// This is useful for types providing internal mutability (Mutex). +pub trait ShareableRateLimit: Send + Sync { + fn update_rate(&self, rate: u64, bucket_size: u64); + fn average_rate(&self, current_time: Instant) -> f64; + fn register_traffic(&self, current_time: Instant, data_len: u64) -> Duration; +} + /// Token bucket based rate limiter pub struct RateLimiter { rate: u64, // tokens/second @@ -100,3 +109,18 @@ impl RateLimit for RateLimiter { Duration::from_nanos((self.consumed_tokens - self.bucket_size).saturating_mul(1_000_000_000)/ self.rate) } } + +impl ShareableRateLimit for std::sync::Mutex { + + fn update_rate(&self, rate: u64, bucket_size: u64) { + self.lock().unwrap().update_rate(rate, bucket_size); + } + + fn average_rate(&self, current_time: Instant) -> f64 { + self.lock().unwrap().average_rate(current_time) + } + + fn register_traffic(&self, current_time: Instant, data_len: u64) -> Duration { + self.lock().unwrap().register_traffic(current_time, data_len) + } +}