metric_collection: split out push metric part

No functional changes intended.

Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
Lukas Wagner, 2024-10-15 10:46:27 +02:00 (committed by Wolfgang Bumiller)
parent 045fc7750c
commit 0a852e1927
2 changed files with 138 additions and 130 deletions

src/server/metric_collection/metric_server.rs (new file)

@@ -0,0 +1,136 @@
use std::sync::Arc;
use anyhow::Error;
use serde_json::{json, Value};
use proxmox_metrics::MetricsData;
use super::{DiskStat, HostStats};
pub async fn send_data_to_metric_servers(
stats: Arc<(HostStats, DiskStat, Vec<DiskStat>)>,
) -> Result<(), Error> {
let (config, _digest) = pbs_config::metrics::config()?;
let channel_list = get_metric_server_connections(config)?;
if channel_list.is_empty() {
return Ok(());
}
let ctime = proxmox_time::epoch_i64();
let nodename = proxmox_sys::nodename();
let mut values = Vec::new();
let mut cpuvalue = match &stats.0.proc {
Some(stat) => serde_json::to_value(stat)?,
None => json!({}),
};
if let Some(loadavg) = &stats.0.load {
cpuvalue["avg1"] = Value::from(loadavg.0);
cpuvalue["avg5"] = Value::from(loadavg.1);
cpuvalue["avg15"] = Value::from(loadavg.2);
}
values.push(Arc::new(
MetricsData::new("cpustat", ctime, cpuvalue)?
.tag("object", "host")
.tag("host", nodename),
));
if let Some(stat) = &stats.0.meminfo {
values.push(Arc::new(
MetricsData::new("memory", ctime, stat)?
.tag("object", "host")
.tag("host", nodename),
));
}
if let Some(netdev) = &stats.0.net {
for item in netdev {
values.push(Arc::new(
MetricsData::new("nics", ctime, item)?
.tag("object", "host")
.tag("host", nodename)
.tag("instance", item.device.clone()),
));
}
}
values.push(Arc::new(
MetricsData::new("blockstat", ctime, stats.1.to_value())?
.tag("object", "host")
.tag("host", nodename),
));
for datastore in stats.2.iter() {
values.push(Arc::new(
MetricsData::new("blockstat", ctime, datastore.to_value())?
.tag("object", "host")
.tag("host", nodename)
.tag("datastore", datastore.name.clone()),
));
}
// we must use a concrete function here, because the lifetime inferred for a
// closure is not general enough for the tokio::spawn call we are running in...
fn map_fn(item: &(proxmox_metrics::Metrics, String)) -> &proxmox_metrics::Metrics {
&item.0
}
let results =
proxmox_metrics::send_data_to_channels(&values, channel_list.iter().map(map_fn)).await;
for (res, name) in results
.into_iter()
.zip(channel_list.iter().map(|(_, name)| name))
{
if let Err(err) = res {
log::error!("error sending into channel of {name}: {err}");
}
}
futures::future::join_all(channel_list.into_iter().map(|(channel, name)| async move {
if let Err(err) = channel.join().await {
log::error!("error sending to metric server {name}: {err}");
}
}))
.await;
Ok(())
}
/// Get the metric server connections from a config
fn get_metric_server_connections(
metric_config: proxmox_section_config::SectionConfigData,
) -> Result<Vec<(proxmox_metrics::Metrics, String)>, Error> {
let mut res = Vec::new();
for config in
metric_config.convert_to_typed_array::<pbs_api_types::InfluxDbUdp>("influxdb-udp")?
{
if !config.enable {
continue;
}
let future = proxmox_metrics::influxdb_udp(&config.host, config.mtu);
res.push((future, config.name));
}
for config in
metric_config.convert_to_typed_array::<pbs_api_types::InfluxDbHttp>("influxdb-http")?
{
if !config.enable {
continue;
}
let future = proxmox_metrics::influxdb_http(
&config.url,
config.organization.as_deref().unwrap_or("proxmox"),
config.bucket.as_deref().unwrap_or("proxmox"),
config.token.as_deref(),
config.verify_tls.unwrap_or(true),
config.max_body_size.unwrap_or(25_000_000),
)?;
res.push((future, config.name));
}
Ok(res)
}
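
A side note on the map_fn workaround above: a named fn item coerces to a function pointer that is general over every input lifetime, while a closure's signature may be inferred with one concrete lifetime and then be rejected as "not general enough" once the surrounding future has to satisfy the bounds of tokio::spawn. A minimal sketch of the pattern, using hypothetical types that are not part of this patch:

fn takes_mapper<F>(f: F)
where
    // demand a mapper that works for every input lifetime
    F: for<'a> Fn(&'a (String, u32)) -> &'a String,
{
    let pair = ("example".to_string(), 1u32);
    let _ = f(&pair);
}

// A named fn item always satisfies the higher-ranked bound above.
fn first(item: &(String, u32)) -> &String {
    &item.0
}

fn main() {
    takes_mapper(first);
    // The closure form compiles here as well, but inside an async block
    // that is later handed to tokio::spawn, inference can pin it to a
    // single concrete lifetime and reject it; the named fn sidesteps that.
    takes_mapper(|item| &item.0);
}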

src/server/metric_collection/mod.rs

@@ -10,7 +10,6 @@ use pbs_api_types::{DataStoreConfig, Operation};
use serde_json::{json, Value};
use tokio::join;
use proxmox_metrics::MetricsData;
use proxmox_sys::{
fs::FileSystemInformation,
linux::procfs::{Loadavg, ProcFsMemInfo, ProcFsNetDev, ProcFsStat},
@@ -20,6 +19,7 @@ use crate::tools::disks::{zfs_dataset_stats, BlockDevStat, DiskManage};
use rrd::{initialize_rrd_cache, rrd_sync_journal, rrd_update_derive, rrd_update_gauge};
mod metric_server;
pub mod rrd;
/// Initialize the metric collection subsystem.
@@ -70,7 +70,7 @@ async fn run_stat_generator() {
}
});
let metrics_future = send_data_to_metric_servers(stats);
let metrics_future = metric_server::send_data_to_metric_servers(stats);
let (rrd_res, metrics_res) = join!(rrd_future, metrics_future);
if let Err(err) = rrd_res {
@@ -84,134 +84,6 @@ async fn run_stat_generator() {
}
}
async fn send_data_to_metric_servers(
stats: Arc<(HostStats, DiskStat, Vec<DiskStat>)>,
) -> Result<(), Error> {
let (config, _digest) = pbs_config::metrics::config()?;
let channel_list = get_metric_server_connections(config)?;
if channel_list.is_empty() {
return Ok(());
}
let ctime = proxmox_time::epoch_i64();
let nodename = proxmox_sys::nodename();
let mut values = Vec::new();
let mut cpuvalue = match &stats.0.proc {
Some(stat) => serde_json::to_value(stat)?,
None => json!({}),
};
if let Some(loadavg) = &stats.0.load {
cpuvalue["avg1"] = Value::from(loadavg.0);
cpuvalue["avg5"] = Value::from(loadavg.1);
cpuvalue["avg15"] = Value::from(loadavg.2);
}
values.push(Arc::new(
MetricsData::new("cpustat", ctime, cpuvalue)?
.tag("object", "host")
.tag("host", nodename),
));
if let Some(stat) = &stats.0.meminfo {
values.push(Arc::new(
MetricsData::new("memory", ctime, stat)?
.tag("object", "host")
.tag("host", nodename),
));
}
if let Some(netdev) = &stats.0.net {
for item in netdev {
values.push(Arc::new(
MetricsData::new("nics", ctime, item)?
.tag("object", "host")
.tag("host", nodename)
.tag("instance", item.device.clone()),
));
}
}
values.push(Arc::new(
MetricsData::new("blockstat", ctime, stats.1.to_value())?
.tag("object", "host")
.tag("host", nodename),
));
for datastore in stats.2.iter() {
values.push(Arc::new(
MetricsData::new("blockstat", ctime, datastore.to_value())?
.tag("object", "host")
.tag("host", nodename)
.tag("datastore", datastore.name.clone()),
));
}
// we must use a concrete function here, because the lifetime inferred for a
// closure is not general enough for the tokio::spawn call we are running in...
fn map_fn(item: &(proxmox_metrics::Metrics, String)) -> &proxmox_metrics::Metrics {
&item.0
}
let results =
proxmox_metrics::send_data_to_channels(&values, channel_list.iter().map(map_fn)).await;
for (res, name) in results
.into_iter()
.zip(channel_list.iter().map(|(_, name)| name))
{
if let Err(err) = res {
log::error!("error sending into channel of {name}: {err}");
}
}
futures::future::join_all(channel_list.into_iter().map(|(channel, name)| async move {
if let Err(err) = channel.join().await {
log::error!("error sending to metric server {name}: {err}");
}
}))
.await;
Ok(())
}
/// Get the metric server connections from a config
fn get_metric_server_connections(
metric_config: proxmox_section_config::SectionConfigData,
) -> Result<Vec<(proxmox_metrics::Metrics, String)>, Error> {
let mut res = Vec::new();
for config in
metric_config.convert_to_typed_array::<pbs_api_types::InfluxDbUdp>("influxdb-udp")?
{
if !config.enable {
continue;
}
let future = proxmox_metrics::influxdb_udp(&config.host, config.mtu);
res.push((future, config.name));
}
for config in
metric_config.convert_to_typed_array::<pbs_api_types::InfluxDbHttp>("influxdb-http")?
{
if !config.enable {
continue;
}
let future = proxmox_metrics::influxdb_http(
&config.url,
config.organization.as_deref().unwrap_or("proxmox"),
config.bucket.as_deref().unwrap_or("proxmox"),
config.token.as_deref(),
config.verify_tls.unwrap_or(true),
config.max_body_size.unwrap_or(25_000_000),
)?;
res.push((future, config.name));
}
Ok(res)
}
struct HostStats {
proc: Option<ProcFsStat>,
meminfo: Option<ProcFsMemInfo>,
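
For reference, get_metric_server_connections above parses the metric server section config (in Proxmox Backup Server typically /etc/proxmox-backup/metricserver.cfg). A hypothetical example with illustrative host names and values; the field names follow the pbs_api_types::InfluxDbUdp and pbs_api_types::InfluxDbHttp types referenced in the code:

influxdb-udp: udp-example
	host influxdb.example.com:8089
	mtu 1500

influxdb-http: http-example
	url https://influxdb.example.com:8086
	organization proxmox
	bucket proxmox
	token <redacted>

Fields left unset fall back to the defaults visible in the code: organization and bucket "proxmox", verify-tls true, and a max-body-size of 25,000,000 bytes.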