diff --git a/src/server/metric_collection/metric_server.rs b/src/server/metric_collection/metric_server.rs new file mode 100644 index 00000000..cc9f736a --- /dev/null +++ b/src/server/metric_collection/metric_server.rs @@ -0,0 +1,136 @@ +use std::sync::Arc; + +use anyhow::Error; +use serde_json::{json, Value}; + +use proxmox_metrics::MetricsData; + +use super::{DiskStat, HostStats}; + +pub async fn send_data_to_metric_servers( + stats: Arc<(HostStats, DiskStat, Vec)>, +) -> Result<(), Error> { + let (config, _digest) = pbs_config::metrics::config()?; + let channel_list = get_metric_server_connections(config)?; + + if channel_list.is_empty() { + return Ok(()); + } + + let ctime = proxmox_time::epoch_i64(); + let nodename = proxmox_sys::nodename(); + + let mut values = Vec::new(); + + let mut cpuvalue = match &stats.0.proc { + Some(stat) => serde_json::to_value(stat)?, + None => json!({}), + }; + + if let Some(loadavg) = &stats.0.load { + cpuvalue["avg1"] = Value::from(loadavg.0); + cpuvalue["avg5"] = Value::from(loadavg.1); + cpuvalue["avg15"] = Value::from(loadavg.2); + } + + values.push(Arc::new( + MetricsData::new("cpustat", ctime, cpuvalue)? + .tag("object", "host") + .tag("host", nodename), + )); + + if let Some(stat) = &stats.0.meminfo { + values.push(Arc::new( + MetricsData::new("memory", ctime, stat)? + .tag("object", "host") + .tag("host", nodename), + )); + } + + if let Some(netdev) = &stats.0.net { + for item in netdev { + values.push(Arc::new( + MetricsData::new("nics", ctime, item)? + .tag("object", "host") + .tag("host", nodename) + .tag("instance", item.device.clone()), + )); + } + } + + values.push(Arc::new( + MetricsData::new("blockstat", ctime, stats.1.to_value())? + .tag("object", "host") + .tag("host", nodename), + )); + + for datastore in stats.2.iter() { + values.push(Arc::new( + MetricsData::new("blockstat", ctime, datastore.to_value())? + .tag("object", "host") + .tag("host", nodename) + .tag("datastore", datastore.name.clone()), + )); + } + + // we must have a concrete functions, because the inferred lifetime from a + // closure is not general enough for the tokio::spawn call we are in here... + fn map_fn(item: &(proxmox_metrics::Metrics, String)) -> &proxmox_metrics::Metrics { + &item.0 + } + + let results = + proxmox_metrics::send_data_to_channels(&values, channel_list.iter().map(map_fn)).await; + for (res, name) in results + .into_iter() + .zip(channel_list.iter().map(|(_, name)| name)) + { + if let Err(err) = res { + log::error!("error sending into channel of {name}: {err}"); + } + } + + futures::future::join_all(channel_list.into_iter().map(|(channel, name)| async move { + if let Err(err) = channel.join().await { + log::error!("error sending to metric server {name}: {err}"); + } + })) + .await; + + Ok(()) +} + +/// Get the metric server connections from a config +fn get_metric_server_connections( + metric_config: proxmox_section_config::SectionConfigData, +) -> Result, Error> { + let mut res = Vec::new(); + + for config in + metric_config.convert_to_typed_array::("influxdb-udp")? + { + if !config.enable { + continue; + } + let future = proxmox_metrics::influxdb_udp(&config.host, config.mtu); + res.push((future, config.name)); + } + + for config in + metric_config.convert_to_typed_array::("influxdb-http")? + { + if !config.enable { + continue; + } + let future = proxmox_metrics::influxdb_http( + &config.url, + config.organization.as_deref().unwrap_or("proxmox"), + config.bucket.as_deref().unwrap_or("proxmox"), + config.token.as_deref(), + config.verify_tls.unwrap_or(true), + config.max_body_size.unwrap_or(25_000_000), + )?; + res.push((future, config.name)); + } + Ok(res) +} diff --git a/src/server/metric_collection/mod.rs b/src/server/metric_collection/mod.rs index 5102227b..5a516564 100644 --- a/src/server/metric_collection/mod.rs +++ b/src/server/metric_collection/mod.rs @@ -10,7 +10,6 @@ use pbs_api_types::{DataStoreConfig, Operation}; use serde_json::{json, Value}; use tokio::join; -use proxmox_metrics::MetricsData; use proxmox_sys::{ fs::FileSystemInformation, linux::procfs::{Loadavg, ProcFsMemInfo, ProcFsNetDev, ProcFsStat}, @@ -20,6 +19,7 @@ use crate::tools::disks::{zfs_dataset_stats, BlockDevStat, DiskManage}; use rrd::{initialize_rrd_cache, rrd_sync_journal, rrd_update_derive, rrd_update_gauge}; +mod metric_server; pub mod rrd; /// Initialize the metric collection subsystem. @@ -70,7 +70,7 @@ async fn run_stat_generator() { } }); - let metrics_future = send_data_to_metric_servers(stats); + let metrics_future = metric_server::send_data_to_metric_servers(stats); let (rrd_res, metrics_res) = join!(rrd_future, metrics_future); if let Err(err) = rrd_res { @@ -84,134 +84,6 @@ async fn run_stat_generator() { } } -async fn send_data_to_metric_servers( - stats: Arc<(HostStats, DiskStat, Vec)>, -) -> Result<(), Error> { - let (config, _digest) = pbs_config::metrics::config()?; - let channel_list = get_metric_server_connections(config)?; - - if channel_list.is_empty() { - return Ok(()); - } - - let ctime = proxmox_time::epoch_i64(); - let nodename = proxmox_sys::nodename(); - - let mut values = Vec::new(); - - let mut cpuvalue = match &stats.0.proc { - Some(stat) => serde_json::to_value(stat)?, - None => json!({}), - }; - - if let Some(loadavg) = &stats.0.load { - cpuvalue["avg1"] = Value::from(loadavg.0); - cpuvalue["avg5"] = Value::from(loadavg.1); - cpuvalue["avg15"] = Value::from(loadavg.2); - } - - values.push(Arc::new( - MetricsData::new("cpustat", ctime, cpuvalue)? - .tag("object", "host") - .tag("host", nodename), - )); - - if let Some(stat) = &stats.0.meminfo { - values.push(Arc::new( - MetricsData::new("memory", ctime, stat)? - .tag("object", "host") - .tag("host", nodename), - )); - } - - if let Some(netdev) = &stats.0.net { - for item in netdev { - values.push(Arc::new( - MetricsData::new("nics", ctime, item)? - .tag("object", "host") - .tag("host", nodename) - .tag("instance", item.device.clone()), - )); - } - } - - values.push(Arc::new( - MetricsData::new("blockstat", ctime, stats.1.to_value())? - .tag("object", "host") - .tag("host", nodename), - )); - - for datastore in stats.2.iter() { - values.push(Arc::new( - MetricsData::new("blockstat", ctime, datastore.to_value())? - .tag("object", "host") - .tag("host", nodename) - .tag("datastore", datastore.name.clone()), - )); - } - - // we must have a concrete functions, because the inferred lifetime from a - // closure is not general enough for the tokio::spawn call we are in here... - fn map_fn(item: &(proxmox_metrics::Metrics, String)) -> &proxmox_metrics::Metrics { - &item.0 - } - - let results = - proxmox_metrics::send_data_to_channels(&values, channel_list.iter().map(map_fn)).await; - for (res, name) in results - .into_iter() - .zip(channel_list.iter().map(|(_, name)| name)) - { - if let Err(err) = res { - log::error!("error sending into channel of {name}: {err}"); - } - } - - futures::future::join_all(channel_list.into_iter().map(|(channel, name)| async move { - if let Err(err) = channel.join().await { - log::error!("error sending to metric server {name}: {err}"); - } - })) - .await; - - Ok(()) -} - -/// Get the metric server connections from a config -fn get_metric_server_connections( - metric_config: proxmox_section_config::SectionConfigData, -) -> Result, Error> { - let mut res = Vec::new(); - - for config in - metric_config.convert_to_typed_array::("influxdb-udp")? - { - if !config.enable { - continue; - } - let future = proxmox_metrics::influxdb_udp(&config.host, config.mtu); - res.push((future, config.name)); - } - - for config in - metric_config.convert_to_typed_array::("influxdb-http")? - { - if !config.enable { - continue; - } - let future = proxmox_metrics::influxdb_http( - &config.url, - config.organization.as_deref().unwrap_or("proxmox"), - config.bucket.as_deref().unwrap_or("proxmox"), - config.token.as_deref(), - config.verify_tls.unwrap_or(true), - config.max_body_size.unwrap_or(25_000_000), - )?; - res.push((future, config.name)); - } - Ok(res) -} - struct HostStats { proc: Option, meminfo: Option,