mirror of https://git.proxmox.com/git/proxmox-backup
metric collection: put metrics in a cache
Any pull-metric API endpoint can later access the cache to retrieve metric data for a limited time (30 minutes).

Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
parent 20753e1b53
commit da12adb1f9
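For context on the consumer side: the cache stores serialized `MetricDataPoints` generations (the container type defined in the second hunk below), and a pull-metric endpoint would read them back and drop anything older than the requested start time. A minimal sketch of that filtering step, assuming the generations have already been deserialized from the cache (the function name `filter_cached_generations` and its parameters are illustrative and not part of this commit):

use pbs_api_types::MetricDataPoint;

// Illustrative sketch only: keep the data points of all cached generations that are
// newer than the client-supplied start time. How the generations are read back out
// of the SharedCache is not part of this diff.
fn filter_cached_generations(
    generations: Vec<MetricDataPoints>,
    start_time: i64,
) -> Vec<MetricDataPoint> {
    generations
        .into_iter()
        .filter(|generation| generation.timestamp > start_time)
        .flat_map(|generation| generation.datapoints)
        .collect()
}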
@@ -72,16 +72,27 @@ async fn run_stat_generator() {
                 rrd::sync_journal();
             }
         });
+        let pull_metric_future = tokio::task::spawn_blocking({
+            let stats = Arc::clone(&stats);
+            move || {
+                pull_metrics::update_metrics(&stats.0, &stats.1, &stats.2)?;
+                Ok::<(), Error>(())
+            }
+        });
 
         let metrics_future = metric_server::send_data_to_metric_servers(stats);
 
-        let (rrd_res, metrics_res) = join!(rrd_future, metrics_future);
+        let (rrd_res, metrics_res, pull_metrics_res) =
+            join!(rrd_future, metrics_future, pull_metric_future);
         if let Err(err) = rrd_res {
             log::error!("rrd update panicked: {err}");
         }
         if let Err(err) = metrics_res {
             log::error!("error during metrics sending: {err}");
         }
+        if let Err(err) = pull_metrics_res {
+            log::error!("error caching pull-style metrics: {err}");
+        }
 
         tokio::time::sleep_until(tokio::time::Instant::from_std(delay_target)).await;
     }
@@ -3,11 +3,16 @@ use std::{path::Path, sync::OnceLock, time::Duration};
 use anyhow::{format_err, Error};
 
 use nix::sys::stat::Mode;
+use pbs_api_types::{
+    MetricDataPoint,
+    MetricDataType::{self, Derive, Gauge},
+};
 use pbs_buildcfg::PROXMOX_BACKUP_RUN_DIR;
 use proxmox_shared_cache::SharedCache;
 use proxmox_sys::fs::CreateOptions;
+use serde::{Deserialize, Serialize};
 
-use super::METRIC_COLLECTION_INTERVAL;
+use super::{DiskStat, HostStats, METRIC_COLLECTION_INTERVAL};
 
 const METRIC_CACHE_TIME: Duration = Duration::from_secs(30 * 60);
 const STORED_METRIC_GENERATIONS: u64 =
@@ -33,3 +38,103 @@ pub(super) fn init() -> Result<(), Error> {
 
     Ok(())
 }
+
+/// Convert `DiskStat` and `HostStats` into a universal metric data point and cache
+/// them for later retrieval.
+pub(super) fn update_metrics(
+    host: &HostStats,
+    hostdisk: &DiskStat,
+    datastores: &[DiskStat],
+) -> Result<(), Error> {
+    let mut points = MetricDataPoints::new(proxmox_time::epoch_i64());
+
+    // Using the same metric names as in PVE's new /cluster/metrics/export endpoint
+    if let Some(stat) = &host.proc {
+        points.add(Gauge, "host", "cpu_current", stat.cpu);
+        points.add(Gauge, "host", "cpu_iowait", stat.iowait_percent);
+    }
+
+    if let Some(loadavg) = &host.load {
+        points.add(Gauge, "host", "cpu_avg1", loadavg.0);
+        points.add(Gauge, "host", "cpu_avg5", loadavg.1);
+        points.add(Gauge, "host", "cpu_avg15", loadavg.2);
+    }
+
+    if let Some(meminfo) = &host.meminfo {
+        points.add(Gauge, "host", "mem_total", meminfo.memtotal as f64);
+        points.add(Gauge, "host", "mem_used", meminfo.memused as f64);
+        points.add(Gauge, "host", "swap_total", meminfo.swaptotal as f64);
+        points.add(Gauge, "host", "swap_used", meminfo.swapused as f64);
+    }
+
+    if let Some(netdev) = &host.net {
+        use pbs_config::network::is_physical_nic;
+        let mut netin = 0;
+        let mut netout = 0;
+        for item in netdev {
+            if !is_physical_nic(&item.device) {
+                continue;
+            }
+            netin += item.receive;
+            netout += item.send;
+        }
+        points.add(Derive, "host", "net_in", netin as f64);
+        points.add(Derive, "host", "net_out", netout as f64);
+    }
+
+    update_disk_metrics(&mut points, hostdisk, "host");
+
+    for stat in datastores {
+        let id = format!("datastore/{}", stat.name);
+        update_disk_metrics(&mut points, stat, &id);
+    }
+
+    get_cache()?.set(&points, Duration::from_secs(2))?;
+
+    Ok(())
+}
+
+fn get_cache() -> Result<&'static SharedCache, Error> {
+    // Not using get_or_init here since initialization can fail.
+    METRIC_CACHE
+        .get()
+        .ok_or_else(|| format_err!("metric cache not initialized"))
+}
+
+fn update_disk_metrics(points: &mut MetricDataPoints, disk: &DiskStat, id: &str) {
+    if let Some(status) = &disk.usage {
+        points.add(Gauge, id, "disk_total", status.total as f64);
+        points.add(Gauge, id, "disk_used", status.used as f64);
+        points.add(Gauge, id, "disk_available", status.available as f64);
+    }
+
+    if let Some(stat) = &disk.dev {
+        points.add(Derive, id, "disk_read", (stat.read_sectors * 512) as f64);
+        points.add(Derive, id, "disk_write", (stat.write_sectors * 512) as f64);
+    }
+}
+
+#[derive(Serialize, Deserialize)]
+struct MetricDataPoints {
+    timestamp: i64,
+    datapoints: Vec<MetricDataPoint>,
+}
+
+impl MetricDataPoints {
+    fn new(timestamp: i64) -> Self {
+        Self {
+            datapoints: Vec::new(),
+            timestamp,
+        }
+    }
+
+    fn add(&mut self, ty: MetricDataType, id: &str, metric: &str, value: f64) {
+        self.datapoints.push(MetricDataPoint {
+            id: id.into(),
+            metric: metric.into(),
+            timestamp: self.timestamp,
+            ty,
+            value,
+        })
+    }
+}