pull: refactor pulling from a datastore

... making the pull logic independent from the actual source
using two traits.

Signed-off-by: Hannes Laimer <h.laimer@proxmox.com>
Reviewed-by: Lukas Wagner <l.wagner@proxmox.com>
Tested-by: Lukas Wagner <l.wagner@proxmox.com>
Tested-by: Tested-by: Gabriel Goller <g.goller@proxmox.com>
This commit is contained in:
Hannes Laimer 2023-11-21 15:31:51 +01:00 committed by Thomas Lamprecht
parent 09683f1290
commit 05a52d0106
5 changed files with 561 additions and 401 deletions

View File

@ -102,6 +102,7 @@ proxmox-rrd = { path = "proxmox-rrd" }
# regular crates # regular crates
anyhow = "1.0" anyhow = "1.0"
async-trait = "0.1.56"
apt-pkg-native = "0.3.2" apt-pkg-native = "0.3.2"
base64 = "0.13" base64 = "0.13"
bitflags = "1.2.1" bitflags = "1.2.1"
@ -153,6 +154,7 @@ zstd = { version = "0.12", features = [ "bindgen" ] }
[dependencies] [dependencies]
anyhow.workspace = true anyhow.workspace = true
async-trait.workspace = true
apt-pkg-native.workspace = true apt-pkg-native.workspace = true
base64.workspace = true base64.workspace = true
bitflags.workspace = true bitflags.workspace = true

View File

@ -14,7 +14,7 @@ pub trait ReadChunk {
fn read_chunk(&self, digest: &[u8; 32]) -> Result<Vec<u8>, Error>; fn read_chunk(&self, digest: &[u8; 32]) -> Result<Vec<u8>, Error>;
} }
pub trait AsyncReadChunk: Send { pub trait AsyncReadChunk: Send + Sync {
/// Returns the encoded chunk data /// Returns the encoded chunk data
fn read_raw_chunk<'a>( fn read_raw_chunk<'a>(
&'a self, &'a self,

View File

@ -300,8 +300,8 @@ pub fn delete_remote(name: String, digest: Option<String>) -> Result<(), Error>
Ok(()) Ok(())
} }
/// Helper to get client for remote.cfg entry /// Helper to get client for remote.cfg entry without login, just config
pub async fn remote_client( pub fn remote_client_config(
remote: &Remote, remote: &Remote,
limit: Option<RateLimitConfig>, limit: Option<RateLimitConfig>,
) -> Result<HttpClient, Error> { ) -> Result<HttpClient, Error> {
@ -320,6 +320,16 @@ pub async fn remote_client(
&remote.config.auth_id, &remote.config.auth_id,
options, options,
)?; )?;
Ok(client)
}
/// Helper to get client for remote.cfg entry
pub async fn remote_client(
remote: &Remote,
limit: Option<RateLimitConfig>,
) -> Result<HttpClient, Error> {
let client = remote_client_config(remote, limit)?;
let _auth_info = client let _auth_info = client
.login() // make sure we can auth .login() // make sure we can auth
.await .await

View File

@ -65,7 +65,7 @@ impl TryFrom<&SyncJobConfig> for PullParameters {
PullParameters::new( PullParameters::new(
&sync_job.store, &sync_job.store,
sync_job.ns.clone().unwrap_or_default(), sync_job.ns.clone().unwrap_or_default(),
&sync_job.remote, Some(&sync_job.remote),
&sync_job.remote_store, &sync_job.remote_store,
sync_job.remote_ns.clone().unwrap_or_default(), sync_job.remote_ns.clone().unwrap_or_default(),
sync_job sync_job
@ -114,7 +114,6 @@ pub fn do_sync_job(
let worker_future = async move { let worker_future = async move {
let pull_params = PullParameters::try_from(&sync_job)?; let pull_params = PullParameters::try_from(&sync_job)?;
let client = pull_params.client().await?;
task_log!(worker, "Starting datastore sync job '{}'", job_id); task_log!(worker, "Starting datastore sync job '{}'", job_id);
if let Some(event_str) = schedule { if let Some(event_str) = schedule {
@ -128,7 +127,7 @@ pub fn do_sync_job(
sync_job.remote_store, sync_job.remote_store,
); );
pull_store(&worker, &client, pull_params).await?; pull_store(&worker, pull_params).await?;
task_log!(worker, "sync job '{}' end", &job_id); task_log!(worker, "sync job '{}' end", &job_id);
@ -256,7 +255,7 @@ async fn pull(
let pull_params = PullParameters::new( let pull_params = PullParameters::new(
&store, &store,
ns, ns,
&remote, Some(&remote),
&remote_store, &remote_store,
remote_ns.unwrap_or_default(), remote_ns.unwrap_or_default(),
auth_id.clone(), auth_id.clone(),
@ -266,7 +265,6 @@ async fn pull(
limit, limit,
transfer_last, transfer_last,
)?; )?;
let client = pull_params.client().await?;
// fixme: set to_stdout to false? // fixme: set to_stdout to false?
// FIXME: add namespace to worker id? // FIXME: add namespace to worker id?
@ -284,7 +282,7 @@ async fn pull(
remote_store, remote_store,
); );
let pull_future = pull_store(&worker, &client, pull_params); let pull_future = pull_store(&worker, pull_params);
(select! { (select! {
success = pull_future.fuse() => success, success = pull_future.fuse() => success,
abort = worker.abort_future().map(|_| Err(format_err!("pull aborted"))) => abort, abort = worker.abort_future().map(|_| Err(format_err!("pull aborted"))) => abort,

File diff suppressed because it is too large Load Diff