http: rustfmt

Signed-off-by: Thomas Lamprecht <t.lamprecht@proxmox.com>
This commit is contained in:
Thomas Lamprecht 2022-04-10 12:41:21 +02:00
parent 05cad8926b
commit 0eeb0dd17c
3 changed files with 64 additions and 66 deletions

View File

@ -148,7 +148,6 @@ impl hyper::service::Service<Uri> for HttpsConnector {
let read_limiter = self.read_limiter.clone(); let read_limiter = self.read_limiter.clone();
let write_limiter = self.write_limiter.clone(); let write_limiter = self.write_limiter.clone();
if let Some(ref proxy) = self.proxy { if let Some(ref proxy) = self.proxy {
let use_connect = is_https || proxy.force_connect; let use_connect = is_https || proxy.force_connect;
@ -177,11 +176,8 @@ impl hyper::service::Service<Uri> for HttpsConnector {
let _ = set_tcp_keepalive(tcp_stream.as_raw_fd(), keepalive); let _ = set_tcp_keepalive(tcp_stream.as_raw_fd(), keepalive);
let mut tcp_stream = RateLimitedStream::with_limiter( let mut tcp_stream =
tcp_stream, RateLimitedStream::with_limiter(tcp_stream, read_limiter, write_limiter);
read_limiter,
write_limiter,
);
let mut connect_request = format!("CONNECT {0}:{1} HTTP/1.1\r\n", host, port); let mut connect_request = format!("CONNECT {0}:{1} HTTP/1.1\r\n", host, port);
if let Some(authorization) = authorization { if let Some(authorization) = authorization {
@ -210,11 +206,8 @@ impl hyper::service::Service<Uri> for HttpsConnector {
let _ = set_tcp_keepalive(tcp_stream.as_raw_fd(), keepalive); let _ = set_tcp_keepalive(tcp_stream.as_raw_fd(), keepalive);
let tcp_stream = RateLimitedStream::with_limiter( let tcp_stream =
tcp_stream, RateLimitedStream::with_limiter(tcp_stream, read_limiter, write_limiter);
read_limiter,
write_limiter,
);
Ok(MaybeTlsStream::Proxied(tcp_stream)) Ok(MaybeTlsStream::Proxied(tcp_stream))
} }
@ -230,11 +223,8 @@ impl hyper::service::Service<Uri> for HttpsConnector {
let _ = set_tcp_keepalive(tcp_stream.as_raw_fd(), keepalive); let _ = set_tcp_keepalive(tcp_stream.as_raw_fd(), keepalive);
let tcp_stream = RateLimitedStream::with_limiter( let tcp_stream =
tcp_stream, RateLimitedStream::with_limiter(tcp_stream, read_limiter, write_limiter);
read_limiter,
write_limiter,
);
if is_https { if is_https {
Self::secure_stream(tcp_stream, &ssl_connector, &host).await Self::secure_stream(tcp_stream, &ssl_connector, &host).await

View File

@ -1,17 +1,17 @@
use std::pin::Pin; use std::io::IoSlice;
use std::marker::Unpin; use std::marker::Unpin;
use std::pin::Pin;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use std::io::IoSlice;
use futures::Future; use futures::Future;
use tokio::io::{ReadBuf, AsyncRead, AsyncWrite}; use hyper::client::connect::{Connected, Connection};
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tokio::time::Sleep; use tokio::time::Sleep;
use hyper::client::connect::{Connection, Connected};
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use super::{ShareableRateLimit, RateLimiter}; use super::{RateLimiter, ShareableRateLimit};
type SharedRateLimit = Arc<dyn ShareableRateLimit>; type SharedRateLimit = Arc<dyn ShareableRateLimit>;
@ -21,7 +21,8 @@ pub struct RateLimitedStream<S> {
read_delay: Option<Pin<Box<Sleep>>>, read_delay: Option<Pin<Box<Sleep>>>,
write_limiter: Option<SharedRateLimit>, write_limiter: Option<SharedRateLimit>,
write_delay: Option<Pin<Box<Sleep>>>, write_delay: Option<Pin<Box<Sleep>>>,
update_limiter_cb: Option<Box<dyn Fn() -> (Option<SharedRateLimit>, Option<SharedRateLimit>) + Send>>, update_limiter_cb:
Option<Box<dyn Fn() -> (Option<SharedRateLimit>, Option<SharedRateLimit>) + Send>>,
last_limiter_update: Instant, last_limiter_update: Instant,
stream: S, stream: S,
} }
@ -33,7 +34,6 @@ impl RateLimitedStream<tokio::net::TcpStream> {
} }
impl<S> RateLimitedStream<S> { impl<S> RateLimitedStream<S> {
/// Creates a new instance with reads and writes limited to the same `rate`. /// Creates a new instance with reads and writes limited to the same `rate`.
pub fn new(stream: S, rate: u64, bucket_size: u64) -> Self { pub fn new(stream: S, rate: u64, bucket_size: u64) -> Self {
let now = Instant::now(); let now = Instant::now();
@ -67,7 +67,9 @@ impl <S> RateLimitedStream<S> {
/// ///
/// Note: This function is called within an async context, so it /// Note: This function is called within an async context, so it
/// should be fast and must not block. /// should be fast and must not block.
pub fn with_limiter_update_cb<F: Fn() -> (Option<SharedRateLimit>, Option<SharedRateLimit>) + Send + 'static>( pub fn with_limiter_update_cb<
F: Fn() -> (Option<SharedRateLimit>, Option<SharedRateLimit>) + Send + 'static,
>(
stream: S, stream: S,
update_limiter_cb: F, update_limiter_cb: F,
) -> Self { ) -> Self {
@ -95,11 +97,7 @@ impl <S> RateLimitedStream<S> {
} }
} }
fn register_traffic( fn register_traffic(limiter: &(dyn ShareableRateLimit), count: usize) -> Option<Pin<Box<Sleep>>> {
limiter: &(dyn ShareableRateLimit),
count: usize,
) -> Option<Pin<Box<Sleep>>>{
const MIN_DELAY: Duration = Duration::from_millis(10); const MIN_DELAY: Duration = Duration::from_millis(10);
let now = Instant::now(); let now = Instant::now();
@ -114,25 +112,24 @@ fn register_traffic(
fn delay_is_ready(delay: &mut Option<Pin<Box<Sleep>>>, ctx: &mut Context<'_>) -> bool { fn delay_is_ready(delay: &mut Option<Pin<Box<Sleep>>>, ctx: &mut Context<'_>) -> bool {
match delay { match delay {
Some(ref mut future) => { Some(ref mut future) => future.as_mut().poll(ctx).is_ready(),
future.as_mut().poll(ctx).is_ready()
}
None => true, None => true,
} }
} }
impl<S: AsyncWrite + Unpin> AsyncWrite for RateLimitedStream<S> { impl<S: AsyncWrite + Unpin> AsyncWrite for RateLimitedStream<S> {
fn poll_write( fn poll_write(
self: Pin<&mut Self>, self: Pin<&mut Self>,
ctx: &mut Context<'_>, ctx: &mut Context<'_>,
buf: &[u8] buf: &[u8],
) -> Poll<Result<usize, std::io::Error>> { ) -> Poll<Result<usize, std::io::Error>> {
let this = self.get_mut(); let this = self.get_mut();
let is_ready = delay_is_ready(&mut this.write_delay, ctx); let is_ready = delay_is_ready(&mut this.write_delay, ctx);
if !is_ready { return Poll::Pending; } if !is_ready {
return Poll::Pending;
}
this.write_delay = None; this.write_delay = None;
@ -156,13 +153,15 @@ impl <S: AsyncWrite + Unpin> AsyncWrite for RateLimitedStream<S> {
fn poll_write_vectored( fn poll_write_vectored(
self: Pin<&mut Self>, self: Pin<&mut Self>,
ctx: &mut Context<'_>, ctx: &mut Context<'_>,
bufs: &[IoSlice<'_>] bufs: &[IoSlice<'_>],
) -> Poll<Result<usize, std::io::Error>> { ) -> Poll<Result<usize, std::io::Error>> {
let this = self.get_mut(); let this = self.get_mut();
let is_ready = delay_is_ready(&mut this.write_delay, ctx); let is_ready = delay_is_ready(&mut this.write_delay, ctx);
if !is_ready { return Poll::Pending; } if !is_ready {
return Poll::Pending;
}
this.write_delay = None; this.write_delay = None;
@ -179,17 +178,14 @@ impl <S: AsyncWrite + Unpin> AsyncWrite for RateLimitedStream<S> {
result result
} }
fn poll_flush( fn poll_flush(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> {
self: Pin<&mut Self>,
ctx: &mut Context<'_>
) -> Poll<Result<(), std::io::Error>> {
let this = self.get_mut(); let this = self.get_mut();
Pin::new(&mut this.stream).poll_flush(ctx) Pin::new(&mut this.stream).poll_flush(ctx)
} }
fn poll_shutdown( fn poll_shutdown(
self: Pin<&mut Self>, self: Pin<&mut Self>,
ctx: &mut Context<'_> ctx: &mut Context<'_>,
) -> Poll<Result<(), std::io::Error>> { ) -> Poll<Result<(), std::io::Error>> {
let this = self.get_mut(); let this = self.get_mut();
Pin::new(&mut this.stream).poll_shutdown(ctx) Pin::new(&mut this.stream).poll_shutdown(ctx)
@ -197,7 +193,6 @@ impl <S: AsyncWrite + Unpin> AsyncWrite for RateLimitedStream<S> {
} }
impl<S: AsyncRead + Unpin> AsyncRead for RateLimitedStream<S> { impl<S: AsyncRead + Unpin> AsyncRead for RateLimitedStream<S> {
fn poll_read( fn poll_read(
self: Pin<&mut Self>, self: Pin<&mut Self>,
ctx: &mut Context<'_>, ctx: &mut Context<'_>,
@ -207,7 +202,9 @@ impl <S: AsyncRead + Unpin> AsyncRead for RateLimitedStream<S> {
let is_ready = delay_is_ready(&mut this.read_delay, ctx); let is_ready = delay_is_ready(&mut this.read_delay, ctx);
if !is_ready { return Poll::Pending; } if !is_ready {
return Poll::Pending;
}
this.read_delay = None; this.read_delay = None;
@ -225,7 +222,6 @@ impl <S: AsyncRead + Unpin> AsyncRead for RateLimitedStream<S> {
result result
} }
} }
// we need this for the hyper http client // we need this for the hyper http client

View File

@ -1,5 +1,5 @@
use std::time::{Duration, Instant};
use std::convert::TryInto; use std::convert::TryInto;
use std::time::{Duration, Instant};
use anyhow::{bail, Error}; use anyhow::{bail, Error};
@ -36,7 +36,6 @@ struct TbfState {
} }
impl TbfState { impl TbfState {
const NO_DELAY: Duration = Duration::from_millis(0); const NO_DELAY: Duration = Duration::from_millis(0);
fn refill_bucket(&mut self, rate: u64, current_time: Instant) { fn refill_bucket(&mut self, rate: u64, current_time: Instant) {
@ -45,12 +44,15 @@ impl TbfState {
None => return, None => return,
}; };
if time_diff == 0 { return; } if time_diff == 0 {
return;
}
self.last_update = current_time; self.last_update = current_time;
let allowed_traffic = ((time_diff.saturating_mul(rate as u128)) / 1_000_000_000) let allowed_traffic = ((time_diff.saturating_mul(rate as u128)) / 1_000_000_000)
.try_into().unwrap_or(u64::MAX); .try_into()
.unwrap_or(u64::MAX);
self.consumed_tokens = self.consumed_tokens.saturating_sub(allowed_traffic); self.consumed_tokens = self.consumed_tokens.saturating_sub(allowed_traffic);
} }
@ -70,7 +72,9 @@ impl TbfState {
if self.consumed_tokens <= bucket_size { if self.consumed_tokens <= bucket_size {
return Self::NO_DELAY; return Self::NO_DELAY;
} }
Duration::from_nanos((self.consumed_tokens - bucket_size).saturating_mul(1_000_000_000)/rate) Duration::from_nanos(
(self.consumed_tokens - bucket_size).saturating_mul(1_000_000_000) / rate,
)
} }
} }
@ -86,7 +90,6 @@ pub struct RateLimiter {
} }
impl RateLimiter { impl RateLimiter {
/// Creates a new instance, using [Instant::now] as start time. /// Creates a new instance, using [Instant::now] as start time.
pub fn new(rate: u64, bucket_size: u64) -> Self { pub fn new(rate: u64, bucket_size: u64) -> Self {
let start_time = Instant::now(); let start_time = Instant::now();
@ -109,7 +112,6 @@ impl RateLimiter {
} }
impl RateLimit for RateLimiter { impl RateLimit for RateLimiter {
fn update_rate(&mut self, rate: u64, bucket_size: u64) { fn update_rate(&mut self, rate: u64, bucket_size: u64) {
self.rate = rate; self.rate = rate;
@ -125,12 +127,12 @@ impl RateLimit for RateLimiter {
} }
fn register_traffic(&mut self, current_time: Instant, data_len: u64) -> Duration { fn register_traffic(&mut self, current_time: Instant, data_len: u64) -> Duration {
self.state.register_traffic(self.rate, self.bucket_size, current_time, data_len) self.state
.register_traffic(self.rate, self.bucket_size, current_time, data_len)
} }
} }
impl<R: RateLimit + Send> ShareableRateLimit for std::sync::Mutex<R> { impl<R: RateLimit + Send> ShareableRateLimit for std::sync::Mutex<R> {
fn update_rate(&self, rate: u64, bucket_size: u64) { fn update_rate(&self, rate: u64, bucket_size: u64) {
self.lock().unwrap().update_rate(rate, bucket_size); self.lock().unwrap().update_rate(rate, bucket_size);
} }
@ -140,11 +142,12 @@ impl <R: RateLimit + Send> ShareableRateLimit for std::sync::Mutex<R> {
} }
fn register_traffic(&self, current_time: Instant, data_len: u64) -> Duration { fn register_traffic(&self, current_time: Instant, data_len: u64) -> Duration {
self.lock().unwrap().register_traffic(current_time, data_len) self.lock()
.unwrap()
.register_traffic(current_time, data_len)
} }
} }
/// Array of rate limiters. /// Array of rate limiters.
/// ///
/// A group of rate limiters with same configuration. /// A group of rate limiters with same configuration.
@ -155,7 +158,6 @@ pub struct RateLimiterVec {
} }
impl RateLimiterVec { impl RateLimiterVec {
/// Creates a new instance, using [Instant::now] as start time. /// Creates a new instance, using [Instant::now] as start time.
pub fn new(group_size: usize, rate: u64, bucket_size: u64) -> Self { pub fn new(group_size: usize, rate: u64, bucket_size: u64) -> Self {
let start_time = Instant::now(); let start_time = Instant::now();
@ -163,7 +165,12 @@ impl RateLimiterVec {
} }
/// Creates a new instance with specified `rate`, `bucket_size` and `start_time`. /// Creates a new instance with specified `rate`, `bucket_size` and `start_time`.
pub fn with_start_time(group_size: usize, rate: u64, bucket_size: u64, start_time: Instant) -> Self { pub fn with_start_time(
group_size: usize,
rate: u64,
bucket_size: u64,
start_time: Instant,
) -> Self {
let state = TbfState { let state = TbfState {
traffic: 0, traffic: 0,
last_update: start_time, last_update: start_time,
@ -191,7 +198,12 @@ impl RateLimiterVec {
} }
/// Register traffic at the specified index /// Register traffic at the specified index
pub fn register_traffic(&mut self, index: usize, current_time: Instant, data_len: u64) -> Result<Duration, Error> { pub fn register_traffic(
&mut self,
index: usize,
current_time: Instant,
data_len: u64,
) -> Result<Duration, Error> {
if index >= self.state.len() { if index >= self.state.len() {
bail!("RateLimiterVec::register_traffic - index out of range"); bail!("RateLimiterVec::register_traffic - index out of range");
} }