diff --git a/proxmox-client/src/client.rs b/proxmox-client/src/client.rs index 510bf169..2f365fee 100644 --- a/proxmox-client/src/client.rs +++ b/proxmox-client/src/client.rs @@ -1,7 +1,6 @@ use std::collections::HashMap; use std::fmt; use std::future::Future; -use std::sync::atomic::{AtomicU32, AtomicUsize, Ordering}; use std::sync::Arc; use std::sync::Mutex as StdMutex; @@ -29,44 +28,10 @@ pub trait HttpClient: Send + Sync { fn request(&self, request: Request>) -> Self::Request; } -/// In a cluster we may be able to connect to a different node if one connection fails. -struct ApiUrls { - /// This is the list of cluster node URls. - urls: Vec, - - /// This is the current "good" URL. If we fail to connect here, we'll walk around the `urls` - /// vec once before failing completely. - /// Once a "good" URL is reached, we update this. - current: AtomicUsize, - - /// Since another thread might be doing the same thing simultaneously, let's use this to keep - /// track of when some thread has updated `current`. If we see a `generation` bump while - /// probing URLs, we'll retry the new `current`. - generation: AtomicU32, -} - -impl ApiUrls { - fn new(uri: Uri) -> Self { - Self { - urls: vec![uri], - current: AtomicUsize::new(0), - generation: AtomicU32::new(0), - } - } - - fn index(&self) -> usize { - self.current.load(Ordering::Relaxed) - } - - fn generation(&self) -> u32 { - self.generation.load(Ordering::Acquire) - } -} - /// Proxmox VE high level API client. pub struct Client { env: E, - api_urls: ApiUrls, + api_url: Uri, auth: StdMutex>>, client: C, pve_compat: bool, @@ -127,7 +92,7 @@ where pub fn with_client(api_url: Uri, environment: E, client: C) -> Self { Self { env: environment, - api_urls: ApiUrls::new(api_url), + api_url, auth: StdMutex::new(None), client, pve_compat: false, @@ -173,137 +138,63 @@ where /// If no valid ticket is available already, this will connect to the PVE API and perform /// authentication. pub async fn login(&self) -> Result<(), E::Error> { - let mut url_index = self.api_urls.index(); - let current_url = &self.api_urls.urls[url_index]; + let (userid, login) = self.need_login().await?; + let Some(login) = login else { return Ok(()) }; - let (userid, login) = self.need_login(current_url).await?; - let login = match login { - None => return Ok(()), - Some(login) => login, - }; + let login = login.pve_compatibility(self.pve_compat); - let mut login = login.pve_compatibility(self.pve_compat); + let response = self.client.request(to_request(login.request())?).await?; - let mut retry = None; - let generation = self.api_urls.generation(); - - // remember the finally successful address - let retry_success = |retry: &mut Option, url_index: usize| { - if retry.is_some() { - *retry = None; - if self.api_urls.generation() == generation { - self.api_urls.current.store(url_index, Ordering::Relaxed); - self.api_urls - .generation - .store(generation + 1, Ordering::Release); - } - } - }; - - // check whether we should be retrying a new address - let should_retry = - |retry: &mut Option, login: &mut Login, url_index: &mut usize| -> bool { - match *retry { - Some(retry) => { - if retry == *url_index { - return false; - } - } - None => *retry = Some(*url_index), - } - - // if another thread successfully found a working URL already, use that as our last - // attempt: - if self.api_urls.generation() != generation { - *url_index = self.api_urls.index(); - *retry = Some(*url_index); - return true; - } - - // otherwise cycle through the available addresses: - *url_index = (*url_index + 1) % self.api_urls.urls.len(); - login.set_url(self.api_urls.urls[*url_index].to_string()); - - true - }; - - loop { - let current_url = &self.api_urls.urls[url_index]; - let response = match self.client.request(to_request(login.request())?).await { - Ok(r) => { - retry_success(&mut retry, url_index); - r - } - Err(err) => { - if should_retry(&mut retry, &mut login, &mut url_index) { - continue; - } - return Err(err.into()); - } - }; - - if !response.status().is_success() { - // FIXME: does `http` somehow expose the status string? - return Err(E::Error::api_error( - response.status(), - "authentication failed", - )); - } - - let challenge = match login.response(response.body()).map_err(E::Error::bad_api)? { - TicketResult::Full(auth) => { - return self.finish_auth(current_url, &userid, auth).await - } - TicketResult::TfaRequired(challenge) => challenge, - }; - - let response = self - .env - .query_second_factor_async(current_url, &userid, &challenge.challenge) - .await?; - - let response = match self - .client - .request(to_request(challenge.respond_raw(&response))?) - .await - { - Ok(r) => { - retry_success(&mut retry, url_index); - r - } - Err(err) => { - if should_retry(&mut retry, &mut login, &mut url_index) { - continue; - } - return Err(err.into()); - } - }; - - let status = response.status(); - if !status.is_success() { - return Err(E::Error::api_error(status, "authentication failed")); - } - - let auth = challenge - .response(response.body()) - .map_err(E::Error::bad_api)?; - - break self.finish_auth(current_url, &userid, auth).await; + if !response.status().is_success() { + // FIXME: does `http` somehow expose the status string? + return Err(E::Error::api_error( + response.status(), + "authentication failed", + )); } + + let challenge = match login.response(response.body()).map_err(E::Error::bad_api)? { + TicketResult::Full(auth) => return self.finish_auth(&userid, auth).await, + TicketResult::TfaRequired(challenge) => challenge, + }; + + let response = self + .env + .query_second_factor_async(&self.api_url, &userid, &challenge.challenge) + .await?; + + let response = self + .client + .request(to_request(challenge.respond_raw(&response))?) + .await?; + + let status = response.status(); + if !status.is_success() { + return Err(E::Error::api_error(status, "authentication failed")); + } + + let auth = challenge + .response(response.body()) + .map_err(E::Error::bad_api)?; + + self.finish_auth(&userid, auth).await } /// Get the current username and, if required, a `Login` request. - async fn need_login(&self, current_url: &Uri) -> Result<(String, Option), E::Error> { + async fn need_login(&self) -> Result<(String, Option), E::Error> { use proxmox_login::ticket::Validity; let (userid, auth) = self.current_auth().await?; let authkind = match auth { None => { - let password = self.env.query_password_async(current_url, &userid).await?; + let password = self + .env + .query_password_async(&self.api_url, &userid) + .await?; return Ok(( userid.clone(), - Some(Login::new(current_url.to_string(), userid, password)), + Some(Login::new(self.api_url.to_string(), userid, password)), )); } Some(authkind) => authkind, @@ -322,38 +213,36 @@ where Validity::Refresh => ( userid, Some( - Login::renew(current_url.to_string(), auth.ticket.to_string()) + Login::renew(self.api_url.to_string(), auth.ticket.to_string()) .map_err(E::Error::custom)?, ), ), Validity::Expired => { - let password = self.env.query_password_async(current_url, &userid).await?; + let password = self + .env + .query_password_async(&self.api_url, &userid) + .await?; ( userid.clone(), - Some(Login::new(current_url.to_string(), userid, password)), + Some(Login::new(self.api_url.to_string(), userid, password)), ) } }) } /// Store the authentication info in our `auth` field and notify the environment. - async fn finish_auth( - &self, - current_url: &Uri, - userid: &str, - auth: Authentication, - ) -> Result<(), E::Error> { + async fn finish_auth(&self, userid: &str, auth: Authentication) -> Result<(), E::Error> { let auth_string = serde_json::to_string(&auth).map_err(E::Error::internal)?; *self.auth.lock().unwrap() = Some(Arc::new(auth.into())); self.env - .store_ticket_async(current_url, userid, auth_string.as_bytes()) + .store_ticket_async(&self.api_url, userid, auth_string.as_bytes()) .await } - /// Get the currently used API url from our array of possible cluster nodes. - fn api_url(&self) -> &Uri { - &self.api_urls.urls[self.api_urls.index()] + /// Get the currently used API url. + pub fn api_url(&self) -> &Uri { + &self.api_url } /// Get the current user id and a reference to the current authentication method. @@ -370,7 +259,7 @@ where Some(auth) } None => { - userid = self.env.query_userid_async(self.api_url()).await?; + userid = self.env.query_userid_async(&self.api_url).await?; self.reload_existing_ticket(&userid).await? } }; @@ -383,7 +272,7 @@ where &self, userid: &str, ) -> Result>, E::Error> { - let ticket = match self.env.load_ticket_async(self.api_url(), userid).await? { + let ticket = match self.env.load_ticket_async(&self.api_url, userid).await? { Some(auth) => auth, None => return Ok(None), }; @@ -397,8 +286,8 @@ where } /// Build a URI relative to the current API endpoint. - fn build_uri(&self, base_uri: Uri, path: &str) -> Result { - let parts = base_uri.into_parts(); + fn build_uri(&self, path: &str) -> Result { + let parts = self.api_url.clone().into_parts(); let mut builder = http::uri::Builder::new(); if let Some(scheme) = parts.scheme { builder = builder.scheme(scheme); @@ -412,65 +301,6 @@ where .map_err(E::Error::internal) } - /// Attempt to execute a request, while automatically trying to reach different cluster nodes - /// if we fail to connect to the current node. - /// - /// The `make_request` closure gets the base `Uri` and should use it to build a `Request`. - /// The `Request` is then attempted. If there's a connection issue, `make_request` will be - /// called again with another cluster node (if available). - /// Only if no node responds - or a legitimate HTTP error is produced - will the error be - /// returned. - async fn request_retry_loop< - #[cfg(not(target_arch = "wasm32"))] - Fut: Future>, E::Error>> + Send, - #[cfg(target_arch = "wasm32")] - Fut: Future>, E::Error>>, - >( - &self, - make_request: impl Fn(Uri) -> Fut, - ) -> Result>, E::Error> { - let generation = self.api_urls.generation(); - let mut url_index = self.api_urls.index(); - let mut retry = None; - loop { - let err = match self - .client - .request(make_request(self.api_urls.urls[url_index].clone()).await?) - .await - { - Ok(response) => { - if retry.is_some() && self.api_urls.generation() == generation { - self.api_urls.current.store(url_index, Ordering::Relaxed); - self.api_urls - .generation - .store(generation + 1, Ordering::Release); - } - return Ok(response); - } - Err(err) => err, - }; - - match retry { - Some(retry) => { - if retry == url_index { - return Err(err.into()); - } - } - None => retry = Some(url_index), - } - - // if another thread successfully found a working URL already, use that as our last - // attempt: - if self.api_urls.generation() != generation { - url_index = self.api_urls.index(); - retry = Some(url_index); - continue; - } - - url_index = (url_index + 1) % self.api_urls.urls.len(); - } - } - /// Execute a `GET` request, possibly trying multiple cluster nodes. pub async fn get<'a, R>(&'a self, uri: &str) -> Result, E::Error> where @@ -478,16 +308,13 @@ where { self.login().await?; - let response = self - .request_retry_loop(|base_uri| async { - self.set_auth_headers(Request::get(self.build_uri(base_uri, uri)?)) - .await? - .body(Vec::new()) - .map_err(Error::internal) - }) - .await?; + let request = self + .set_auth_headers(Request::get(self.build_uri(uri)?)) + .await? + .body(Vec::new()) + .map_err(Error::internal)?; - Self::handle_response(response) + Self::handle_response(self.client.request(request).await?) } /// Execute a `GET` request with the given body, possibly trying multiple cluster nodes. @@ -536,16 +363,13 @@ where { self.login().await?; - let response = self - .request_retry_loop(|base_uri| async { - self.set_auth_headers(Request::delete(self.build_uri(base_uri, uri)?)) - .await? - .body(Vec::new()) - .map_err(Error::internal) - }) - .await?; + let request = self + .set_auth_headers(Request::delete(self.build_uri(uri)?)) + .await? + .body(Vec::new()) + .map_err(Error::internal)?; - Self::handle_response(response) + Self::handle_response(self.client.request(request).await?) } /// Execute a `DELETE` request with the given body, possibly trying multiple cluster nodes. @@ -607,18 +431,18 @@ where body: Vec, content_length: usize, ) -> Result>, E::Error> { - self.request_retry_loop(|base_uri| async { - let request = Request::builder() - .method(method.clone()) - .uri(self.build_uri(base_uri, uri)?) - .header(http::header::CONTENT_TYPE, "application/json") - .header(http::header::CONTENT_LENGTH, content_length.to_string()); + let request = Request::builder() + .method(method.clone()) + .uri(self.build_uri(uri)?) + .header(http::header::CONTENT_TYPE, "application/json") + .header(http::header::CONTENT_LENGTH, content_length.to_string()); - auth.set_auth_headers(request) - .body(body.clone()) - .map_err(Error::internal) - }) - .await + let request = auth + .set_auth_headers(request) + .body(body.clone()) + .map_err(Error::internal)?; + + Ok(self.client.request(request).await?) } /// Check the status code, deserialize the json/extjs `RawApiResponse` and check for error