client: drop retry logic

This should be moved to where we actually need it, not be part of the
generic product client.

Signed-off-by: Wolfgang Bumiller <w.bumiller@proxmox.com>
This commit is contained in:
Wolfgang Bumiller 2023-08-07 10:57:38 +02:00
parent 2b212cf4e3
commit a7435e757b

View File

@ -1,7 +1,6 @@
use std::collections::HashMap; use std::collections::HashMap;
use std::fmt; use std::fmt;
use std::future::Future; use std::future::Future;
use std::sync::atomic::{AtomicU32, AtomicUsize, Ordering};
use std::sync::Arc; use std::sync::Arc;
use std::sync::Mutex as StdMutex; use std::sync::Mutex as StdMutex;
@ -29,44 +28,10 @@ pub trait HttpClient: Send + Sync {
fn request(&self, request: Request<Vec<u8>>) -> Self::Request; fn request(&self, request: Request<Vec<u8>>) -> Self::Request;
} }
/// In a cluster we may be able to connect to a different node if one connection fails.
struct ApiUrls {
/// This is the list of cluster node URls.
urls: Vec<Uri>,
/// This is the current "good" URL. If we fail to connect here, we'll walk around the `urls`
/// vec once before failing completely.
/// Once a "good" URL is reached, we update this.
current: AtomicUsize,
/// Since another thread might be doing the same thing simultaneously, let's use this to keep
/// track of when some thread has updated `current`. If we see a `generation` bump while
/// probing URLs, we'll retry the new `current`.
generation: AtomicU32,
}
impl ApiUrls {
fn new(uri: Uri) -> Self {
Self {
urls: vec![uri],
current: AtomicUsize::new(0),
generation: AtomicU32::new(0),
}
}
fn index(&self) -> usize {
self.current.load(Ordering::Relaxed)
}
fn generation(&self) -> u32 {
self.generation.load(Ordering::Acquire)
}
}
/// Proxmox VE high level API client. /// Proxmox VE high level API client.
pub struct Client<C, E: Environment> { pub struct Client<C, E: Environment> {
env: E, env: E,
api_urls: ApiUrls, api_url: Uri,
auth: StdMutex<Option<Arc<AuthenticationKind>>>, auth: StdMutex<Option<Arc<AuthenticationKind>>>,
client: C, client: C,
pve_compat: bool, pve_compat: bool,
@ -127,7 +92,7 @@ where
pub fn with_client(api_url: Uri, environment: E, client: C) -> Self { pub fn with_client(api_url: Uri, environment: E, client: C) -> Self {
Self { Self {
env: environment, env: environment,
api_urls: ApiUrls::new(api_url), api_url,
auth: StdMutex::new(None), auth: StdMutex::new(None),
client, client,
pve_compat: false, pve_compat: false,
@ -173,74 +138,12 @@ where
/// If no valid ticket is available already, this will connect to the PVE API and perform /// If no valid ticket is available already, this will connect to the PVE API and perform
/// authentication. /// authentication.
pub async fn login(&self) -> Result<(), E::Error> { pub async fn login(&self) -> Result<(), E::Error> {
let mut url_index = self.api_urls.index(); let (userid, login) = self.need_login().await?;
let current_url = &self.api_urls.urls[url_index]; let Some(login) = login else { return Ok(()) };
let (userid, login) = self.need_login(current_url).await?; let login = login.pve_compatibility(self.pve_compat);
let login = match login {
None => return Ok(()),
Some(login) => login,
};
let mut login = login.pve_compatibility(self.pve_compat); let response = self.client.request(to_request(login.request())?).await?;
let mut retry = None;
let generation = self.api_urls.generation();
// remember the finally successful address
let retry_success = |retry: &mut Option<usize>, url_index: usize| {
if retry.is_some() {
*retry = None;
if self.api_urls.generation() == generation {
self.api_urls.current.store(url_index, Ordering::Relaxed);
self.api_urls
.generation
.store(generation + 1, Ordering::Release);
}
}
};
// check whether we should be retrying a new address
let should_retry =
|retry: &mut Option<usize>, login: &mut Login, url_index: &mut usize| -> bool {
match *retry {
Some(retry) => {
if retry == *url_index {
return false;
}
}
None => *retry = Some(*url_index),
}
// if another thread successfully found a working URL already, use that as our last
// attempt:
if self.api_urls.generation() != generation {
*url_index = self.api_urls.index();
*retry = Some(*url_index);
return true;
}
// otherwise cycle through the available addresses:
*url_index = (*url_index + 1) % self.api_urls.urls.len();
login.set_url(self.api_urls.urls[*url_index].to_string());
true
};
loop {
let current_url = &self.api_urls.urls[url_index];
let response = match self.client.request(to_request(login.request())?).await {
Ok(r) => {
retry_success(&mut retry, url_index);
r
}
Err(err) => {
if should_retry(&mut retry, &mut login, &mut url_index) {
continue;
}
return Err(err.into());
}
};
if !response.status().is_success() { if !response.status().is_success() {
// FIXME: does `http` somehow expose the status string? // FIXME: does `http` somehow expose the status string?
@ -251,33 +154,19 @@ where
} }
let challenge = match login.response(response.body()).map_err(E::Error::bad_api)? { let challenge = match login.response(response.body()).map_err(E::Error::bad_api)? {
TicketResult::Full(auth) => { TicketResult::Full(auth) => return self.finish_auth(&userid, auth).await,
return self.finish_auth(current_url, &userid, auth).await
}
TicketResult::TfaRequired(challenge) => challenge, TicketResult::TfaRequired(challenge) => challenge,
}; };
let response = self let response = self
.env .env
.query_second_factor_async(current_url, &userid, &challenge.challenge) .query_second_factor_async(&self.api_url, &userid, &challenge.challenge)
.await?; .await?;
let response = match self let response = self
.client .client
.request(to_request(challenge.respond_raw(&response))?) .request(to_request(challenge.respond_raw(&response))?)
.await .await?;
{
Ok(r) => {
retry_success(&mut retry, url_index);
r
}
Err(err) => {
if should_retry(&mut retry, &mut login, &mut url_index) {
continue;
}
return Err(err.into());
}
};
let status = response.status(); let status = response.status();
if !status.is_success() { if !status.is_success() {
@ -288,22 +177,24 @@ where
.response(response.body()) .response(response.body())
.map_err(E::Error::bad_api)?; .map_err(E::Error::bad_api)?;
break self.finish_auth(current_url, &userid, auth).await; self.finish_auth(&userid, auth).await
}
} }
/// Get the current username and, if required, a `Login` request. /// Get the current username and, if required, a `Login` request.
async fn need_login(&self, current_url: &Uri) -> Result<(String, Option<Login>), E::Error> { async fn need_login(&self) -> Result<(String, Option<Login>), E::Error> {
use proxmox_login::ticket::Validity; use proxmox_login::ticket::Validity;
let (userid, auth) = self.current_auth().await?; let (userid, auth) = self.current_auth().await?;
let authkind = match auth { let authkind = match auth {
None => { None => {
let password = self.env.query_password_async(current_url, &userid).await?; let password = self
.env
.query_password_async(&self.api_url, &userid)
.await?;
return Ok(( return Ok((
userid.clone(), userid.clone(),
Some(Login::new(current_url.to_string(), userid, password)), Some(Login::new(self.api_url.to_string(), userid, password)),
)); ));
} }
Some(authkind) => authkind, Some(authkind) => authkind,
@ -322,38 +213,36 @@ where
Validity::Refresh => ( Validity::Refresh => (
userid, userid,
Some( Some(
Login::renew(current_url.to_string(), auth.ticket.to_string()) Login::renew(self.api_url.to_string(), auth.ticket.to_string())
.map_err(E::Error::custom)?, .map_err(E::Error::custom)?,
), ),
), ),
Validity::Expired => { Validity::Expired => {
let password = self.env.query_password_async(current_url, &userid).await?; let password = self
.env
.query_password_async(&self.api_url, &userid)
.await?;
( (
userid.clone(), userid.clone(),
Some(Login::new(current_url.to_string(), userid, password)), Some(Login::new(self.api_url.to_string(), userid, password)),
) )
} }
}) })
} }
/// Store the authentication info in our `auth` field and notify the environment. /// Store the authentication info in our `auth` field and notify the environment.
async fn finish_auth( async fn finish_auth(&self, userid: &str, auth: Authentication) -> Result<(), E::Error> {
&self,
current_url: &Uri,
userid: &str,
auth: Authentication,
) -> Result<(), E::Error> {
let auth_string = serde_json::to_string(&auth).map_err(E::Error::internal)?; let auth_string = serde_json::to_string(&auth).map_err(E::Error::internal)?;
*self.auth.lock().unwrap() = Some(Arc::new(auth.into())); *self.auth.lock().unwrap() = Some(Arc::new(auth.into()));
self.env self.env
.store_ticket_async(current_url, userid, auth_string.as_bytes()) .store_ticket_async(&self.api_url, userid, auth_string.as_bytes())
.await .await
} }
/// Get the currently used API url from our array of possible cluster nodes. /// Get the currently used API url.
fn api_url(&self) -> &Uri { pub fn api_url(&self) -> &Uri {
&self.api_urls.urls[self.api_urls.index()] &self.api_url
} }
/// Get the current user id and a reference to the current authentication method. /// Get the current user id and a reference to the current authentication method.
@ -370,7 +259,7 @@ where
Some(auth) Some(auth)
} }
None => { None => {
userid = self.env.query_userid_async(self.api_url()).await?; userid = self.env.query_userid_async(&self.api_url).await?;
self.reload_existing_ticket(&userid).await? self.reload_existing_ticket(&userid).await?
} }
}; };
@ -383,7 +272,7 @@ where
&self, &self,
userid: &str, userid: &str,
) -> Result<Option<Arc<AuthenticationKind>>, E::Error> { ) -> Result<Option<Arc<AuthenticationKind>>, E::Error> {
let ticket = match self.env.load_ticket_async(self.api_url(), userid).await? { let ticket = match self.env.load_ticket_async(&self.api_url, userid).await? {
Some(auth) => auth, Some(auth) => auth,
None => return Ok(None), None => return Ok(None),
}; };
@ -397,8 +286,8 @@ where
} }
/// Build a URI relative to the current API endpoint. /// Build a URI relative to the current API endpoint.
fn build_uri(&self, base_uri: Uri, path: &str) -> Result<Uri, E::Error> { fn build_uri(&self, path: &str) -> Result<Uri, E::Error> {
let parts = base_uri.into_parts(); let parts = self.api_url.clone().into_parts();
let mut builder = http::uri::Builder::new(); let mut builder = http::uri::Builder::new();
if let Some(scheme) = parts.scheme { if let Some(scheme) = parts.scheme {
builder = builder.scheme(scheme); builder = builder.scheme(scheme);
@ -412,65 +301,6 @@ where
.map_err(E::Error::internal) .map_err(E::Error::internal)
} }
/// Attempt to execute a request, while automatically trying to reach different cluster nodes
/// if we fail to connect to the current node.
///
/// The `make_request` closure gets the base `Uri` and should use it to build a `Request`.
/// The `Request` is then attempted. If there's a connection issue, `make_request` will be
/// called again with another cluster node (if available).
/// Only if no node responds - or a legitimate HTTP error is produced - will the error be
/// returned.
async fn request_retry_loop<
#[cfg(not(target_arch = "wasm32"))]
Fut: Future<Output = Result<Request<Vec<u8>>, E::Error>> + Send,
#[cfg(target_arch = "wasm32")]
Fut: Future<Output = Result<Request<Vec<u8>>, E::Error>>,
>(
&self,
make_request: impl Fn(Uri) -> Fut,
) -> Result<Response<Vec<u8>>, E::Error> {
let generation = self.api_urls.generation();
let mut url_index = self.api_urls.index();
let mut retry = None;
loop {
let err = match self
.client
.request(make_request(self.api_urls.urls[url_index].clone()).await?)
.await
{
Ok(response) => {
if retry.is_some() && self.api_urls.generation() == generation {
self.api_urls.current.store(url_index, Ordering::Relaxed);
self.api_urls
.generation
.store(generation + 1, Ordering::Release);
}
return Ok(response);
}
Err(err) => err,
};
match retry {
Some(retry) => {
if retry == url_index {
return Err(err.into());
}
}
None => retry = Some(url_index),
}
// if another thread successfully found a working URL already, use that as our last
// attempt:
if self.api_urls.generation() != generation {
url_index = self.api_urls.index();
retry = Some(url_index);
continue;
}
url_index = (url_index + 1) % self.api_urls.urls.len();
}
}
/// Execute a `GET` request, possibly trying multiple cluster nodes. /// Execute a `GET` request, possibly trying multiple cluster nodes.
pub async fn get<'a, R>(&'a self, uri: &str) -> Result<ApiResponse<R>, E::Error> pub async fn get<'a, R>(&'a self, uri: &str) -> Result<ApiResponse<R>, E::Error>
where where
@ -478,16 +308,13 @@ where
{ {
self.login().await?; self.login().await?;
let response = self let request = self
.request_retry_loop(|base_uri| async { .set_auth_headers(Request::get(self.build_uri(uri)?))
self.set_auth_headers(Request::get(self.build_uri(base_uri, uri)?))
.await? .await?
.body(Vec::new()) .body(Vec::new())
.map_err(Error::internal) .map_err(Error::internal)?;
})
.await?;
Self::handle_response(response) Self::handle_response(self.client.request(request).await?)
} }
/// Execute a `GET` request with the given body, possibly trying multiple cluster nodes. /// Execute a `GET` request with the given body, possibly trying multiple cluster nodes.
@ -536,16 +363,13 @@ where
{ {
self.login().await?; self.login().await?;
let response = self let request = self
.request_retry_loop(|base_uri| async { .set_auth_headers(Request::delete(self.build_uri(uri)?))
self.set_auth_headers(Request::delete(self.build_uri(base_uri, uri)?))
.await? .await?
.body(Vec::new()) .body(Vec::new())
.map_err(Error::internal) .map_err(Error::internal)?;
})
.await?;
Self::handle_response(response) Self::handle_response(self.client.request(request).await?)
} }
/// Execute a `DELETE` request with the given body, possibly trying multiple cluster nodes. /// Execute a `DELETE` request with the given body, possibly trying multiple cluster nodes.
@ -607,18 +431,18 @@ where
body: Vec<u8>, body: Vec<u8>,
content_length: usize, content_length: usize,
) -> Result<Response<Vec<u8>>, E::Error> { ) -> Result<Response<Vec<u8>>, E::Error> {
self.request_retry_loop(|base_uri| async {
let request = Request::builder() let request = Request::builder()
.method(method.clone()) .method(method.clone())
.uri(self.build_uri(base_uri, uri)?) .uri(self.build_uri(uri)?)
.header(http::header::CONTENT_TYPE, "application/json") .header(http::header::CONTENT_TYPE, "application/json")
.header(http::header::CONTENT_LENGTH, content_length.to_string()); .header(http::header::CONTENT_LENGTH, content_length.to_string());
auth.set_auth_headers(request) let request = auth
.set_auth_headers(request)
.body(body.clone()) .body(body.clone())
.map_err(Error::internal) .map_err(Error::internal)?;
})
.await Ok(self.client.request(request).await?)
} }
/// Check the status code, deserialize the json/extjs `RawApiResponse` and check for error /// Check the status code, deserialize the json/extjs `RawApiResponse` and check for error