diff --git a/Cargo.toml b/Cargo.toml index 8cebbd29..7d9644ac 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -98,7 +98,7 @@ pathpatterns = "0.1.2" pxar = { version = "0.10.1", features = [ "tokio-io" ] } proxmox = { version = "0.15.0", features = [ "sortable-macro" ] } -proxmox-http = { version = "0.5.2", features = [ "client", "http-helpers", "websocket" ] } +proxmox-http = { version = "0.5.3", features = [ "client", "http-helpers", "websocket" ] } proxmox-io = "1" proxmox-lang = "1" proxmox-router = { version = "1.1", features = [ "cli" ] } diff --git a/pbs-client/Cargo.toml b/pbs-client/Cargo.toml index c7b4b1c8..4f6614ad 100644 --- a/pbs-client/Cargo.toml +++ b/pbs-client/Cargo.toml @@ -30,7 +30,7 @@ xdg = "2.2" pathpatterns = "0.1.2" proxmox = "0.15.0" proxmox-fuse = "0.1.1" -proxmox-http = { version = "0.5.2", features = [ "client", "http-helpers", "websocket" ] } +proxmox-http = { version = "0.5.3", features = [ "client", "http-helpers", "websocket" ] } proxmox-io = { version = "1", features = [ "tokio" ] } proxmox-lang = "1" proxmox-router = { version = "1.1", features = [ "cli" ] } diff --git a/src/bin/proxmox-backup-proxy.rs b/src/bin/proxmox-backup-proxy.rs index a3bd4cfa..0fc61ed5 100644 --- a/src/bin/proxmox-backup-proxy.rs +++ b/src/bin/proxmox-backup-proxy.rs @@ -21,7 +21,7 @@ use proxmox::sys::linux::socket::set_tcp_keepalive; use proxmox::tools::fs::CreateOptions; use proxmox_lang::try_block; use proxmox_router::{RpcEnvironment, RpcEnvironmentType, UserInformation}; -use proxmox_http::client::{RateLimiter, RateLimitedStream}; +use proxmox_http::client::{RateLimitedStream, ShareableRateLimit}; use pbs_tools::{task_log, task_warn}; use pbs_datastore::DataStore; @@ -1093,7 +1093,7 @@ lazy_static::lazy_static!{ fn lookup_rate_limiter( peer: Option, -) -> (Option>>, Option>>) { +) -> (Option>, Option>) { let mut cache = TRAFFIC_CONTROL_CACHE.lock().unwrap(); let now = proxmox_time::epoch_i64(); diff --git a/src/cached_traffic_control.rs b/src/cached_traffic_control.rs index 9c7387d6..deb6e234 100644 --- a/src/cached_traffic_control.rs +++ b/src/cached_traffic_control.rs @@ -6,7 +6,7 @@ use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use anyhow::Error; use cidr::IpInet; -use proxmox_http::client::RateLimiter; +use proxmox_http::client::{ShareableRateLimit, RateLimiter}; use proxmox_section_config::SectionConfigData; use proxmox_systemd::daily_duration::{parse_daily_duration, DailyDuration}; @@ -26,7 +26,7 @@ pub struct TrafficControlCache { last_update: i64, last_traffic_control_generation: usize, rules: Vec, - limiter_map: HashMap>>, Option>>)>, + limiter_map: HashMap>, Option>)>, use_utc: bool, // currently only used for testing } @@ -84,6 +84,14 @@ fn cannonical_ip(ip: IpAddr) -> IpAddr { } } +fn create_limiter( + rate: u64, + burst: u64, + _direction: bool, // false => in, true => out +) -> Result, Error> { + Ok(Arc::new(Mutex::new(RateLimiter::new(rate, burst)))) +} + impl TrafficControlCache { pub fn new() -> Self { @@ -130,6 +138,7 @@ impl TrafficControlCache { self.update_config(&config) } + fn update_config(&mut self, config: &SectionConfigData) -> Result<(), Error> { self.limiter_map.retain(|key, _value| config.sections.contains_key(key)); @@ -146,16 +155,15 @@ impl TrafficControlCache { Some(ref read_limiter) => { match rule.rate_in { Some(rate_in) => { - read_limiter.lock().unwrap(). - update_rate(rate_in, rule.burst_in.unwrap_or(rate_in)); + read_limiter.update_rate(rate_in, rule.burst_in.unwrap_or(rate_in)); } None => entry.0 = None, } } None => { if let Some(rate_in) = rule.rate_in { - let limiter = RateLimiter::new(rate_in, rule.burst_in.unwrap_or(rate_in)); - entry.0 = Some(Arc::new(Mutex::new(limiter))); + let limiter = create_limiter(rate_in, rule.burst_in.unwrap_or(rate_in), false)?; + entry.0 = Some(limiter); } } } @@ -164,16 +172,15 @@ impl TrafficControlCache { Some(ref write_limiter) => { match rule.rate_out { Some(rate_out) => { - write_limiter.lock().unwrap(). - update_rate(rate_out, rule.burst_out.unwrap_or(rate_out)); + write_limiter.update_rate(rate_out, rule.burst_out.unwrap_or(rate_out)); } None => entry.1 = None, } } None => { if let Some(rate_out) = rule.rate_out { - let limiter = RateLimiter::new(rate_out, rule.burst_out.unwrap_or(rate_out)); - entry.1 = Some(Arc::new(Mutex::new(limiter))); + let limiter = create_limiter(rate_out, rule.burst_out.unwrap_or(rate_out), true)?; + entry.1 = Some(limiter); } } } @@ -212,7 +219,7 @@ impl TrafficControlCache { &self, peer: Option, now: i64, - ) -> (&str, Option>>, Option>>) { + ) -> (&str, Option>, Option>) { let peer = match peer { None => return ("", None, None),