proxmox-http: define a RateLimit trait

Signed-off-by: Dietmar Maurer <dietmar@proxmox.com>
This commit is contained in:
Dietmar Maurer 2021-11-13 08:02:27 +01:00
parent 564703b195
commit 937d1a6095
3 changed files with 38 additions and 25 deletions

View File

@ -3,7 +3,7 @@
//! Contains a lightweight wrapper around `hyper` with support for TLS connections.
mod rate_limiter;
pub use rate_limiter::RateLimiter;
pub use rate_limiter::{RateLimit, RateLimiter};
mod rate_limited_stream;
pub use rate_limited_stream::RateLimitedStream;

View File

@ -11,7 +11,7 @@ use hyper::client::connect::{Connection, Connected};
use std::task::{Context, Poll};
use super::RateLimiter;
use super::{RateLimit, RateLimiter};
/// A rate limited stream using [RateLimiter]
pub struct RateLimitedStream<S> {

View File

@ -1,6 +1,19 @@
use std::time::{Duration, Instant};
use std::convert::TryInto;
/// Rate limiter interface.
pub trait RateLimit {
/// Update rate and bucket size
fn update_rate(&mut self, rate: u64, bucket_size: u64);
/// Returns the average rate (since `start_time`)
fn average_rate(&self, current_time: Instant) -> f64;
/// Register traffic, returning a proposed delay to reach the
/// expected rate.
fn register_traffic(&mut self, current_time: Instant, data_len: u64) -> Duration;
}
/// Token bucket based rate limiter
pub struct RateLimiter {
rate: u64, // tokens/second
@ -34,27 +47,6 @@ impl RateLimiter {
}
}
/// Update rate and bucket size
pub fn update_rate(&mut self, rate: u64, bucket_size: u64) {
self.rate = rate;
if bucket_size < self.bucket_size && self.consumed_tokens > bucket_size {
self.consumed_tokens = bucket_size; // start again
}
self.bucket_size = bucket_size;
}
/// Returns the average rate (since `start_time`)
pub fn average_rate(&self, current_time: Instant) -> f64 {
let time_diff = current_time.saturating_duration_since(self.start_time).as_secs_f64();
if time_diff <= 0.0 {
0.0
} else {
(self.traffic as f64) / time_diff
}
}
fn refill_bucket(&mut self, current_time: Instant) {
let time_diff = match current_time.checked_duration_since(self.last_update) {
Some(duration) => duration.as_nanos(),
@ -73,9 +65,30 @@ impl RateLimiter {
self.consumed_tokens = self.consumed_tokens.saturating_sub(allowed_traffic);
}
}
/// Register traffic, returning a proposed delay to reach the expected rate.
pub fn register_traffic(&mut self, current_time: Instant, data_len: u64) -> Duration {
impl RateLimit for RateLimiter {
fn update_rate(&mut self, rate: u64, bucket_size: u64) {
self.rate = rate;
if bucket_size < self.bucket_size && self.consumed_tokens > bucket_size {
self.consumed_tokens = bucket_size; // start again
}
self.bucket_size = bucket_size;
}
fn average_rate(&self, current_time: Instant) -> f64 {
let time_diff = current_time.saturating_duration_since(self.start_time).as_secs_f64();
if time_diff <= 0.0 {
0.0
} else {
(self.traffic as f64) / time_diff
}
}
fn register_traffic(&mut self, current_time: Instant, data_len: u64) -> Duration {
self.refill_bucket(current_time);
self.traffic += data_len;