diff --git a/src/bin/proxmox-backup-proxy.rs b/src/bin/proxmox-backup-proxy.rs
index 2e2acfc9..947062ae 100644
--- a/src/bin/proxmox-backup-proxy.rs
+++ b/src/bin/proxmox-backup-proxy.rs
@@ -1,76 +1,69 @@
-use std::sync::{Mutex, Arc};
-use std::path::{Path, PathBuf};
-use std::os::unix::io::AsRawFd;
 use std::future::Future;
+use std::os::unix::io::AsRawFd;
+use std::path::{Path, PathBuf};
 use std::pin::Pin;
+use std::sync::{Arc, Mutex};
 
 use anyhow::{bail, format_err, Error};
 use futures::*;
 use http::request::Parts;
 use http::Response;
-use hyper::{Body, StatusCode};
 use hyper::header;
+use hyper::{Body, StatusCode};
 use url::form_urlencoded;
 
-use openssl::ssl::{SslMethod, SslAcceptor, SslFiletype};
-use tokio_stream::wrappers::ReceiverStream;
+use http::{HeaderMap, Method};
+use openssl::ssl::{SslAcceptor, SslFiletype, SslMethod};
 use serde_json::{json, Value};
-use http::{Method, HeaderMap};
+use tokio_stream::wrappers::ReceiverStream;
 
-use proxmox_sys::linux::socket::set_tcp_keepalive;
-use proxmox_sys::fs::CreateOptions;
+use proxmox_http::client::{RateLimitedStream, ShareableRateLimit};
 use proxmox_lang::try_block;
 use proxmox_router::{RpcEnvironment, RpcEnvironmentType, UserInformation};
-use proxmox_http::client::{RateLimitedStream, ShareableRateLimit};
-use proxmox_sys::{task_log, task_warn};
+use proxmox_sys::fs::CreateOptions;
+use proxmox_sys::linux::socket::set_tcp_keepalive;
 use proxmox_sys::logrotate::LogRotate;
+use proxmox_sys::{task_log, task_warn};
 
 use pbs_datastore::DataStore;
 
 use proxmox_rest_server::{
-    rotate_task_log_archive, extract_cookie , AuthError, ApiConfig, RestServer, RestEnvironment,
-    ServerAdapter, WorkerTask, cleanup_old_tasks,
+    cleanup_old_tasks, extract_cookie, rotate_task_log_archive, ApiConfig, AuthError,
+    RestEnvironment, RestServer, ServerAdapter, WorkerTask,
 };
 
 use proxmox_backup::rrd_cache::{
-    initialize_rrd_cache, rrd_update_gauge, rrd_update_derive, rrd_sync_journal,
+    initialize_rrd_cache, rrd_sync_journal, rrd_update_derive, rrd_update_gauge,
 };
 use proxmox_backup::{
-    TRAFFIC_CONTROL_CACHE,
     server::{
         auth::check_pbs_auth,
-        jobstate::{
-            self,
-            Job,
-        },
+        jobstate::{self, Job},
     },
+    TRAFFIC_CONTROL_CACHE,
 };
 
 use pbs_buildcfg::configdir;
 
 use proxmox_time::CalendarEvent;
 
 use pbs_api_types::{
-    Authid, TapeBackupJobConfig, VerificationJobConfig, SyncJobConfig, DataStoreConfig,
-    PruneOptions,
+    Authid, DataStoreConfig, PruneOptions, SyncJobConfig, TapeBackupJobConfig,
+    VerificationJobConfig,
};
 
 use proxmox_rest_server::daemon;
 
-use proxmox_backup::server;
 use proxmox_backup::auth_helpers::*;
+use proxmox_backup::server;
 use proxmox_backup::tools::{
+    disks::{zfs_dataset_stats, DiskManage},
     PROXMOX_BACKUP_TCP_KEEPALIVE_TIME,
-    disks::{
-        DiskManage,
-        zfs_dataset_stats,
-    },
 };
-
 use proxmox_backup::api2::pull::do_sync_job;
 use proxmox_backup::api2::tape::backup::do_tape_backup_job;
-use proxmox_backup::server::do_verification_job;
 use proxmox_backup::server::do_prune_job;
