diff --git a/proxmox-http/src/client/mod.rs b/proxmox-http/src/client/mod.rs index 30e66d58..5ef81000 100644 --- a/proxmox-http/src/client/mod.rs +++ b/proxmox-http/src/client/mod.rs @@ -3,7 +3,7 @@ //! Contains a lightweight wrapper around `hyper` with support for TLS connections. mod rate_limiter; -pub use rate_limiter::RateLimiter; +pub use rate_limiter::{RateLimit, RateLimiter}; mod rate_limited_stream; pub use rate_limited_stream::RateLimitedStream; diff --git a/proxmox-http/src/client/rate_limited_stream.rs b/proxmox-http/src/client/rate_limited_stream.rs index 865a4261..c288849a 100644 --- a/proxmox-http/src/client/rate_limited_stream.rs +++ b/proxmox-http/src/client/rate_limited_stream.rs @@ -11,7 +11,7 @@ use hyper::client::connect::{Connection, Connected}; use std::task::{Context, Poll}; -use super::RateLimiter; +use super::{RateLimit, RateLimiter}; /// A rate limited stream using [RateLimiter] pub struct RateLimitedStream { diff --git a/proxmox-http/src/client/rate_limiter.rs b/proxmox-http/src/client/rate_limiter.rs index 37362b10..72605ca2 100644 --- a/proxmox-http/src/client/rate_limiter.rs +++ b/proxmox-http/src/client/rate_limiter.rs @@ -1,6 +1,19 @@ use std::time::{Duration, Instant}; use std::convert::TryInto; +/// Rate limiter interface. +pub trait RateLimit { + /// Update rate and bucket size + fn update_rate(&mut self, rate: u64, bucket_size: u64); + + /// Returns the average rate (since `start_time`) + fn average_rate(&self, current_time: Instant) -> f64; + + /// Register traffic, returning a proposed delay to reach the + /// expected rate. + fn register_traffic(&mut self, current_time: Instant, data_len: u64) -> Duration; +} + /// Token bucket based rate limiter pub struct RateLimiter { rate: u64, // tokens/second @@ -34,27 +47,6 @@ impl RateLimiter { } } - /// Update rate and bucket size - pub fn update_rate(&mut self, rate: u64, bucket_size: u64) { - self.rate = rate; - - if bucket_size < self.bucket_size && self.consumed_tokens > bucket_size { - self.consumed_tokens = bucket_size; // start again - } - - self.bucket_size = bucket_size; - } - - /// Returns the average rate (since `start_time`) - pub fn average_rate(&self, current_time: Instant) -> f64 { - let time_diff = current_time.saturating_duration_since(self.start_time).as_secs_f64(); - if time_diff <= 0.0 { - 0.0 - } else { - (self.traffic as f64) / time_diff - } - } - fn refill_bucket(&mut self, current_time: Instant) { let time_diff = match current_time.checked_duration_since(self.last_update) { Some(duration) => duration.as_nanos(), @@ -73,9 +65,30 @@ impl RateLimiter { self.consumed_tokens = self.consumed_tokens.saturating_sub(allowed_traffic); } +} - /// Register traffic, returning a proposed delay to reach the expected rate. - pub fn register_traffic(&mut self, current_time: Instant, data_len: u64) -> Duration { +impl RateLimit for RateLimiter { + + fn update_rate(&mut self, rate: u64, bucket_size: u64) { + self.rate = rate; + + if bucket_size < self.bucket_size && self.consumed_tokens > bucket_size { + self.consumed_tokens = bucket_size; // start again + } + + self.bucket_size = bucket_size; + } + + fn average_rate(&self, current_time: Instant) -> f64 { + let time_diff = current_time.saturating_duration_since(self.start_time).as_secs_f64(); + if time_diff <= 0.0 { + 0.0 + } else { + (self.traffic as f64) / time_diff + } + } + + fn register_traffic(&mut self, current_time: Instant, data_len: u64) -> Duration { self.refill_bucket(current_time); self.traffic += data_len;