commit 5bc85d2d04ed7a388b56f5efa91667086753a265 Author: Dietmar Maurer Date: Sat May 23 09:29:33 2020 +0200 add simple rrd implementation diff --git a/src/rrd/cache.rs b/src/rrd/cache.rs new file mode 100644 index 00000000..9f8ef9f4 --- /dev/null +++ b/src/rrd/cache.rs @@ -0,0 +1,82 @@ +use std::time::{SystemTime, UNIX_EPOCH}; +use std::path::PathBuf; +use std::collections::HashMap; +use std::sync::{RwLock}; + +use anyhow::{format_err, Error}; +use lazy_static::lazy_static; +use serde_json::Value; + +use proxmox::tools::fs::{create_path, CreateOptions}; + +use super::*; + +const PBS_RRD_BASEDIR: &str = "/var/lib/proxmox-backup/rrdb"; + +lazy_static!{ + static ref RRD_CACHE: RwLock> = { + RwLock::new(HashMap::new()) + }; +} + +/// Create rrdd stat dir with correct permission +pub fn create_rrdb_dir() -> Result<(), Error> { + + let backup_user = crate::backup::backup_user()?; + let opts = CreateOptions::new() + .owner(backup_user.uid) + .group(backup_user.gid); + + create_path(PBS_RRD_BASEDIR, None, Some(opts)) + .map_err(|err: Error| format_err!("unable to create rrdb stat dir - {}", err))?; + + Ok(()) +} + +fn now() -> Result { + let epoch = SystemTime::now().duration_since(UNIX_EPOCH)?; + Ok(epoch.as_secs()) +} + +pub fn update_value(rel_path: &str, value: f64) -> Result<(), Error> { + + let mut path = PathBuf::from(PBS_RRD_BASEDIR); + path.push(rel_path); + + std::fs::create_dir_all(path.parent().unwrap())?; + + let mut map = RRD_CACHE.write().unwrap(); + let now = now()?; + + if let Some(rrd) = map.get_mut(rel_path) { + rrd.update(now, value); + rrd.save(&path)?; + } else { + let mut rrd = match RRD::load(&path) { + Ok(rrd) => rrd, + Err(_) => RRD::new(), + }; + rrd.update(now, value); + rrd.save(&path)?; + map.insert(rel_path.into(), rrd); + } + + Ok(()) +} + +pub fn extract_data( + rel_path: &str, + timeframe: RRDTimeFrameResolution, + mode: RRDMode, +) -> Result { + + let now = now()?; + + let map = RRD_CACHE.read().unwrap(); + + if let Some(rrd) = map.get(rel_path) { + Ok(rrd.extract_data(now, timeframe, mode)) + } else { + Ok(RRD::new().extract_data(now, timeframe, mode)) + } +} diff --git a/src/rrd/mod.rs b/src/rrd/mod.rs new file mode 100644 index 00000000..c09efebf --- /dev/null +++ b/src/rrd/mod.rs @@ -0,0 +1,4 @@ +mod rrd; +pub use rrd::*; +mod cache; +pub use cache::*; diff --git a/src/rrd/rrd.rs b/src/rrd/rrd.rs new file mode 100644 index 00000000..4fcf64f8 --- /dev/null +++ b/src/rrd/rrd.rs @@ -0,0 +1,224 @@ +use std::io::Read; +use std::path::Path; + +use anyhow::{bail, Error}; +use serde_json::{json, Value}; + +const RRD_DATA_ENTRIES: usize = 70; + +#[derive(Copy, Clone)] +pub enum RRDMode { + Max, + Average, +} + +#[repr(u64)] +#[derive(Copy, Clone)] +pub enum RRDTimeFrameResolution { + Hour = 60, // 1 min => last 70 minutes + Day = 60*30, // 30 min => last 35 hours + Week = 60*180, // 3 hours => about 8 days + Month = 60*720, // 12 hours => last 35 days + Year = 60*10080, // 1 week => last 490 days +} + +#[repr(C)] +#[derive(Default, Copy, Clone)] +struct RRDEntry { + max: f64, + average: f64, + count: u64, +} + +#[repr(C)] +// Note: Avoid alignment problems by using 8byte types only +pub struct RRD { + last_update: u64, + hour: [RRDEntry; RRD_DATA_ENTRIES], + day: [RRDEntry; RRD_DATA_ENTRIES], + week: [RRDEntry; RRD_DATA_ENTRIES], + month: [RRDEntry; RRD_DATA_ENTRIES], + year: [RRDEntry; RRD_DATA_ENTRIES], +} + +impl RRD { + + pub fn new() -> Self { + Self { + last_update: 0, + hour: [RRDEntry::default(); RRD_DATA_ENTRIES], + day: [RRDEntry::default(); RRD_DATA_ENTRIES], + week: [RRDEntry::default(); RRD_DATA_ENTRIES], + month: [RRDEntry::default(); RRD_DATA_ENTRIES], + year: [RRDEntry::default(); RRD_DATA_ENTRIES], + } + } + + pub fn extract_data( + &self, + epoch: u64, + timeframe: RRDTimeFrameResolution, + mode: RRDMode, + ) -> Value { + + let reso = timeframe as u64; + + let end = reso*(epoch/reso); + let start = end - reso*(RRD_DATA_ENTRIES as u64); + + let rrd_end = reso*(self.last_update/reso); + let rrd_start = rrd_end - reso*(RRD_DATA_ENTRIES as u64); + + let mut list = Vec::new(); + + let data = match timeframe { + RRDTimeFrameResolution::Hour => &self.hour, + RRDTimeFrameResolution::Day => &self.day, + RRDTimeFrameResolution::Week => &self.week, + RRDTimeFrameResolution::Month => &self.month, + RRDTimeFrameResolution::Year => &self.year, + }; + + let mut t = start; + let mut index = ((t/reso) % (RRD_DATA_ENTRIES as u64)) as usize; + for _ in 0..RRD_DATA_ENTRIES { + if t < rrd_start || t > rrd_end { + list.push(json!({ "time": t })); + } else { + let entry = data[index]; + if entry.count == 0 { + list.push(json!({ "time": t })); + } else { + let value = match mode { + RRDMode::Max => entry.max, + RRDMode::Average => entry.average, + }; + list.push(json!({ "time": t, "value": value })); + } + } + t += reso; index = (index + 1) % RRD_DATA_ENTRIES; + } + + list.into() + } + + pub fn from_raw(mut raw: &[u8]) -> Result { + let expected_len = std::mem::size_of::(); + if raw.len() != expected_len { + bail!("RRD::from_raw failed - wrong data size ({} != {})", raw.len(), expected_len); + } + + let mut rrd: RRD = unsafe { std::mem::zeroed() }; + unsafe { + let rrd_slice = std::slice::from_raw_parts_mut(&mut rrd as *mut _ as *mut u8, expected_len); + raw.read_exact(rrd_slice)?; + } + + Ok(rrd) + } + + pub fn load(filename: &Path) -> Result { + let raw = proxmox::tools::fs::file_get_contents(filename)?; + Self::from_raw(&raw) + } + + pub fn save(&self, filename: &Path) -> Result<(), Error> { + use proxmox::tools::{fs::replace_file, fs::CreateOptions}; + + let rrd_slice = unsafe { + std::slice::from_raw_parts(self as *const _ as *const u8, std::mem::size_of::()) + }; + + let backup_user = crate::backup::backup_user()?; + let mode = nix::sys::stat::Mode::from_bits_truncate(0o0644); + // set the correct owner/group/permissions while saving file + // owner(rw) = backup, group(r)= backup + let options = CreateOptions::new() + .perm(mode) + .owner(backup_user.uid) + .group(backup_user.gid); + + replace_file(filename, rrd_slice, options)?; + + Ok(()) + } + + fn compute_new_value( + data: &[RRDEntry; RRD_DATA_ENTRIES], + index: usize, + value: f64, + ) -> RRDEntry { + let RRDEntry { max, average, count } = data[index]; + let new_count = count + 1; // fixme: check overflow? + if count == 0 { + RRDEntry { max: value, average: value, count: 1 } + } else { + let new_max = if max > value { max } else { value }; + let new_average = (average*(count as f64) + value)/(new_count as f64); + RRDEntry { max: new_max, average: new_average, count: new_count } + } + } + + pub fn update(&mut self, epoch: u64, value: f64) { + // fixme: check time progress (epoch last) + let last = self.last_update; + + let reso = RRDTimeFrameResolution::Hour as u64; + + let min_time = epoch - (RRD_DATA_ENTRIES as u64)*reso; + let mut t = last; + let mut index = ((t/reso) % (RRD_DATA_ENTRIES as u64)) as usize; + for _ in 0..RRD_DATA_ENTRIES { + if t < min_time { self.hour[index] = RRDEntry::default(); } + t += reso; index = (index + 1) % RRD_DATA_ENTRIES; + } + let index = ((epoch/reso) % (RRD_DATA_ENTRIES as u64)) as usize; + self.hour[index] = Self::compute_new_value(&self.hour, index, value); + + let reso = RRDTimeFrameResolution::Day as u64; + let min_time = epoch - (RRD_DATA_ENTRIES as u64)*reso; + let mut t = last; + let mut index = ((t/reso) % (RRD_DATA_ENTRIES as u64)) as usize; + for _ in 0..RRD_DATA_ENTRIES { + if t < min_time { self.day[index] = RRDEntry::default(); } + t += reso; index = (index + 1) % RRD_DATA_ENTRIES; + } + let index = ((epoch/reso) % (RRD_DATA_ENTRIES as u64)) as usize; + self.day[index] = Self::compute_new_value(&self.day, index, value); + + let reso = RRDTimeFrameResolution::Week as u64; + let min_time = epoch - (RRD_DATA_ENTRIES as u64)*reso; + let mut t = last; + let mut index = ((t/reso) % (RRD_DATA_ENTRIES as u64)) as usize; + for _ in 0..RRD_DATA_ENTRIES { + if t < min_time { self.week[index] = RRDEntry::default(); } + t += reso; index = (index + 1) % RRD_DATA_ENTRIES; + } + let index = ((epoch/reso) % (RRD_DATA_ENTRIES as u64)) as usize; + self.week[index] = Self::compute_new_value(&self.week, index, value); + + let reso = RRDTimeFrameResolution::Month as u64; + let min_time = epoch - (RRD_DATA_ENTRIES as u64)*reso; + let mut t = last; + let mut index = ((t/reso) % (RRD_DATA_ENTRIES as u64)) as usize; + for _ in 0..RRD_DATA_ENTRIES { + if t < min_time { self.month[index] = RRDEntry::default(); } + t += reso; index = (index + 1) % RRD_DATA_ENTRIES; + } + let index = ((epoch/reso) % (RRD_DATA_ENTRIES as u64)) as usize; + self.month[index] = Self::compute_new_value(&self.month, index, value); + + let reso = RRDTimeFrameResolution::Year as u64; + let min_time = epoch - (RRD_DATA_ENTRIES as u64)*reso; + let mut t = last; + let mut index = ((t/reso) % (RRD_DATA_ENTRIES as u64)) as usize; + for _ in 0..RRD_DATA_ENTRIES { + if t < min_time { self.year[index] = RRDEntry::default(); } + t += reso; index = (index + 1) % RRD_DATA_ENTRIES; + } + let index = ((epoch/reso) % (RRD_DATA_ENTRIES as u64)) as usize; + self.year[index] = Self::compute_new_value(&self.year, index, value); + + self.last_update = epoch; + } +}