+use proxmox_backup::server::do_verification_job;
 
 fn main() -> Result<(), Error> {
     pbs_tools::setup_libc_malloc_opts();
@@ -83,17 +76,19 @@ fn main() -> Result<(), Error> {
     let running_gid = nix::unistd::Gid::effective();
 
     if running_uid != backup_uid || running_gid != backup_gid {
-        bail!("proxy not running as backup user or group (got uid {} gid {})", running_uid, running_gid);
+        bail!(
+            "proxy not running as backup user or group (got uid {} gid {})",
+            running_uid,
+            running_gid
+        );
     }
 
     proxmox_async::runtime::main(run())
 }
 
-
 struct ProxmoxBackupProxyAdapter;
 
 impl ServerAdapter for ProxmoxBackupProxyAdapter {
-
     fn get_index(
         &self,
         env: RestEnvironment,
@@ -120,16 +115,12 @@ fn extract_lang_header(headers: &http::HeaderMap) -> Option<String> {
     None
 }
 
-async fn get_index_future(
-    env: RestEnvironment,
-    parts: Parts,
-) -> Response<Body> {
-
+async fn get_index_future(env: RestEnvironment, parts: Parts) -> Response<Body> {
     let auth_id = env.get_auth_id();
     let api = env.api_config();
     let language = extract_lang_header(&parts.headers);
 
-    // fixme: make all IO async
+    // fixme: make all IO async
 
     let (userid, csrf_token) = match auth_id {
         Some(auth_id) => {
@@ -140,7 +131,7 @@ async fn get_index_future(
                     let new_csrf_token = assemble_csrf_prevention_token(csrf_secret(), &userid);
                     (Some(userid), Some(new_csrf_token))
                 }
-                _ => (None, None)
+                _ => (None, None),
             }
         }
         None => (None, None),
@@ -210,7 +201,8 @@ async fn run() -> Result<(), Error> {
     if let Err(err) = syslog::init(
         syslog::Facility::LOG_DAEMON,
         log::LevelFilter::Info,
-        Some("proxmox-backup-proxy")) {
+        Some("proxmox-backup-proxy"),
+    ) {
         bail!("unable to inititialize syslog - {}", err);
     }
 
@@ -237,7 +229,10 @@ async fn run() -> Result<(), Error> {
     config.add_alias("fontawesome", "/usr/share/fonts-font-awesome");
     config.add_alias("xtermjs", "/usr/share/pve-xtermjs");
     config.add_alias("locale", "/usr/share/pbs-i18n");
-    config.add_alias("widgettoolkit", "/usr/share/javascript/proxmox-widget-toolkit");
+    config.add_alias(
+        "widgettoolkit",
+        "/usr/share/javascript/proxmox-widget-toolkit",
+    );
     config.add_alias("docs", "/usr/share/doc/proxmox-backup/html");
 
     let mut indexpath = PathBuf::from(pbs_buildcfg::JS_DIR);
@@ -246,10 +241,17 @@ async fn run() -> Result<(), Error> {
     config.register_template("console", "/usr/share/pve-xtermjs/index.html.hbs")?;
 
     let backup_user = pbs_config::backup_user()?;
-    let mut commando_sock = proxmox_rest_server::CommandSocket::new(proxmox_rest_server::our_ctrl_sock(), backup_user.gid);
+    let mut commando_sock = proxmox_rest_server::CommandSocket::new(
+        proxmox_rest_server::our_ctrl_sock(),
+        backup_user.gid,
+    );
 
-    let dir_opts = CreateOptions::new().owner(backup_user.uid).group(backup_user.gid);
-    let file_opts = CreateOptions::new().owner(backup_user.uid).group(backup_user.gid);
+    let dir_opts = CreateOptions::new()
+        .owner(backup_user.uid)
+        .group(backup_user.gid);
+    let file_opts = CreateOptions::new()
+        .owner(backup_user.uid)
+        .group(backup_user.gid);
 
     config.enable_access_log(
         pbs_buildcfg::API_ACCESS_LOG_FN,
@@ -266,7 +268,10 @@ async fn run() -> Result<(), Error> {
     )?;
 
     let rest_server = RestServer::new(config);
-    proxmox_rest_server::init_worker_tasks(pbs_buildcfg::PROXMOX_BACKUP_LOG_DIR_M!().into(), file_opts.clone())?;
+    proxmox_rest_server::init_worker_tasks(
+        pbs_buildcfg::PROXMOX_BACKUP_LOG_DIR_M!().into(),
+        file_opts.clone(),
+    )?;
 
     //openssl req -x509 -newkey rsa:4096 -keyout /etc/proxmox-backup/proxy.key -out /etc/proxmox-backup/proxy.pem -nodes
 
@@ -275,53 +280,43 @@ async fn run() -> Result<(), Error> {
     let acceptor = Arc::new(Mutex::new(acceptor));
 
     // to renew the acceptor we just add a command-socket handler
-    commando_sock.register_command(
-        "reload-certificate".to_string(),
-        {
-            let acceptor = Arc::clone(&acceptor);
-            move |_value| -> Result<_, Error> {
-                log::info!("reloading certificate");
-                match make_tls_acceptor() {
-                    Err(err) => log::error!("error reloading certificate: {}", err),
-                    Ok(new_acceptor) => {
-                        let mut guard = acceptor.lock().unwrap();
-                        *guard = new_acceptor;
-                    }
+    commando_sock.register_command("reload-certificate".to_string(), {
+        let acceptor = Arc::clone(&acceptor);
+        move |_value| -> Result<_, Error> {
+            log::info!("reloading certificate");
+            match make_tls_acceptor() {
+                Err(err) => log::error!("error reloading certificate: {}", err),
+                Ok(new_acceptor) => {
+                    let mut guard = acceptor.lock().unwrap();
+                    *guard = new_acceptor;
                 }
-                Ok(Value::Null)
-            }
-        },
-    )?;
-
-    // to remove references for not configured datastores
-    commando_sock.register_command(
-        "datastore-removed".to_string(),
-        |_value| {
-            if let Err(err) = DataStore::remove_unused_datastores() {
-                log::error!("could not refresh datastores: {}", err);
             }
             Ok(Value::Null)
         }
-    )?;
+    })?;
 
-    let server = daemon::create_daemon(
-        ([0,0,0,0,0,0,0,0], 8007).into(),
-        move |listener| {
+    // to remove references for not configured datastores
+    commando_sock.register_command("datastore-removed".to_string(), |_value| {
+        if let Err(err) = DataStore::remove_unused_datastores() {
+            log::error!("could not refresh datastores: {}", err);
+        }
+        Ok(Value::Null)
+    })?;
 
-            let connections = accept_connections(listener, acceptor, debug);
-            let connections = hyper::server::accept::from_stream(ReceiverStream::new(connections));
+    let server = daemon::create_daemon(([0, 0, 0, 0, 0, 0, 0, 0], 8007).into(), move |listener| {
+        let connections = accept_connections(listener, acceptor, debug);
+        let connections = hyper::server::accept::from_stream(ReceiverStream::new(connections));
 
-            Ok(async {
-                daemon::systemd_notify(daemon::SystemdNotify::Ready)?;
+        Ok(async {
+            daemon::systemd_notify(daemon::SystemdNotify::Ready)?;
 
-                hyper::Server::builder(connections)
-                    .serve(rest_server)
-                    .with_graceful_shutdown(proxmox_rest_server::shutdown_future())
-                    .map_err(Error::from)
-                    .await
-            })
-        },
-    );
+            hyper::Server::builder(connections)
+                .serve(rest_server)
+                .with_graceful_shutdown(proxmox_rest_server::shutdown_future())
+                .map_err(Error::from)
+                .await
+        })
+    });
 
     proxmox_rest_server::write_pid(pbs_buildcfg::PROXMOX_BACKUP_PROXY_PID_FN)?;
 
@@ -364,9 +359,11 @@ fn make_tls_acceptor() -> Result<SslAcceptor, Error> {
     if let Some(ciphers) = ciphers_tls_1_2.as_deref() {
         acceptor.set_cipher_list(ciphers)?;
     }
-    acceptor.set_private_key_file(key_path, SslFiletype::PEM)
+    acceptor
+        .set_private_key_file(key_path, SslFiletype::PEM)
         .map_err(|err| format_err!("unable to read proxy key {} - {}", key_path, err))?;
-    acceptor.set_certificate_chain_file(cert_path)
+    acceptor
+        .set_certificate_chain_file(cert_path)
        .map_err(|err| format_err!("unable to read proxy cert {} - {}", cert_path, err))?;
     acceptor.set_options(openssl::ssl::SslOptions::NO_RENEGOTIATION);
     acceptor.check_private_key().unwrap();
@@ -374,8 +371,10 @@ fn make_tls_acceptor() -> Result<SslAcceptor, Error> {
     Ok(acceptor.build())
 }
 
-type ClientStreamResult =
-    Result<std::pin::Pin<Box<tokio_openssl::SslStream<RateLimitedStream<tokio::net::TcpStream>>>>, Error>;
+type ClientStreamResult = Result<
+    std::pin::Pin<Box<tokio_openssl::SslStream<RateLimitedStream<tokio::net::TcpStream>>>>,
+    Error,
+>;
 
 const MAX_PENDING_ACCEPTS: usize = 1024;
 
@@ -383,7 +382,6 @@ fn accept_connections(
     listener: tokio::net::TcpListener,
     acceptor: Arc<Mutex<SslAcceptor>>,
     debug: bool,
 ) -> tokio::sync::mpsc::Receiver<ClientStreamResult> {
-
     let (sender, receiver) = tokio::sync::mpsc::channel(MAX_PENDING_ACCEPTS);
     tokio::spawn(accept_connection(listener, acceptor, debug, sender));
 
@@ -402,7 +400,7 @@ async fn accept_connection(
     loop {
         let (sock, peer) = match listener.accept().await {
             Ok(conn) => conn,
-            Err(err) => {
+            Err(err) => {
                 eprintln!("error accepting tcp connection: {}", err);
                 continue;
             }
@@ -411,27 +409,35 @@ async fn accept_connection(
         sock.set_nodelay(true).unwrap();
         let _ = set_tcp_keepalive(sock.as_raw_fd(), PROXMOX_BACKUP_TCP_KEEPALIVE_TIME);
 
-        let sock = RateLimitedStream::with_limiter_update_cb(sock, move || lookup_rate_limiter(peer));
+        let sock =
+            RateLimitedStream::with_limiter_update_cb(sock, move || lookup_rate_limiter(peer));
 
-        let ssl = { // limit acceptor_guard scope
+        let ssl = {
+            // limit acceptor_guard scope
             // Acceptor can be reloaded using the command socket "reload-certificate" command
             let acceptor_guard = acceptor.lock().unwrap();
             match openssl::ssl::Ssl::new(acceptor_guard.context()) {
                 Ok(ssl) => ssl,
                 Err(err) => {
-                    eprintln!("failed to create Ssl object from Acceptor context - {}", err);
+                    eprintln!(
+                        "failed to create Ssl object from Acceptor context - {}",
+                        err
+                    );
                     continue;
-                },
+                }
             }
         };
 
         let stream = match tokio_openssl::SslStream::new(ssl, sock) {
             Ok(stream) => stream,
             Err(err) => {
-                eprintln!("failed to create SslStream using ssl and connection socket - {}", err);
+                eprintln!(
+                    "failed to create SslStream using ssl and connection socket - {}",
+                    err
+                );
                 continue;
-            },
+            }
         };
 
         let mut stream = Box::pin(stream);
@@ -444,8 +450,8 @@ async fn accept_connection(
         let accept_counter = Arc::clone(&accept_counter);
         tokio::spawn(async move {
-            let accept_future = tokio::time::timeout(
-                Duration::new(10, 0), stream.as_mut().accept());
+            let accept_future =
+                tokio::time::timeout(Duration::new(10, 0), stream.as_mut().accept());
 
             let result = accept_future.await;
 
@@ -493,43 +499,45 @@ fn start_traffic_control_updater() {
     tokio::spawn(task.map(|_| ()));
 }
 
-use std::time::{SystemTime, Instant, Duration, UNIX_EPOCH};
+use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
 
 fn next_minute() -> Result<Instant, Error> {
     let now = SystemTime::now();
     let epoch_now = now.duration_since(UNIX_EPOCH)?;
-    let epoch_next = Duration::from_secs((epoch_now.as_secs()/60 + 1)*60);
+    let epoch_next = Duration::from_secs((epoch_now.as_secs() / 60 + 1) * 60);
     Ok(Instant::now() + epoch_next - epoch_now)
 }
 
 async fn run_task_scheduler() {
-
     let mut count: usize = 0;
 
     loop {
         count += 1;
 
-        let delay_target = match next_minute() { // try to run very minute
+        let delay_target = match next_minute() {
+            // try to run very minute
             Ok(d) => d,
             Err(err) => {
                 eprintln!("task scheduler: compute next minute failed - {}", err);
-                tokio::time::sleep_until(tokio::time::Instant::from_std(Instant::now() + Duration::from_secs(60))).await;
+                tokio::time::sleep_until(tokio::time::Instant::from_std(
+                    Instant::now() + Duration::from_secs(60),
+                ))
+                .await;
                 continue;
             }
         };
 
-        if count > 2 { // wait 1..2 minutes before starting
+        if count > 2 {
+            // wait 1..2 minutes before starting
             match schedule_tasks().catch_unwind().await {
-                Err(panic) => {
-                    match panic.downcast::<&str>() {
-                        Ok(msg) => {
-                            eprintln!("task scheduler panic: {}", msg);
-                        }
-                        Err(_) => {
-                            eprintln!("task scheduler panic - unknown type");
-                        }
+                Err(panic) => match panic.downcast::<&str>() {
+                    Ok(msg) => {
+                        eprintln!("task scheduler panic: {}", msg);
                    }
-                }
+                    Err(_) => {
+                        eprintln!("task scheduler panic - unknown type");
+                    }
+                },
                 Ok(Err(err)) => {
                     eprintln!("task scheduler failed - {:?}", err);
                 }
@@ -542,7 +550,6 @@ async fn run_task_scheduler() {
 }
 
 async fn schedule_tasks() -> Result<(), Error> {
-
     schedule_datastore_garbage_collection().await;
     schedule_datastore_prune().await;
     schedule_datastore_sync_jobs().await;
@@ -554,7 +561,6 @@ async fn schedule_tasks() -> Result<(), Error> {
 }
 
 async fn schedule_datastore_garbage_collection() {
-
     let config = match pbs_config::datastore::config() {
         Err(err) => {
             eprintln!("unable to read datastore config - {}", err);
@@ -593,14 +599,19 @@ async fn schedule_datastore_garbage_collection() {
             }
         };
 
-        if datastore.garbage_collection_running() { continue; }
+        if datastore.garbage_collection_running() {
+            continue;
+        }
 
         let worker_type = "garbage_collection";
 
         let last = match jobstate::last_run_time(worker_type, &store) {
             Ok(time) => time,
             Err(err) => {
-                eprintln!("could not get last run time of {} {}: {}", worker_type, store, err);
+                eprintln!(
+                    "could not get last run time of {} {}: {}",
+                    worker_type, store, err
+                );
                 continue;
             }
         };
@@ -616,7 +627,9 @@ async fn schedule_datastore_garbage_collection() {
 
         let now = proxmox_time::epoch_i64();
 
-        if next > now { continue; }
+        if next > now {
+            continue;
+        }
 
         let job = match Job::new(worker_type, &store) {
             Ok(job) => job,
@@ -625,14 +638,22 @@ async fn schedule_datastore_garbage_collection() {
 
         let auth_id = Authid::root_auth_id();
 
-        if let Err(err) = crate::server::do_garbage_collection_job(job, datastore, auth_id, Some(event_str), false) {
-            eprintln!("unable to start garbage collection job on datastore {} - {}", store, err);
+        if let Err(err) = crate::server::do_garbage_collection_job(
+            job,
+            datastore,
+            auth_id,
+            Some(event_str),
+            false,
+        ) {
+            eprintln!(
+                "unable to start garbage collection job on datastore {} - {}",
+                store, err
+            );
         }
     }
 }
 
 async fn schedule_datastore_prune() {
-
     let config = match pbs_config::datastore::config() {
         Err(err) => {
             eprintln!("unable to read datastore config - {}", err);
@@ -642,7 +663,6 @@ async fn schedule_datastore_prune() {
     };
 
     for (store, (_, store_config)) in config.sections {
-
         let store_config: DataStoreConfig = match serde_json::from_value(store_config) {
             Ok(c) => c,
             Err(err) => {
@@ -659,13 +679,14 @@ async fn schedule_datastore_prune() {
         let prune_options = PruneOptions {
             keep_last: store_config.keep_last,
             keep_hourly: store_config.keep_hourly,
-            keep_daily: store_config.keep_daily,
+            keep_daily: store_config.keep_daily,
             keep_weekly: store_config.keep_weekly,
             keep_monthly: store_config.keep_monthly,
             keep_yearly: store_config.keep_yearly,
         };
 
-        if !pbs_datastore::prune::keeps_something(&prune_options) { // no prune settings - keep all
+        if !pbs_datastore::prune::keeps_something(&prune_options) {
+            // no prune settings - keep all
             continue;
         }
 
@@ -677,7 +698,9 @@ async fn schedule_datastore_prune() {
         };
 
         let auth_id = Authid::root_auth_id().clone();
-        if let Err(err) = do_prune_job(job, prune_options, store.clone(), &auth_id, Some(event_str)) {
+        if let Err(err) =
+            do_prune_job(job, prune_options, store.clone(), &auth_id, Some(event_str))
+        {
             eprintln!("unable to start datastore prune job {} - {}", &store, err);
         }
     };
@@ -685,8 +708,6 @@ async fn schedule_datastore_prune() {
 }
 
 async fn schedule_datastore_sync_jobs() {
-
-
     let config = match pbs_config::sync::config() {
         Err(err) => {
             eprintln!("unable to read sync job config - {}", err);
@@ -725,7 +746,6 @@ async fn schedule_datastore_sync_jobs() {
 }
 
 async fn schedule_datastore_verify_jobs() {
-
     let config = match pbs_config::verify::config() {
         Err(err) => {
             eprintln!("unable to read verification job config - {}", err);
@@ -753,15 +773,18 @@ async fn schedule_datastore_verify_jobs() {
             Ok(job) => job,
             Err(_) => continue, // could not get lock
         };
-        if let Err(err) = do_verification_job(job, job_config, &auth_id, Some(event_str), false) {
-            eprintln!("unable to start datastore verification job {} - {}", &job_id, err);
+        if let Err(err) = do_verification_job(job, job_config, &auth_id, Some(event_str), false)
+        {
+            eprintln!(
+                "unable to start datastore verification job {} - {}",
+                &job_id, err
+            );
         }
        };
    }
 }
 
 async fn schedule_tape_backup_jobs() {
-
     let config = match pbs_config::tape_job::config() {
         Err(err) => {
             eprintln!("unable to read tape job config - {}", err);
@@ -789,16 +812,16 @@ async fn schedule_tape_backup_jobs() {
             Ok(job) => job,
             Err(_) => continue, // could not get lock
         };
-        if let Err(err) = do_tape_backup_job(job, job_config.setup, &auth_id, Some(event_str), false) {
+        if let Err(err) =
+            do_tape_backup_job(job, job_config.setup, &auth_id, Some(event_str), false)
+        {
             eprintln!("unable to start tape backup job {} - {}", &job_id, err);
         }
        };
    }
 }
 
-
 async fn schedule_task_log_rotate() {
-
     let worker_type = "logrotate";
     let job_id = "access-log_and_task-archive";
 
@@ -809,7 +832,7 @@ async fn schedule_task_log_rotate() {
     // if we never ran the rotation, schedule instantly
     match jobstate::JobState::load(worker_type, job_id) {
         Ok(state) => match state {
-            jobstate::JobState::Created { .. } => {},
+            jobstate::JobState::Created { .. } => {}
             _ => return,
         },
         _ => return,
@@ -855,7 +878,6 @@ async fn schedule_task_log_rotate() {
             let max_size = 32 * 1024 * 1024 - 1;
             let max_files = 14;
 
-
             let mut logrotate = LogRotate::new(
                 pbs_buildcfg::API_ACCESS_LOG_FN,
                 true,
@@ -907,21 +929,26 @@ async fn schedule_task_log_rotate() {
     ) {
         eprintln!("unable to start task log rotation: {}", err);
     }
-
 }
 
 async fn command_reopen_access_logfiles() -> Result<(), Error> {
     // only care about the most recent daemon instance for each, proxy & api, as other older ones
     // should not respond to new requests anyway, but only finish their current one and then exit.
     let sock = proxmox_rest_server::our_ctrl_sock();
-    let f1 = proxmox_rest_server::send_raw_command(sock, "{\"command\":\"api-access-log-reopen\"}\n");
+    let f1 =
+        proxmox_rest_server::send_raw_command(sock, "{\"command\":\"api-access-log-reopen\"}\n");
 
     let pid = proxmox_rest_server::read_pid(pbs_buildcfg::PROXMOX_BACKUP_API_PID_FN)?;
     let sock = proxmox_rest_server::ctrl_sock_from_pid(pid);
-    let f2 = proxmox_rest_server::send_raw_command(sock, "{\"command\":\"api-access-log-reopen\"}\n");
+    let f2 =
+        proxmox_rest_server::send_raw_command(sock, "{\"command\":\"api-access-log-reopen\"}\n");
 
     match futures::join!(f1, f2) {
-        (Err(e1), Err(e2)) => Err(format_err!("reopen commands failed, proxy: {}; api: {}", e1, e2)),
+        (Err(e1), Err(e2)) => Err(format_err!(
+            "reopen commands failed, proxy: {}; api: {}",
+            e1,
+            e2
+        )),
         (Err(e1), Ok(_)) => Err(format_err!("reopen commands failed, proxy: {}", e1)),
         (Ok(_), Err(e2)) => Err(format_err!("reopen commands failed, api: {}", e2)),
         _ => Ok(()),
@@ -939,7 +966,11 @@ async fn command_reopen_auth_logfiles() -> Result<(), Error> {
     let f2 = proxmox_rest_server::send_raw_command(sock, "{\"command\":\"api-auth-log-reopen\"}\n");
 
     match futures::join!(f1, f2) {
-        (Err(e1), Err(e2)) => Err(format_err!("reopen commands failed, proxy: {}; api: {}", e1, e2)),
+        (Err(e1), Err(e2)) => Err(format_err!(
+            "reopen commands failed, proxy: {}; api: {}",
+            e1,
+            e2
+        )),
         (Err(e1), Ok(_)) => Err(format_err!("reopen commands failed, proxy: {}", e1)),
         (Ok(_), Err(e2)) => Err(format_err!("reopen commands failed, api: {}", e2)),
         _ => Ok(()),
@@ -947,18 +978,15 @@ async fn command_reopen_auth_logfiles() -> Result<(), Error> {
 }
 
 async fn run_stat_generator() {
-
     loop {
-        let delay_target = Instant::now() + Duration::from_secs(10);
+        let delay_target = Instant::now() + Duration::from_secs(10);
 
         generate_host_stats().await;
 
         rrd_sync_journal();
 
         tokio::time::sleep_until(tokio::time::Instant::from_std(delay_target)).await;
-
     }
-
 }
 
 async fn generate_host_stats() {
@@ -970,7 +998,8 @@ async fn generate_host_stats() {
 
 fn generate_host_stats_sync() {
     use proxmox_sys::linux::procfs::{
-        read_meminfo, read_proc_stat, read_proc_net_dev, read_loadavg};
+        read_loadavg, read_meminfo, read_proc_net_dev, read_proc_stat,
+    };
 
     match read_proc_stat() {
         Ok(stat) => {
@@ -1000,7 +1029,9 @@ fn generate_host_stats_sync() {
             let mut netin = 0;
             let mut netout = 0;
             for item in netdev {
-                if !is_physical_nic(&item.device) { continue; }
+                if !is_physical_nic(&item.device) {
+                    continue;
+                }
                 netin += item.receive;
                 netout += item.send;
             }
@@ -1027,11 +1058,11 @@ fn generate_host_stats_sync() {
 
     match pbs_config::datastore::config() {
         Ok((config, _)) => {
-            let datastore_list: Vec<DataStoreConfig> =
-                config.convert_to_typed_array("datastore").unwrap_or_default();
+            let datastore_list: Vec<DataStoreConfig> = config
+                .convert_to_typed_array("datastore")
+                .unwrap_or_default();
 
             for config in datastore_list {
-
                 let rrd_prefix = format!("datastore/{}", config.name);
                 let path = std::path::Path::new(&config.path);
                 gather_disk_stats(disk_manager.clone(), path, &rrd_prefix);
@@ -1055,7 +1086,10 @@ fn check_schedule(worker_type: &str, event_str: &str, id: &str) -> bool {
     let last = match jobstate::last_run_time(worker_type, id) {
         Ok(time) => time,
         Err(err) => {
-            eprintln!("could not get last run time of {} {}: {}", worker_type, id, err);
+            eprintln!(
+                "could not get last run time of {} {}: {}",
+                worker_type, id, err
+            );
             return false;
         }
     };
@@ -1074,7 +1108,6 @@ fn check_schedule(worker_type: &str, event_str: &str, id: &str) -> bool {
 }
 
 fn gather_disk_stats(disk_manager: Arc<DiskManage>, path: &Path, rrd_prefix: &str) {
-
     match proxmox_backup::tools::disks::disk_usage(path) {
         Ok(status) => {
             let rrd_key = format!("{}/total", rrd_prefix);
@@ -1088,7 +1121,7 @@ fn gather_disk_stats(disk_manager: Arc<DiskManage>, path: &Path, rrd_prefix: &st
     }
 
     match disk_manager.find_mounted_device(path) {
-        Ok(None) => {},
+        Ok(None) => {}
         Ok(Some((fs_type, device, source))) => {
             let mut device_stat = None;
             match (fs_type.as_str(), source) {
@@ -1114,15 +1147,15 @@ fn gather_disk_stats(disk_manager: Arc<DiskManage>, path: &Path, rrd_prefix: &st
                 let rrd_key = format!("{}/read_ios", rrd_prefix);
                 rrd_update_derive(&rrd_key, stat.read_ios as f64);
                 let rrd_key = format!("{}/read_bytes", rrd_prefix);
-                rrd_update_derive(&rrd_key, (stat.read_sectors*512) as f64);
+                rrd_update_derive(&rrd_key, (stat.read_sectors * 512) as f64);
 
                 let rrd_key = format!("{}/write_ios", rrd_prefix);
                 rrd_update_derive(&rrd_key, stat.write_ios as f64);
                 let rrd_key = format!("{}/write_bytes", rrd_prefix);
-                rrd_update_derive(&rrd_key, (stat.write_sectors*512) as f64);
+                rrd_update_derive(&rrd_key, (stat.write_sectors * 512) as f64);
 
                 let rrd_key = format!("{}/io_ticks", rrd_prefix);
-                rrd_update_derive(&rrd_key, (stat.io_ticks as f64)/1000.0);
+                rrd_update_derive(&rrd_key, (stat.io_ticks as f64) / 1000.0);
             }
         }
         Err(err) => {
@@ -1137,23 +1170,24 @@ fn gather_disk_stats(disk_manager: Arc<DiskManage>, path: &Path, rrd_prefix: &st
 
 // proxmox-backup-client restore vm/201/2021-10-22T09:55:56Z drive-scsi0.img img1.img --repository localhost:store2
 async fn run_traffic_control_updater() {
-
-    loop {
-        let delay_target = Instant::now() + Duration::from_secs(1);
+    loop {
+        let delay_target = Instant::now() + Duration::from_secs(1);
 
         {
             let mut cache = TRAFFIC_CONTROL_CACHE.lock().unwrap();
             cache.compute_current_rates();
         }
 
-        tokio::time::sleep_until(tokio::time::Instant::from_std(delay_target)).await;
-    }
-
+        tokio::time::sleep_until(tokio::time::Instant::from_std(delay_target)).await;
+    }
 }
 
 fn lookup_rate_limiter(
     peer: std::net::SocketAddr,
-) -> (Option<Arc<dyn ShareableRateLimit>>, Option<Arc<dyn ShareableRateLimit>>) {
+) -> (
+    Option<Arc<dyn ShareableRateLimit>>,
+    Option<Arc<dyn ShareableRateLimit>>,
+) {
     let mut cache = TRAFFIC_CONTROL_CACHE.lock().unwrap();
 
     let now = proxmox_time::epoch_i64();