http: rustfmt

Signed-off-by: Fabian Grünbichler <f.gruenbichler@proxmox.com>
This commit is contained in:
Fabian Grünbichler 2021-05-14 15:44:48 +02:00 committed by Dietmar Maurer
parent 910949db01
commit 94378967e3
5 changed files with 90 additions and 119 deletions

View File

@ -1,4 +1,4 @@
use anyhow::{Error, format_err, bail};
use anyhow::{bail, format_err, Error};
use std::os::unix::io::AsRawFd;
use std::pin::Pin;
use std::sync::Arc;
@ -8,11 +8,7 @@ use futures::*;
use http::Uri;
use hyper::client::HttpConnector;
use openssl::ssl::SslConnector;
use tokio::io::{
AsyncRead,
AsyncReadExt,
AsyncWriteExt,
};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
use tokio_openssl::SslStream;
@ -29,7 +25,11 @@ pub struct HttpsConnector {
}
impl HttpsConnector {
pub fn with_connector(mut connector: HttpConnector, ssl_connector: SslConnector, tcp_keepalive: u32) -> Self {
pub fn with_connector(
mut connector: HttpConnector,
ssl_connector: SslConnector,
tcp_keepalive: u32,
) -> Self {
connector.enforce_http(false);
Self {
connector,
@ -61,21 +61,27 @@ impl HttpsConnector {
Ok(())
}
async fn parse_connect_response<R: AsyncRead + Unpin>(
stream: &mut R,
) -> Result<(), Error> {
async fn parse_connect_response<R: AsyncRead + Unpin>(stream: &mut R) -> Result<(), Error> {
let mut data: Vec<u8> = Vec::new();
let mut buffer = [0u8; 256];
const END_MARK: &[u8; 4] = b"\r\n\r\n";
'outer: loop {
let n = stream.read(&mut buffer[..]).await?;
if n == 0 { break; }
let search_start = if data.len() > END_MARK.len() { data.len() - END_MARK.len() + 1 } else { 0 };
if n == 0 {
break;
}
let search_start = if data.len() > END_MARK.len() {
data.len() - END_MARK.len() + 1
} else {
0
};
data.extend(&buffer[..n]);
if data.len() >= END_MARK.len() {
if let Some(pos) = data[search_start..].windows(END_MARK.len()).position(|w| w == END_MARK) {
if let Some(pos) = data[search_start..]
.windows(END_MARK.len())
.position(|w| w == END_MARK)
{
let response = String::from_utf8_lossy(&data);
let status_line = match response.split("\r\n").next() {
Some(status) => status,
@ -89,7 +95,8 @@ impl HttpsConnector {
break 'outer;
}
}
if data.len() > 1024*32 { // max 32K (random chosen limit)
if data.len() > 1024 * 32 {
// max 32K (random chosen limit)
bail!("too many bytes");
}
}
@ -101,12 +108,11 @@ impl hyper::service::Service<Uri> for HttpsConnector {
type Response = MaybeTlsStream<TcpStream>;
type Error = Error;
#[allow(clippy::type_complexity)]
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
type Future =
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
fn poll_ready(&mut self, ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.connector
.poll_ready(ctx)
.map_err(|err| err.into())
self.connector.poll_ready(ctx).map_err(|err| err.into())
}
fn call(&mut self, dst: Uri) -> Self::Future {
@ -123,7 +129,6 @@ impl hyper::service::Service<Uri> for HttpsConnector {
let keepalive = self.tcp_keepalive;
if let Some(ref proxy) = self.proxy {
let use_connect = is_https || proxy.force_connect;
let proxy_authority = match helpers::build_authority(&proxy.host, proxy.port) {
@ -145,17 +150,16 @@ impl hyper::service::Service<Uri> for HttpsConnector {
if use_connect {
async move {
let mut tcp_stream = connector
.call(proxy_uri)
.await
.map_err(|err| format_err!("error connecting to {} - {}", proxy_authority, err))?;
let mut tcp_stream = connector.call(proxy_uri).await.map_err(|err| {
format_err!("error connecting to {} - {}", proxy_authority, err)
})?;
let _ = set_tcp_keepalive(tcp_stream.as_raw_fd(), keepalive);
let mut connect_request = format!("CONNECT {0}:{1} HTTP/1.1\r\n", host, port);
if let Some(authorization) = authorization {
connect_request.push_str(&format!("Proxy-Authorization: {}\r\n", authorization));
connect_request
.push_str(&format!("Proxy-Authorization: {}\r\n", authorization));
}
connect_request.push_str(&format!("Host: {0}:{1}\r\n\r\n", host, port));
@ -169,18 +173,19 @@ impl hyper::service::Service<Uri> for HttpsConnector {
} else {
Ok(MaybeTlsStream::Normal(tcp_stream))
}
}.boxed()
}
.boxed()
} else {
async move {
let tcp_stream = connector
.call(proxy_uri)
.await
.map_err(|err| format_err!("error connecting to {} - {}", proxy_authority, err))?;
async move {
let tcp_stream = connector.call(proxy_uri).await.map_err(|err| {
format_err!("error connecting to {} - {}", proxy_authority, err)
})?;
let _ = set_tcp_keepalive(tcp_stream.as_raw_fd(), keepalive);
let _ = set_tcp_keepalive(tcp_stream.as_raw_fd(), keepalive);
Ok(MaybeTlsStream::Proxied(tcp_stream))
}.boxed()
Ok(MaybeTlsStream::Proxied(tcp_stream))
}
.boxed()
}
} else {
async move {
@ -197,7 +202,8 @@ impl hyper::service::Service<Uri> for HttpsConnector {
} else {
Ok(MaybeTlsStream::Normal(tcp_stream))
}
}.boxed()
}
.boxed()
}
}
}

View File

@ -1,16 +1,13 @@
use anyhow::{Error, format_err, bail};
use anyhow::{bail, format_err, Error};
use std::collections::HashMap;
use hyper::Body;
use hyper::client::{Client, HttpConnector};
use http::{Request, Response, HeaderValue};
use openssl::ssl::{SslConnector, SslMethod};
use futures::*;
use http::{HeaderValue, Request, Response};
use hyper::client::{Client, HttpConnector};
use hyper::Body;
use openssl::ssl::{SslConnector, SslMethod};
use crate::http::{
ProxyConfig,
client::HttpsConnector,
};
use crate::http::{client::HttpsConnector, ProxyConfig};
/// Options for a SimpleHttp client.
#[derive(Default)]
@ -27,7 +24,7 @@ impl SimpleHttpOptions {
fn get_proxy_authorization(&self) -> Option<String> {
if let Some(ref proxy_config) = self.proxy_config {
if !proxy_config.force_connect {
return proxy_config.authorization.clone();
return proxy_config.authorization.clone();
}
}
@ -55,7 +52,11 @@ impl SimpleHttp {
pub fn with_ssl_connector(ssl_connector: SslConnector, options: SimpleHttpOptions) -> Self {
let connector = HttpConnector::new();
let mut https = HttpsConnector::with_connector(connector, ssl_connector, options.tcp_keepalive.unwrap_or(7200));
let mut https = HttpsConnector::with_connector(
connector,
ssl_connector,
options.tcp_keepalive.unwrap_or(7200),
);
if let Some(ref proxy_config) = options.proxy_config {
https.set_proxy(proxy_config.clone());
}
@ -71,12 +72,10 @@ impl SimpleHttp {
fn add_proxy_headers(&self, request: &mut Request<Body>) -> Result<(), Error> {
if request.uri().scheme() != Some(&http::uri::Scheme::HTTPS) {
if let Some(ref authorization) = self.options.get_proxy_authorization() {
request
.headers_mut()
.insert(
http::header::PROXY_AUTHORIZATION,
HeaderValue::from_str(authorization)?,
);
request.headers_mut().insert(
http::header::PROXY_AUTHORIZATION,
HeaderValue::from_str(authorization)?,
);
}
}
Ok(())
@ -89,13 +88,13 @@ impl SimpleHttp {
HeaderValue::from_str(Self::DEFAULT_USER_AGENT_STRING)?
};
request.headers_mut().insert(hyper::header::USER_AGENT, user_agent);
request
.headers_mut()
.insert(hyper::header::USER_AGENT, user_agent);
self.add_proxy_headers(&mut request)?;
self.client.request(request)
.map_err(Error::from)
.await
self.client.request(request).map_err(Error::from).await
}
pub async fn post(
@ -104,7 +103,6 @@ impl SimpleHttp {
body: Option<String>,
content_type: Option<&str>,
) -> Result<Response<Body>, Error> {
let body = if let Some(body) = body {
Body::from(body)
} else {
@ -126,10 +124,7 @@ impl SimpleHttp {
uri: &str,
extra_headers: Option<&HashMap<String, String>>,
) -> Result<String, Error> {
let mut request = Request::builder()
.method("GET")
.uri(uri);
let mut request = Request::builder().method("GET").uri(uri);
if let Some(hs) = extra_headers {
for (h, v) in hs.iter() {

View File

@ -6,10 +6,11 @@ use http::uri::Authority;
pub fn build_authority(host: &str, port: u16) -> Result<Authority, Error> {
let bytes = host.as_bytes();
let len = bytes.len();
let authority = if len > 3 && bytes.contains(&b':') && bytes[0] != b'[' && bytes[len-1] != b']' {
format!("[{}]:{}", host, port).parse()?
} else {
format!("{}:{}", host, port).parse()?
};
let authority =
if len > 3 && bytes.contains(&b':') && bytes[0] != b'[' && bytes[len - 1] != b']' {
format!("[{}]:{}", host, port).parse()?
} else {
format!("{}:{}", host, port).parse()?
};
Ok(authority)
}

View File

@ -1,4 +1,4 @@
use anyhow::{Error, format_err, bail};
use anyhow::{bail, format_err, Error};
use http::Uri;
@ -16,7 +16,6 @@ pub struct ProxyConfig {
impl ProxyConfig {
/// Parse proxy config from ALL_PROXY environment var
pub fn from_proxy_env() -> Result<Option<ProxyConfig>, Error> {
// We only support/use ALL_PROXY environment
match std::env::var_os("ALL_PROXY") {
@ -70,7 +69,8 @@ impl ProxyConfig {
authorization,
force_connect: false,
})
}).map_err(|err| format_err!("parse_proxy_url failed: {}", err))
})
.map_err(|err| format_err!("parse_proxy_url failed: {}", err))
}
/// Assemble canonical proxy string (including scheme and port)
@ -78,7 +78,7 @@ impl ProxyConfig {
let authority = helpers::build_authority(&self.host, self.port)?;
Ok(match self.authorization {
None => format!("http://{}", authority),
Some(ref authorization) => format!("http://{}@{}", authorization, authority)
Some(ref authorization) => format!("http://{}@{}", authorization, authority),
})
}
}

View File

@ -2,7 +2,7 @@ use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
use hyper::client::connect::{Connection, Connected};
use hyper::client::connect::{Connected, Connection};
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tokio_openssl::SslStream;
@ -22,15 +22,9 @@ impl<S: AsyncRead + AsyncWrite + Unpin> AsyncRead for MaybeTlsStream<S> {
buf: &mut ReadBuf,
) -> Poll<Result<(), io::Error>> {
match self.get_mut() {
MaybeTlsStream::Normal(ref mut s) => {
Pin::new(s).poll_read(cx, buf)
}
MaybeTlsStream::Proxied(ref mut s) => {
Pin::new(s).poll_read(cx, buf)
}
MaybeTlsStream::Secured(ref mut s) => {
Pin::new(s).poll_read(cx, buf)
}
MaybeTlsStream::Normal(ref mut s) => Pin::new(s).poll_read(cx, buf),
MaybeTlsStream::Proxied(ref mut s) => Pin::new(s).poll_read(cx, buf),
MaybeTlsStream::Secured(ref mut s) => Pin::new(s).poll_read(cx, buf),
}
}
}
@ -42,15 +36,9 @@ impl<S: AsyncRead + AsyncWrite + Unpin> AsyncWrite for MaybeTlsStream<S> {
buf: &[u8],
) -> Poll<Result<usize, io::Error>> {
match self.get_mut() {
MaybeTlsStream::Normal(ref mut s) => {
Pin::new(s).poll_write(cx, buf)
}
MaybeTlsStream::Proxied(ref mut s) => {
Pin::new(s).poll_write(cx, buf)
}
MaybeTlsStream::Secured(ref mut s) => {
Pin::new(s).poll_write(cx, buf)
}
MaybeTlsStream::Normal(ref mut s) => Pin::new(s).poll_write(cx, buf),
MaybeTlsStream::Proxied(ref mut s) => Pin::new(s).poll_write(cx, buf),
MaybeTlsStream::Secured(ref mut s) => Pin::new(s).poll_write(cx, buf),
}
}
@ -60,15 +48,9 @@ impl<S: AsyncRead + AsyncWrite + Unpin> AsyncWrite for MaybeTlsStream<S> {
bufs: &[io::IoSlice<'_>],
) -> Poll<Result<usize, io::Error>> {
match self.get_mut() {
MaybeTlsStream::Normal(ref mut s) => {
Pin::new(s).poll_write_vectored(cx, bufs)
}
MaybeTlsStream::Proxied(ref mut s) => {
Pin::new(s).poll_write_vectored(cx, bufs)
}
MaybeTlsStream::Secured(ref mut s) => {
Pin::new(s).poll_write_vectored(cx, bufs)
}
MaybeTlsStream::Normal(ref mut s) => Pin::new(s).poll_write_vectored(cx, bufs),
MaybeTlsStream::Proxied(ref mut s) => Pin::new(s).poll_write_vectored(cx, bufs),
MaybeTlsStream::Secured(ref mut s) => Pin::new(s).poll_write_vectored(cx, bufs),
}
}
@ -82,36 +64,23 @@ impl<S: AsyncRead + AsyncWrite + Unpin> AsyncWrite for MaybeTlsStream<S> {
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), io::Error>> {
match self.get_mut() {
MaybeTlsStream::Normal(ref mut s) => {
Pin::new(s).poll_flush(cx)
}
MaybeTlsStream::Proxied(ref mut s) => {
Pin::new(s).poll_flush(cx)
}
MaybeTlsStream::Secured(ref mut s) => {
Pin::new(s).poll_flush(cx)
}
MaybeTlsStream::Normal(ref mut s) => Pin::new(s).poll_flush(cx),
MaybeTlsStream::Proxied(ref mut s) => Pin::new(s).poll_flush(cx),
MaybeTlsStream::Secured(ref mut s) => Pin::new(s).poll_flush(cx),
}
}
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), io::Error>> {
match self.get_mut() {
MaybeTlsStream::Normal(ref mut s) => {
Pin::new(s).poll_shutdown(cx)
}
MaybeTlsStream::Proxied(ref mut s) => {
Pin::new(s).poll_shutdown(cx)
}
MaybeTlsStream::Secured(ref mut s) => {
Pin::new(s).poll_shutdown(cx)
}
MaybeTlsStream::Normal(ref mut s) => Pin::new(s).poll_shutdown(cx),
MaybeTlsStream::Proxied(ref mut s) => Pin::new(s).poll_shutdown(cx),
MaybeTlsStream::Secured(ref mut s) => Pin::new(s).poll_shutdown(cx),
}
}
}
// we need this for the hyper http client
impl <S: Connection + AsyncRead + AsyncWrite + Unpin> Connection for MaybeTlsStream<S>
{
impl<S: Connection + AsyncRead + AsyncWrite + Unpin> Connection for MaybeTlsStream<S> {
fn connected(&self) -> Connected {
match self {
MaybeTlsStream::Normal(s) => s.connected(),