diff --git a/proxmox-rrd/Cargo.toml b/proxmox-rrd/Cargo.toml
new file mode 100644
index 00000000..c0b7d708
--- /dev/null
+++ b/proxmox-rrd/Cargo.toml
@@ -0,0 +1,25 @@
+[package]
+name = "proxmox-rrd"
+version = "0.1.0"
+authors.workspace = true
+edition.workspace = true
+license.workspace = true
+description = "Simple RRD database implementation."
+
+[dev-dependencies]
+proxmox-router = { workspace = true, features = ["cli", "server"] }
+
+[dependencies]
+anyhow.workspace = true
+bitflags.workspace = true
+crossbeam-channel.workspace = true
+libc.workspace = true
+log.workspace = true
+nix.workspace = true
+serde.workspace = true
+serde_cbor.workspace = true
+serde_json.workspace = true
+
+proxmox-schema = { workspace = true, features = [ "api-macro" ] }
+proxmox-sys.workspace = true
+proxmox-time.workspace = true
diff --git a/proxmox-rrd/examples/prrd.rs b/proxmox-rrd/examples/prrd.rs
new file mode 100644
index 00000000..c7d29376
--- /dev/null
+++ b/proxmox-rrd/examples/prrd.rs
@@ -0,0 +1,390 @@
+//! RRD toolkit - create/manage/update proxmox RRD (v2) files
+
+use std::path::PathBuf;
+
+use anyhow::{bail, Error};
+use serde::{Deserialize, Serialize};
+use serde_json::json;
+
+use proxmox_router::cli::{
+    complete_file_name, run_cli_command, CliCommand, CliCommandMap, CliEnvironment,
+};
+use proxmox_router::RpcEnvironment;
+use proxmox_schema::{api, ApiStringFormat, ApiType, IntegerSchema, Schema, StringSchema};
+
+use proxmox_sys::fs::CreateOptions;
+
+use proxmox_rrd::rrd::{CF, DST, RRA, RRD};
+
+pub const RRA_INDEX_SCHEMA: Schema = IntegerSchema::new("Index of the RRA.").minimum(0).schema();
+
+pub const RRA_CONFIG_STRING_SCHEMA: Schema = StringSchema::new("RRA configuration")
+    .format(&ApiStringFormat::PropertyString(&RRAConfig::API_SCHEMA))
+    .schema();
+
+#[api(
+    properties: {},
+    default_key: "cf",
+)]
+#[derive(Debug, Serialize, Deserialize)]
+/// RRA configuration
+pub struct RRAConfig {
+    /// Time resolution
+    pub r: u64,
+    pub cf: CF,
+    /// Number of data points
+    pub n: u64,
+}
+
+#[api(
+    input: {
+        properties: {
+            path: {
+                description: "The filename."
+            },
+        },
+    },
+)]
+/// Dump the RRD file in JSON format
+pub fn dump_rrd(path: String) -> Result<(), Error> {
+    let rrd = RRD::load(&PathBuf::from(path), false)?;
+    serde_json::to_writer_pretty(std::io::stdout(), &rrd)?;
+    println!();
+    Ok(())
+}
+
+#[api(
+    input: {
+        properties: {
+            path: {
+                description: "The filename."
+            },
+        },
+    },
+)]
+/// RRD file information
+pub fn rrd_info(path: String) -> Result<(), Error> {
+    let rrd = RRD::load(&PathBuf::from(path), false)?;
+
+    println!("DST: {:?}", rrd.source.dst);
+
+    for (i, rra) in rrd.rra_list.iter().enumerate() {
+        // use RRAConfig property string format
+        println!(
+            "RRA[{}]: {:?},r={},n={}",
+            i,
+            rra.cf,
+            rra.resolution,
+            rra.data.len()
+        );
+    }
+
+    Ok(())
+}
+
+#[api(
+    input: {
+        properties: {
+            path: {
+                description: "The filename."
+            },
+            time: {
+                description: "Update time.",
+                optional: true,
+            },
+            value: {
+                description: "Update value.",
+            },
+        },
+    },
+)]
+/// Update the RRD database
+pub fn update_rrd(path: String, time: Option<u64>, value: f64) -> Result<(), Error> {
+    let path = PathBuf::from(path);
+
+    let time = time
+        .map(|v| v as f64)
+        .unwrap_or_else(proxmox_time::epoch_f64);
+
+    let mut rrd = RRD::load(&path, false)?;
+    rrd.update(time, value);
+
+    rrd.save(&path, CreateOptions::new(), false)?;
+
+    Ok(())
+}
+
+#[api(
+    input: {
+        properties: {
+            path: {
+                description: "The filename."
+            },
+            cf: {
+                type: CF,
+            },
+            resolution: {
+                description: "Time resolution",
+            },
+            start: {
+                description: "Start time. If not specified, we simply extract 10 data points.",
+                optional: true,
+            },
+            end: {
+                description: "End time (Unix Epoch). Default is the last update time.",
+                optional: true,
+            },
+        },
+    },
+)]
+/// Fetch data from the RRD file
+pub fn fetch_rrd(
+    path: String,
+    cf: CF,
+    resolution: u64,
+    start: Option<u64>,
+    end: Option<u64>,
+) -> Result<(), Error> {
+    let rrd = RRD::load(&PathBuf::from(path), false)?;
+
+    let data = rrd.extract_data(cf, resolution, start, end)?;
+
+    println!("{}", serde_json::to_string_pretty(&data)?);
+
+    Ok(())
+}
+
+#[api(
+    input: {
+        properties: {
+            path: {
+                description: "The filename."
+            },
+            "rra-index": {
+                schema: RRA_INDEX_SCHEMA,
+            },
+        },
+    },
+)]
+/// Return the Unix timestamp of the first time slot inside the
+/// specified RRA (slot start time)
+pub fn first_update_time(path: String, rra_index: usize) -> Result<(), Error> {
+    let rrd = RRD::load(&PathBuf::from(path), false)?;
+
+    if rra_index >= rrd.rra_list.len() {
+        bail!("rra-index is out of range");
+    }
+    let rra = &rrd.rra_list[rra_index];
+    let duration = (rra.data.len() as u64) * rra.resolution;
+    let first = rra.slot_start_time((rrd.source.last_update as u64).saturating_sub(duration));
+
+    println!("{}", first);
+    Ok(())
+}
+
+#[api(
+    input: {
+        properties: {
+            path: {
+                description: "The filename."
+            },
+        },
+    },
+)]
+/// Return the Unix timestamp of the last update
+pub fn last_update_time(path: String) -> Result<(), Error> {
+    let rrd = RRD::load(&PathBuf::from(path), false)?;
+
+    println!("{}", rrd.source.last_update);
+    Ok(())
+}
+
+#[api(
+    input: {
+        properties: {
+            path: {
+                description: "The filename."
+            },
+        },
+    },
+)]
+/// Return the time and value from the last update
+pub fn last_update(path: String) -> Result<(), Error> {
+    let rrd = RRD::load(&PathBuf::from(path), false)?;
+
+    let result = json!({
+        "time": rrd.source.last_update,
+        "value": rrd.source.last_value,
+    });
+
+    println!("{}", serde_json::to_string_pretty(&result)?);
+
+    Ok(())
+}
+
+#[api(
+    input: {
+        properties: {
+            dst: {
+                type: DST,
+            },
+            path: {
+                description: "The filename to create."
+            },
+            rra: {
+                description: "Configuration of contained RRAs.",
+                type: Array,
+                items: {
+                    schema: RRA_CONFIG_STRING_SCHEMA,
+                }
+            },
+        },
+    },
+)]
+/// Create a new RRD file
+pub fn create_rrd(dst: DST, path: String, rra: Vec<String>) -> Result<(), Error> {
+    let mut rra_list = Vec::new();
+
+    for item in rra.iter() {
+        let rra: RRAConfig =
+            serde_json::from_value(RRAConfig::API_SCHEMA.parse_property_string(item)?)?;
+        println!("GOT {:?}", rra);
+        rra_list.push(RRA::new(rra.cf, rra.r, rra.n as usize));
+    }
+
+    let path = PathBuf::from(path);
+
+    let rrd = RRD::new(dst, rra_list);
+
+    rrd.save(&path, CreateOptions::new(), false)?;
+
+    Ok(())
+}
+
+#[api(
+    input: {
+        properties: {
+            path: {
+                description: "The filename."
+            },
+            "rra-index": {
+                schema: RRA_INDEX_SCHEMA,
+            },
+            slots: {
+                description: "The number of slots you want to add or remove.",
+                type: i64,
+            },
+        },
+    },
+)]
+/// Resize. Change the number of data slots for the specified RRA.
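+///
+/// Example (hypothetical invocation): `prrd resize cpu.rrd --rra-index 0
+/// --slots 10` grows RRA 0 of `cpu.rrd` by ten slots; a negative `slots`
+/// value shrinks it. Existing data is carried over via `extract_data` /
+/// `insert_data` below.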
+pub fn resize_rrd(path: String, rra_index: usize, slots: i64) -> Result<(), Error> { + let path = PathBuf::from(&path); + + let mut rrd = RRD::load(&path, false)?; + + if rra_index >= rrd.rra_list.len() { + bail!("rra-index is out of range"); + } + + let rra = &rrd.rra_list[rra_index]; + + let new_slots = (rra.data.len() as i64) + slots; + + if new_slots < 1 { + bail!("number of new slots is too small ('{}' < 1)", new_slots); + } + + if new_slots > 1024 * 1024 { + bail!("number of new slots is too big ('{}' > 1M)", new_slots); + } + + let rra_end = rra.slot_end_time(rrd.source.last_update as u64); + let rra_start = rra_end - rra.resolution * (rra.data.len() as u64); + let (start, reso, data) = rra + .extract_data(rra_start, rra_end, rrd.source.last_update) + .into(); + + let mut new_rra = RRA::new(rra.cf, rra.resolution, new_slots as usize); + new_rra.last_count = rra.last_count; + + new_rra.insert_data(start, reso, data)?; + + rrd.rra_list[rra_index] = new_rra; + + rrd.save(&path, CreateOptions::new(), false)?; + + Ok(()) +} + +fn main() -> Result<(), Error> { + let uid = nix::unistd::Uid::current(); + + let username = match nix::unistd::User::from_uid(uid)? { + Some(user) => user.name, + None => bail!("unable to get user name"), + }; + + let cmd_def = CliCommandMap::new() + .insert( + "create", + CliCommand::new(&API_METHOD_CREATE_RRD) + .arg_param(&["path"]) + .completion_cb("path", complete_file_name), + ) + .insert( + "dump", + CliCommand::new(&API_METHOD_DUMP_RRD) + .arg_param(&["path"]) + .completion_cb("path", complete_file_name), + ) + .insert( + "fetch", + CliCommand::new(&API_METHOD_FETCH_RRD) + .arg_param(&["path"]) + .completion_cb("path", complete_file_name), + ) + .insert( + "first", + CliCommand::new(&API_METHOD_FIRST_UPDATE_TIME) + .arg_param(&["path"]) + .completion_cb("path", complete_file_name), + ) + .insert( + "info", + CliCommand::new(&API_METHOD_RRD_INFO) + .arg_param(&["path"]) + .completion_cb("path", complete_file_name), + ) + .insert( + "last", + CliCommand::new(&API_METHOD_LAST_UPDATE_TIME) + .arg_param(&["path"]) + .completion_cb("path", complete_file_name), + ) + .insert( + "lastupdate", + CliCommand::new(&API_METHOD_LAST_UPDATE) + .arg_param(&["path"]) + .completion_cb("path", complete_file_name), + ) + .insert( + "resize", + CliCommand::new(&API_METHOD_RESIZE_RRD) + .arg_param(&["path"]) + .completion_cb("path", complete_file_name), + ) + .insert( + "update", + CliCommand::new(&API_METHOD_UPDATE_RRD) + .arg_param(&["path"]) + .completion_cb("path", complete_file_name), + ); + + let mut rpcenv = CliEnvironment::new(); + rpcenv.set_auth_id(Some(format!("{}@pam", username))); + + run_cli_command(cmd_def, rpcenv, None); + + Ok(()) +} diff --git a/proxmox-rrd/src/cache.rs b/proxmox-rrd/src/cache.rs new file mode 100644 index 00000000..254010f3 --- /dev/null +++ b/proxmox-rrd/src/cache.rs @@ -0,0 +1,448 @@ +use std::collections::BTreeSet; +use std::fs::File; +use std::io::{BufRead, BufReader}; +use std::os::unix::io::AsRawFd; +use std::path::{Path, PathBuf}; +use std::sync::{Arc, RwLock}; +use std::thread::spawn; +use std::time::SystemTime; + +use anyhow::{bail, format_err, Error}; +use crossbeam_channel::{bounded, TryRecvError}; + +use proxmox_sys::fs::{create_path, CreateOptions}; + +use crate::rrd::{CF, DST, RRA, RRD}; +use crate::Entry; + +mod journal; +use journal::*; + +mod rrd_map; +use rrd_map::*; + +/// RRD cache - keep RRD data in RAM, but write updates to disk +/// +/// This cache is designed to run as single instance (no concurrent +/// access from 
other processes).
+pub struct RRDCache {
+    config: Arc<CacheConfig>,
+    state: Arc<RwLock<JournalState>>,
+    rrd_map: Arc<RwLock<RRDMap>>,
+}
+
+pub(crate) struct CacheConfig {
+    apply_interval: f64,
+    basedir: PathBuf,
+    file_options: CreateOptions,
+    dir_options: CreateOptions,
+}
+
+impl RRDCache {
+    /// Creates a new instance
+    ///
+    /// `basedir`: All files are stored relative to this path.
+    ///
+    /// `file_options`: Files are created with these options.
+    ///
+    /// `dir_options`: Directories are created with these options.
+    ///
+    /// `apply_interval`: Commit journal after `apply_interval` seconds.
+    ///
+    /// `load_rrd_cb`: The callback function is used to load RRD files,
+    /// and should return a newly generated RRD if the file does not
+    /// exist (or is unreadable). This may generate RRDs with
+    /// different configurations (dependent on `rel_path`).
+    pub fn new<P: AsRef<Path>>(
+        basedir: P,
+        file_options: Option<CreateOptions>,
+        dir_options: Option<CreateOptions>,
+        apply_interval: f64,
+        load_rrd_cb: fn(path: &Path, rel_path: &str, dst: DST) -> RRD,
+    ) -> Result<Self, Error> {
+        let basedir = basedir.as_ref().to_owned();
+
+        let file_options = file_options.unwrap_or_else(CreateOptions::new);
+        let dir_options = dir_options.unwrap_or_else(CreateOptions::new);
+
+        create_path(
+            &basedir,
+            Some(dir_options.clone()),
+            Some(dir_options.clone()),
+        )
+        .map_err(|err: Error| format_err!("unable to create rrdb stat dir - {}", err))?;
+
+        let config = Arc::new(CacheConfig {
+            basedir,
+            file_options,
+            dir_options,
+            apply_interval,
+        });
+
+        let state = JournalState::new(Arc::clone(&config))?;
+        let rrd_map = RRDMap::new(Arc::clone(&config), load_rrd_cb);
+
+        Ok(Self {
+            config: Arc::clone(&config),
+            state: Arc::new(RwLock::new(state)),
+            rrd_map: Arc::new(RwLock::new(rrd_map)),
+        })
+    }
+
+    /// Create a new RRD as used by the proxmox backup server
+    ///
+    /// It contains the following RRAs:
+    ///
+    /// * cf=average,r=60,n=1440 => 1day
+    /// * cf=maximum,r=60,n=1440 => 1day
+    /// * cf=average,r=30*60,n=1440 => 1month
+    /// * cf=maximum,r=30*60,n=1440 => 1month
+    /// * cf=average,r=6*3600,n=1440 => 1year
+    /// * cf=maximum,r=6*3600,n=1440 => 1year
+    /// * cf=average,r=7*86400,n=570 => 10years
+    /// * cf=maximum,r=7*86400,n=570 => 10years
+    ///
+    /// The resulting data file size is about 80KB.
+    pub fn create_proxmox_backup_default_rrd(dst: DST) -> RRD {
+        let rra_list = vec![
+            // 1 min * 1440 => 1 day
+            RRA::new(CF::Average, 60, 1440),
+            RRA::new(CF::Maximum, 60, 1440),
+            // 30 min * 1440 => 30 days ~ 1 month
+            RRA::new(CF::Average, 30 * 60, 1440),
+            RRA::new(CF::Maximum, 30 * 60, 1440),
+            // 6 h * 1440 => 360 days ~ 1 year
+            RRA::new(CF::Average, 6 * 3600, 1440),
+            RRA::new(CF::Maximum, 6 * 3600, 1440),
+            // 1 week * 570 => 10 years
+            RRA::new(CF::Average, 7 * 86400, 570),
+            RRA::new(CF::Maximum, 7 * 86400, 570),
+        ];
+
+        RRD::new(dst, rra_list)
+    }
+
+    /// Sync the journal data to disk (using `fdatasync` syscall)
+    pub fn sync_journal(&self) -> Result<(), Error> {
+        self.state.read().unwrap().sync_journal()
+    }
+
+    /// Apply and commit the journal. Should be used at server startup.
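+    ///
+    /// Returns whether the journal was already applied before this call;
+    /// only then does [Self::update_value] also update the in-memory map
+    /// directly. A minimal startup sketch (hypothetical base directory;
+    /// the callback falls back to the default RRA layout for new files):
+    ///
+    /// ```no_run
+    /// # use std::path::Path;
+    /// # use proxmox_rrd::RRDCache;
+    /// # use proxmox_rrd::rrd::{DST, RRD};
+    /// fn load_cb(path: &Path, _rel_path: &str, dst: DST) -> RRD {
+    ///     RRD::load(path, true)
+    ///         .unwrap_or_else(|_| RRDCache::create_proxmox_backup_default_rrd(dst))
+    /// }
+    ///
+    /// # fn main() -> Result<(), anyhow::Error> {
+    /// let cache = RRDCache::new("/var/lib/myapp/rrdb", None, None, 30.0, load_cb)?;
+    /// cache.apply_journal()?;
+    /// cache.update_value("host/cpu", proxmox_time::epoch_f64(), 0.25, DST::Gauge)?;
+    /// # Ok(()) }
+    /// ```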
+    pub fn apply_journal(&self) -> Result<bool, Error> {
+        let config = Arc::clone(&self.config);
+        let state = Arc::clone(&self.state);
+        let rrd_map = Arc::clone(&self.rrd_map);
+
+        let mut state_guard = self.state.write().unwrap();
+        let journal_applied = state_guard.journal_applied;
+
+        if let Some(ref recv) = state_guard.apply_thread_result {
+            match recv.try_recv() {
+                Ok(Ok(())) => {
+                    // finished without errors, OK
+                    state_guard.apply_thread_result = None;
+                }
+                Ok(Err(err)) => {
+                    // finished with errors, log them
+                    log::error!("{}", err);
+                    state_guard.apply_thread_result = None;
+                }
+                Err(TryRecvError::Empty) => {
+                    // still running
+                    return Ok(journal_applied);
+                }
+                Err(TryRecvError::Disconnected) => {
+                    // crashed, start again
+                    log::error!("apply journal thread crashed - try again");
+                    state_guard.apply_thread_result = None;
+                }
+            }
+        }
+
+        let now = proxmox_time::epoch_f64();
+        let wants_commit = (now - state_guard.last_journal_flush) > self.config.apply_interval;
+
+        if journal_applied && !wants_commit {
+            return Ok(journal_applied);
+        }
+
+        state_guard.last_journal_flush = proxmox_time::epoch_f64();
+
+        let (sender, receiver) = bounded(1);
+        state_guard.apply_thread_result = Some(receiver);
+
+        spawn(move || {
+            let result = apply_and_commit_journal_thread(config, state, rrd_map, journal_applied)
+                .map_err(|err| err.to_string());
+            sender.send(result).unwrap();
+        });
+
+        Ok(journal_applied)
+    }
+
+    /// Update data in RAM and write file back to disk (journal)
+    pub fn update_value(
+        &self,
+        rel_path: &str,
+        time: f64,
+        value: f64,
+        dst: DST,
+    ) -> Result<(), Error> {
+        let journal_applied = self.apply_journal()?;
+
+        self.state
+            .write()
+            .unwrap()
+            .append_journal_entry(time, value, dst, rel_path)?;
+
+        if journal_applied {
+            self.rrd_map
+                .write()
+                .unwrap()
+                .update(rel_path, time, value, dst, false)?;
+        }
+
+        Ok(())
+    }
+
+    /// Extract data from cached RRD
+    ///
+    /// `start`: Start time. If not specified, we simply extract 10 data points.
+    ///
+    /// `end`: End time. Default is to use the current time.
+    pub fn extract_cached_data(
+        &self,
+        base: &str,
+        name: &str,
+        cf: CF,
+        resolution: u64,
+        start: Option<u64>,
+        end: Option<u64>,
+    ) -> Result<Option<Entry>, Error> {
+        self.rrd_map
+            .read()
+            .unwrap()
+            .extract_cached_data(base, name, cf, resolution, start, end)
+    }
+}
+
+fn apply_and_commit_journal_thread(
+    config: Arc<CacheConfig>,
+    state: Arc<RwLock<JournalState>>,
+    rrd_map: Arc<RwLock<RRDMap>>,
+    commit_only: bool,
+) -> Result<(), Error> {
+    if commit_only {
+        state.write().unwrap().rotate_journal()?; // start new journal, keep old one
+    } else {
+        let start_time = SystemTime::now();
+        log::debug!("applying rrd journal");
+
+        match apply_journal_impl(Arc::clone(&state), Arc::clone(&rrd_map)) {
+            Ok(entries) => {
+                let elapsed = start_time.elapsed().unwrap().as_secs_f64();
+                log::info!(
+                    "applied rrd journal ({} entries in {:.3} seconds)",
+                    entries,
+                    elapsed
+                );
+            }
+            Err(err) => bail!("apply rrd journal failed - {}", err),
+        }
+    }
+
+    let start_time = SystemTime::now();
+    log::debug!("commit rrd journal");
+
+    match commit_journal_impl(config, state, rrd_map) {
+        Ok(rrd_file_count) => {
+            let elapsed = start_time.elapsed().unwrap().as_secs_f64();
+            log::info!(
+                "rrd journal successfully committed ({} files in {:.3} seconds)",
+                rrd_file_count,
+                elapsed
+            );
+        }
+        Err(err) => bail!("rrd journal commit failed: {}", err),
+    }
+    Ok(())
+}
+
+fn apply_journal_lines(
+    state: Arc<RwLock<JournalState>>,
+    rrd_map: Arc<RwLock<RRDMap>>,
+    journal_name: &str, // used for logging
+    reader: &mut BufReader<File>,
+    lock_read_line: bool,
+) -> Result<usize, Error> {
+    let mut linenr = 0;
+
+    loop {
+        linenr += 1;
+        let mut line = String::new();
+        let len = if lock_read_line {
+            let _lock = state.read().unwrap(); // make sure we read entire lines
+            reader.read_line(&mut line)?
+        } else {
+            reader.read_line(&mut line)?
+        };
+
+        if len == 0 {
+            break;
+        }
+
+        let entry: JournalEntry = match line.parse() {
+            Ok(entry) => entry,
+            Err(err) => {
+                log::warn!(
+                    "unable to parse rrd journal '{}' line {} (skip) - {}",
+                    journal_name,
+                    linenr,
+                    err,
+                );
+                continue; // skip unparsable lines
+            }
+        };
+
+        rrd_map.write().unwrap().update(
+            &entry.rel_path,
+            entry.time,
+            entry.value,
+            entry.dst,
+            true,
+        )?;
+    }
+    Ok(linenr)
+}
+
+fn apply_journal_impl(
+    state: Arc<RwLock<JournalState>>,
+    rrd_map: Arc<RwLock<RRDMap>>,
+) -> Result<usize, Error> {
+    let mut lines = 0;
+
+    // Apply old journals first
+    let journal_list = state.read().unwrap().list_old_journals()?;
+
+    for entry in journal_list {
+        log::info!("apply old journal log {}", entry.name);
+        let file = std::fs::OpenOptions::new().read(true).open(&entry.path)?;
+        let mut reader = BufReader::new(file);
+        lines += apply_journal_lines(
+            Arc::clone(&state),
+            Arc::clone(&rrd_map),
+            &entry.name,
+            &mut reader,
+            false,
+        )?;
+    }
+
+    let mut journal = state.read().unwrap().open_journal_reader()?;
+
+    lines += apply_journal_lines(
+        Arc::clone(&state),
+        Arc::clone(&rrd_map),
+        "rrd.journal",
+        &mut journal,
+        true,
+    )?;
+
+    {
+        let mut state_guard = state.write().unwrap(); // block other writers
+
+        lines += apply_journal_lines(
+            Arc::clone(&state),
+            Arc::clone(&rrd_map),
+            "rrd.journal",
+            &mut journal,
+            false,
+        )?;
+
+        state_guard.rotate_journal()?; // start new journal, keep old one
+
+        // We need to apply the journal only once, because further updates
+        // are always directly applied.
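+        // The second apply_journal_lines pass above runs while holding the
+        // write lock, so it picks up any entries that raced in between the
+        // unlocked first pass and taking the lock.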
+        state_guard.journal_applied = true;
+    }
+
+    Ok(lines)
+}
+
+fn fsync_file_or_dir(path: &Path) -> Result<(), Error> {
+    let file = std::fs::File::open(path)?;
+    nix::unistd::fsync(file.as_raw_fd())?;
+    Ok(())
+}
+
+pub(crate) fn fsync_file_and_parent(path: &Path) -> Result<(), Error> {
+    let file = std::fs::File::open(path)?;
+    nix::unistd::fsync(file.as_raw_fd())?;
+    if let Some(parent) = path.parent() {
+        fsync_file_or_dir(parent)?;
+    }
+    Ok(())
+}
+
+fn rrd_parent_dir(basedir: &Path, rel_path: &str) -> PathBuf {
+    let mut path = basedir.to_owned();
+    let rel_path = Path::new(rel_path);
+    if let Some(parent) = rel_path.parent() {
+        path.push(parent);
+    }
+    path
+}
+
+fn commit_journal_impl(
+    config: Arc<CacheConfig>,
+    state: Arc<RwLock<JournalState>>,
+    rrd_map: Arc<RwLock<RRDMap>>,
+) -> Result<usize, Error> {
+    let files = rrd_map.read().unwrap().file_list();
+
+    let mut rrd_file_count = 0;
+    let mut errors = 0;
+
+    let mut dir_set = BTreeSet::new();
+
+    log::info!("write rrd data back to disk");
+
+    // save all RRDs - we only need a read lock here
+    // Note: no fsync here (we do it afterwards)
+    for rel_path in files.iter() {
+        let parent_dir = rrd_parent_dir(&config.basedir, rel_path);
+        dir_set.insert(parent_dir);
+        rrd_file_count += 1;
+        if let Err(err) = rrd_map.read().unwrap().flush_rrd_file(rel_path) {
+            errors += 1;
+            log::error!("unable to save rrd {}: {}", rel_path, err);
+        }
+    }
+
+    if errors != 0 {
+        bail!("errors during rrd flush - unable to commit rrd journal");
+    }
+
+    // Important: We fsync files after writing all data! This increases
+    // the likelihood that files are already synced, so this is
+    // much faster (although we need to re-open the files).
+
+    log::info!("starting rrd data sync");
+
+    for rel_path in files.iter() {
+        let mut path = config.basedir.clone();
+        path.push(rel_path);
+        fsync_file_or_dir(&path)
+            .map_err(|err| format_err!("fsync rrd file {} failed - {}", rel_path, err))?;
+    }
+
+    // also fsync directories
+    for dir_path in dir_set {
+        fsync_file_or_dir(&dir_path)
+            .map_err(|err| format_err!("fsync rrd dir {:?} failed - {}", dir_path, err))?;
+    }
+
+    // if everything went ok, remove the old journal files
+    state.write().unwrap().remove_old_journals()?;
+
+    Ok(rrd_file_count)
+}
diff --git a/proxmox-rrd/src/cache/journal.rs b/proxmox-rrd/src/cache/journal.rs
new file mode 100644
index 00000000..7c260e1e
--- /dev/null
+++ b/proxmox-rrd/src/cache/journal.rs
@@ -0,0 +1,200 @@
+use std::ffi::OsStr;
+use std::fs::File;
+use std::io::{BufReader, Write};
+use std::os::unix::io::AsRawFd;
+use std::path::PathBuf;
+use std::str::FromStr;
+use std::sync::Arc;
+
+use anyhow::{bail, format_err, Error};
+use crossbeam_channel::Receiver;
+use nix::fcntl::OFlag;
+
+use proxmox_sys::fs::atomic_open_or_create_file;
+
+const RRD_JOURNAL_NAME: &str = "rrd.journal";
+
+use crate::cache::CacheConfig;
+use crate::rrd::DST;
+
+// shared state behind RwLock
+pub struct JournalState {
+    config: Arc<CacheConfig>,
+    journal: File,
+    pub last_journal_flush: f64,
+    pub journal_applied: bool,
+    pub apply_thread_result: Option<Receiver<Result<(), String>>>,
+}
+
+pub struct JournalEntry {
+    pub time: f64,
+    pub value: f64,
+    pub dst: DST,
+    pub rel_path: String,
+}
+
+impl FromStr for JournalEntry {
+    type Err = Error;
+
+    fn from_str(line: &str) -> Result<Self, Self::Err> {
+        let line = line.trim();
+
+        let parts: Vec<&str> = line.splitn(4, ':').collect();
+        if parts.len() != 4 {
+            bail!("wrong number of components");
+        }
+
+        let time: f64 = parts[0]
+            .parse()
+            .map_err(|_| format_err!("unable to parse time"))?;
+        let value: f64 = parts[1]
+            .parse()
+            .map_err(|_| format_err!("unable to parse value"))?;
+        let dst: u8 = parts[2]
+            .parse()
+            .map_err(|_| format_err!("unable to parse data source type"))?;
+
+        let dst = match dst {
+            0 => DST::Gauge,
+            1 => DST::Derive,
+            _ => bail!("got strange value for data source type '{}'", dst),
+        };
+
+        let rel_path = parts[3].to_string();
+
+        Ok(JournalEntry {
+            time,
+            value,
+            dst,
+            rel_path,
+        })
+    }
+}
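+// A journal line, as written by JournalState::append_journal_entry, is
+//
+//     <time>:<value>:<dst as u8>:<rel_path>
+//
+// e.g. (hypothetical entry) "1700000000.123:0.25:0:host/cpu" encodes a
+// DST::Gauge update (0) of value 0.25 for the RRD file "host/cpu".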
parse value"))?; + let dst: u8 = parts[2] + .parse() + .map_err(|_| format_err!("unable to parse data source type"))?; + + let dst = match dst { + 0 => DST::Gauge, + 1 => DST::Derive, + _ => bail!("got strange value for data source type '{}'", dst), + }; + + let rel_path = parts[3].to_string(); + + Ok(JournalEntry { + time, + value, + dst, + rel_path, + }) + } +} + +pub struct JournalFileInfo { + pub time: u64, + pub name: String, + pub path: PathBuf, +} + +impl JournalState { + pub(crate) fn new(config: Arc) -> Result { + let journal = JournalState::open_journal_writer(&config)?; + Ok(Self { + config, + journal, + last_journal_flush: 0.0, + journal_applied: false, + apply_thread_result: None, + }) + } + + pub fn sync_journal(&self) -> Result<(), Error> { + nix::unistd::fdatasync(self.journal.as_raw_fd())?; + Ok(()) + } + + pub fn append_journal_entry( + &mut self, + time: f64, + value: f64, + dst: DST, + rel_path: &str, + ) -> Result<(), Error> { + let journal_entry = format!("{}:{}:{}:{}\n", time, value, dst as u8, rel_path); + self.journal.write_all(journal_entry.as_bytes())?; + Ok(()) + } + + pub fn open_journal_reader(&self) -> Result, Error> { + // fixme : dup self.journal instead?? + let mut journal_path = self.config.basedir.clone(); + journal_path.push(RRD_JOURNAL_NAME); + + let flags = OFlag::O_CLOEXEC | OFlag::O_RDONLY; + let journal = atomic_open_or_create_file( + &journal_path, + flags, + &[], + self.config.file_options.clone(), + false, + )?; + Ok(BufReader::new(journal)) + } + + fn open_journal_writer(config: &CacheConfig) -> Result { + let mut journal_path = config.basedir.clone(); + journal_path.push(RRD_JOURNAL_NAME); + + let flags = OFlag::O_CLOEXEC | OFlag::O_WRONLY | OFlag::O_APPEND; + let journal = atomic_open_or_create_file( + &journal_path, + flags, + &[], + config.file_options.clone(), + false, + )?; + Ok(journal) + } + + pub fn rotate_journal(&mut self) -> Result<(), Error> { + let mut journal_path = self.config.basedir.clone(); + journal_path.push(RRD_JOURNAL_NAME); + + let mut new_name = journal_path.clone(); + let now = proxmox_time::epoch_i64(); + new_name.set_extension(format!("journal-{:08x}", now)); + std::fs::rename(journal_path, &new_name)?; + + self.journal = Self::open_journal_writer(&self.config)?; + + // make sure the old journal data landed on the disk + super::fsync_file_and_parent(&new_name)?; + + Ok(()) + } + + pub fn remove_old_journals(&self) -> Result<(), Error> { + let journal_list = self.list_old_journals()?; + + for entry in journal_list { + std::fs::remove_file(entry.path)?; + } + + Ok(()) + } + + pub fn list_old_journals(&self) -> Result, Error> { + let mut list = Vec::new(); + for entry in std::fs::read_dir(&self.config.basedir)? 
+    pub fn list_old_journals(&self) -> Result<Vec<JournalFileInfo>, Error> {
+        let mut list = Vec::new();
+        for entry in std::fs::read_dir(&self.config.basedir)? {
+            let entry = entry?;
+            let path = entry.path();
+
+            if !path.is_file() {
+                continue;
+            }
+
+            match path.file_stem() {
+                None => continue,
+                Some(stem) if stem != OsStr::new("rrd") => continue,
+                Some(_) => (),
+            }
+
+            if let Some(extension) = path.extension() {
+                if let Some(extension) = extension.to_str() {
+                    if let Some(rest) = extension.strip_prefix("journal-") {
+                        if let Ok(time) = u64::from_str_radix(rest, 16) {
+                            list.push(JournalFileInfo {
+                                time,
+                                name: format!("rrd.{}", extension),
+                                path: path.to_owned(),
+                            });
+                        }
+                    }
+                }
+            }
+        }
+        list.sort_unstable_by_key(|entry| entry.time);
+        Ok(list)
+    }
+}
diff --git a/proxmox-rrd/src/cache/rrd_map.rs b/proxmox-rrd/src/cache/rrd_map.rs
new file mode 100644
index 00000000..f907d350
--- /dev/null
+++ b/proxmox-rrd/src/cache/rrd_map.rs
@@ -0,0 +1,97 @@
+use std::collections::HashMap;
+use std::path::Path;
+use std::sync::Arc;
+
+use anyhow::{bail, Error};
+
+use proxmox_sys::fs::create_path;
+
+use crate::rrd::{CF, DST, RRD};
+
+use super::CacheConfig;
+use crate::Entry;
+
+pub struct RRDMap {
+    config: Arc<CacheConfig>,
+    map: HashMap<String, RRD>,
+    load_rrd_cb: fn(path: &Path, rel_path: &str, dst: DST) -> RRD,
+}
+
+impl RRDMap {
+    pub(crate) fn new(
+        config: Arc<CacheConfig>,
+        load_rrd_cb: fn(path: &Path, rel_path: &str, dst: DST) -> RRD,
+    ) -> Self {
+        Self {
+            config,
+            map: HashMap::new(),
+            load_rrd_cb,
+        }
+    }
+
+    pub fn update(
+        &mut self,
+        rel_path: &str,
+        time: f64,
+        value: f64,
+        dst: DST,
+        new_only: bool,
+    ) -> Result<(), Error> {
+        if let Some(rrd) = self.map.get_mut(rel_path) {
+            if !new_only || time > rrd.last_update() {
+                rrd.update(time, value);
+            }
+        } else {
+            let mut path = self.config.basedir.clone();
+            path.push(rel_path);
+            create_path(
+                path.parent().unwrap(),
+                Some(self.config.dir_options.clone()),
+                Some(self.config.dir_options.clone()),
+            )?;
+
+            let mut rrd = (self.load_rrd_cb)(&path, rel_path, dst);
+
+            if !new_only || time > rrd.last_update() {
+                rrd.update(time, value);
+            }
+            self.map.insert(rel_path.to_string(), rrd);
+        }
+        Ok(())
+    }
+
+    pub fn file_list(&self) -> Vec<String> {
+        let mut list = Vec::new();
+
+        for rel_path in self.map.keys() {
+            list.push(rel_path.clone());
+        }
+
+        list
+    }
+
+    pub fn flush_rrd_file(&self, rel_path: &str) -> Result<(), Error> {
+        if let Some(rrd) = self.map.get(rel_path) {
+            let mut path = self.config.basedir.clone();
+            path.push(rel_path);
+            rrd.save(&path, self.config.file_options.clone(), true)
+        } else {
+            bail!("rrd file {} not loaded", rel_path);
+        }
+    }
+
+    pub fn extract_cached_data(
+        &self,
+        base: &str,
+        name: &str,
+        cf: CF,
+        resolution: u64,
+        start: Option<u64>,
+        end: Option<u64>,
+    ) -> Result<Option<Entry>, Error> {
+        match self.map.get(&format!("{}/{}", base, name)) {
+            Some(rrd) => Ok(Some(rrd.extract_data(cf, resolution, start, end)?)),
+            None => Ok(None),
+        }
+    }
+}
diff --git a/proxmox-rrd/src/lib.rs b/proxmox-rrd/src/lib.rs
new file mode 100644
index 00000000..80b39438
--- /dev/null
+++ b/proxmox-rrd/src/lib.rs
@@ -0,0 +1,16 @@
+//! # Round Robin Database files
+//!
+//! ## Features
+//!
+//! * One file stores a single data source
+//! * Stores data for different time resolutions
+//! * Simple cache implementation with journal support
+
+mod rrd_v1;
+
+pub mod rrd;
+#[doc(inline)]
+pub use rrd::Entry;
+
+mod cache;
+pub use cache::*;
diff --git a/proxmox-rrd/src/rrd.rs b/proxmox-rrd/src/rrd.rs
new file mode 100644
index 00000000..0b8ac460
--- /dev/null
+++ b/proxmox-rrd/src/rrd.rs
@@ -0,0 +1,694 @@
+//! # Proxmox RRD format version 2
+//!
+//! The new format uses
+//! [CBOR](https://datatracker.ietf.org/doc/html/rfc8949) as storage
+//! format. This way we can use the serde serialization framework,
+//! which makes our code more flexible, much nicer and type safe.
+//!
+//! ## Features
+//!
+//! * Well defined data format [CBOR](https://datatracker.ietf.org/doc/html/rfc8949)
+//! * Platform independent (big endian f64, hopefully a standard format?)
+//! * Arbitrary number of RRAs (dynamically changeable)
+
+use std::io::{Read, Write};
+use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd};
+use std::path::Path;
+
+use anyhow::{bail, format_err, Error};
+use serde::{Deserialize, Serialize};
+
+use proxmox_schema::api;
+use proxmox_sys::fs::{make_tmp_file, CreateOptions};
+
+use crate::rrd_v1;
+
+/// Proxmox RRD v2 file magic number
+// openssl::sha::sha256(b"Proxmox Round Robin Database file v2.0")[0..8];
+pub const PROXMOX_RRD_MAGIC_2_0: [u8; 8] = [224, 200, 228, 27, 239, 112, 122, 159];
+
+#[api()]
+#[derive(Debug, Serialize, Deserialize, Copy, Clone, PartialEq, Eq)]
+#[serde(rename_all = "kebab-case")]
+/// RRD data source type
+pub enum DST {
+    /// Gauge values are stored unmodified.
+    Gauge,
+    /// Stores the difference to the previous value.
+    Derive,
+    /// Stores the difference to the previous value (like Derive), but
+    /// detects counter overflow (and ignores that value)
+    Counter,
+}
+
+#[api()]
+#[derive(Debug, Serialize, Deserialize, Copy, Clone, PartialEq, Eq)]
+#[serde(rename_all = "kebab-case")]
+/// Consolidation function
+pub enum CF {
+    /// Average
+    Average,
+    /// Maximum
+    Maximum,
+    /// Minimum
+    Minimum,
+    /// Use the last value
+    Last,
+}
+
+#[derive(Serialize, Deserialize)]
+/// Data source specification
+pub struct DataSource {
+    /// Data source type
+    pub dst: DST,
+    /// Last update time (epoch)
+    pub last_update: f64,
+    /// Stores the last value, used to compute differential value for
+    /// derive/counters
+    pub last_value: f64,
+}
+
+/// An RRD entry.
+///
+/// Serializes as a tuple.
+#[derive(Clone, Debug, Deserialize, Serialize)]
+#[serde(
+    from = "(u64, u64, Vec<Option<f64>>)",
+    into = "(u64, u64, Vec<Option<f64>>)"
+)]
+pub struct Entry {
+    pub start: u64,
+    pub resolution: u64,
+    pub data: Vec<Option<f64>>,
+}
+
+impl Entry {
+    pub const fn new(start: u64, resolution: u64, data: Vec<Option<f64>>) -> Self {
+        Self {
+            start,
+            resolution,
+            data,
+        }
+    }
+
+    /// Get a data point at a specific index which also does bound checking and returns `None` for
+    /// out of bounds indices.
+    pub fn get(&self, idx: usize) -> Option<f64> {
+        self.data.get(idx).copied().flatten()
+    }
+}
+
+impl From<Entry> for (u64, u64, Vec<Option<f64>>) {
+    fn from(entry: Entry) -> (u64, u64, Vec<Option<f64>>) {
+        (entry.start, entry.resolution, entry.data)
+    }
+}
+
+impl From<(u64, u64, Vec<Option<f64>>)> for Entry {
+    fn from(data: (u64, u64, Vec<Option<f64>>)) -> Self {
+        Self::new(data.0, data.1, data.2)
+    }
+}
+
+impl DataSource {
+    /// Create a new Instance
+    pub fn new(dst: DST) -> Self {
+        Self {
+            dst,
+            last_update: 0.0,
+            last_value: f64::NAN,
+        }
+    }
+
+    fn compute_new_value(&mut self, time: f64, mut value: f64) -> Result<f64, Error> {
+        if time < 0.0 {
+            bail!("got negative time");
+        }
+        if time <= self.last_update {
+            bail!("time in past ({} < {})", time, self.last_update);
+        }
+
+        if value.is_nan() {
+            bail!("new value is NAN");
+        }
+
+        // derive counter value
+        let is_counter = self.dst == DST::Counter;
+
+        if is_counter || self.dst == DST::Derive {
+            let time_diff = time - self.last_update;
+
+            let diff = if self.last_value.is_nan() {
+                0.0
+            } else if is_counter && value < 0.0 {
+                bail!("got negative value for counter");
+            } else if is_counter && value < self.last_value {
+                // Note: We do not try automatic overflow corrections, but
+                // we update last_value anyway, so that we can compute the diff
+                // next time.
+                self.last_value = value;
+                bail!("counter overflow/reset detected");
+            } else {
+                value - self.last_value
+            };
+            self.last_value = value;
+            value = diff / time_diff;
+        } else {
+            self.last_value = value;
+        }
+
+        Ok(value)
+    }
+}
+
+#[derive(Serialize, Deserialize)]
+/// Round Robin Archive
+pub struct RRA {
+    /// Number of seconds spanned by a single data entry.
+    pub resolution: u64,
+    /// Consolidation function.
+    pub cf: CF,
+    /// Count values computed inside this update interval.
+    pub last_count: u64,
+    /// The actual data entries.
+    pub data: Vec<f64>,
+}
+
+impl RRA {
+    /// Creates a new instance
+    pub fn new(cf: CF, resolution: u64, points: usize) -> Self {
+        Self {
+            cf,
+            resolution,
+            last_count: 0,
+            data: vec![f64::NAN; points],
+        }
+    }
+
+    /// Data slot end time
+    pub fn slot_end_time(&self, time: u64) -> u64 {
+        self.resolution * (time / self.resolution + 1)
+    }
+
+    /// Data slot start time
+    pub fn slot_start_time(&self, time: u64) -> u64 {
+        self.resolution * (time / self.resolution)
+    }
+
+    /// Data slot index
+    pub fn slot(&self, time: u64) -> usize {
+        ((time / self.resolution) as usize) % self.data.len()
+    }
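+    // Worked example: with resolution = 60 and 1440 slots, time 1_700_000_123
+    // falls into the slot spanning [1_700_000_100, 1_700_000_160):
+    //
+    //     let rra = RRA::new(CF::Average, 60, 1440);
+    //     assert_eq!(rra.slot_start_time(1_700_000_123), 1_700_000_100);
+    //     assert_eq!(rra.slot_end_time(1_700_000_123), 1_700_000_160);
+    //     assert_eq!(rra.slot(1_700_000_123), 1335); // 28_333_335 % 1440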
+    /// Directly overwrite data slots.
+    ///
+    /// The caller needs to set the `last_update` value on the [DataSource] manually.
+    pub fn insert_data(
+        &mut self,
+        start: u64,
+        resolution: u64,
+        data: Vec<Option<f64>>,
+    ) -> Result<(), Error> {
+        if resolution != self.resolution {
+            bail!("insert_data failed: got wrong resolution");
+        }
+
+        let mut index = self.slot(start);
+
+        for item in data {
+            if let Some(v) = item {
+                self.data[index] = v;
+            }
+            index += 1;
+            if index >= self.data.len() {
+                index = 0;
+            }
+        }
+        Ok(())
+    }
+
+    fn delete_old_slots(&mut self, time: f64, last_update: f64) {
+        let epoch = time as u64;
+        let last_update = last_update as u64;
+        let reso = self.resolution;
+        let num_entries = self.data.len() as u64;
+
+        let min_time = epoch.saturating_sub(num_entries * reso);
+        let min_time = self.slot_end_time(min_time);
+
+        let mut t = last_update.saturating_sub(num_entries * reso);
+        let mut index = self.slot(t);
+
+        for _ in 0..num_entries {
+            t += reso;
+            index += 1;
+            if index >= self.data.len() {
+                index = 0;
+            }
+            if t < min_time {
+                self.data[index] = f64::NAN;
+            } else {
+                break;
+            }
+        }
+    }
+
+    fn compute_new_value(&mut self, time: f64, last_update: f64, value: f64) {
+        let epoch = time as u64;
+        let last_update = last_update as u64;
+        let reso = self.resolution;
+
+        let index = self.slot(epoch);
+        let last_index = self.slot(last_update);
+
+        if (epoch - last_update) > reso || index != last_index {
+            self.last_count = 0;
+        }
+
+        let last_value = self.data[index];
+        if last_value.is_nan() {
+            self.last_count = 0;
+        }
+
+        let new_count = self.last_count.saturating_add(1);
+
+        if self.last_count == 0 {
+            self.data[index] = value;
+            self.last_count = 1;
+        } else {
+            let new_value = match self.cf {
+                CF::Maximum => {
+                    if last_value > value {
+                        last_value
+                    } else {
+                        value
+                    }
+                }
+                CF::Minimum => {
+                    if last_value < value {
+                        last_value
+                    } else {
+                        value
+                    }
+                }
+                CF::Last => value,
+                CF::Average => {
+                    (last_value * (self.last_count as f64)) / (new_count as f64)
+                        + value / (new_count as f64)
+                }
+            };
+            self.data[index] = new_value;
+            self.last_count = new_count;
+        }
+    }
+
+    /// Extract data
+    ///
+    /// Extract data from `start` to `end`. The RRA itself does not
+    /// store the `last_update` time, so you need to pass it as a
+    /// parameter (see [DataSource]).
+    pub fn extract_data(&self, start: u64, end: u64, last_update: f64) -> Entry {
+        let last_update = last_update as u64;
+        let reso = self.resolution;
+        let num_entries = self.data.len() as u64;
+
+        let mut list = Vec::new();
+
+        let rrd_end = self.slot_end_time(last_update);
+        let rrd_start = rrd_end.saturating_sub(reso * num_entries);
+
+        let mut t = start;
+        let mut index = self.slot(t);
+        for _ in 0..num_entries {
+            if t > end {
+                break;
+            };
+            if t < rrd_start || t >= rrd_end {
+                list.push(None);
+            } else {
+                let value = self.data[index];
+                if value.is_nan() {
+                    list.push(None);
+                } else {
+                    list.push(Some(value));
+                }
+            }
+            t += reso;
+            index += 1;
+            if index >= self.data.len() {
+                index = 0;
+            }
+        }
+
+        Entry::new(start, reso, list)
+    }
+}
+
+#[derive(Serialize, Deserialize)]
+/// Round Robin Database
+pub struct RRD {
+    /// The data source definition
+    pub source: DataSource,
+    /// List of round robin archives
+    pub rra_list: Vec<RRA>,
+}
+
+impl RRD {
+    /// Creates a new Instance
+    pub fn new(dst: DST, rra_list: Vec<RRA>) -> RRD {
+        let source = DataSource::new(dst);
+
+        RRD { source, rra_list }
+    }
+
+    fn from_raw(raw: &[u8]) -> Result<Self, Error> {
+        if raw.len() < 8 {
+            bail!("not an rrd file - file is too small ({})", raw.len());
+        }
+
+        let rrd = if raw[0..8] == rrd_v1::PROXMOX_RRD_MAGIC_1_0 {
+            let v1 = rrd_v1::RRDv1::from_raw(raw)?;
+            v1.to_rrd_v2()
+                .map_err(|err| format_err!("unable to convert from old V1 format - {}", err))?
+        } else if raw[0..8] == PROXMOX_RRD_MAGIC_2_0 {
+            serde_cbor::from_slice(&raw[8..])
+                .map_err(|err| format_err!("unable to decode RRD file - {}", err))?
+        } else {
+            bail!("not an rrd file - unknown magic number");
+        };
+
+        if rrd.source.last_update < 0.0 {
+            bail!("rrd file has negative last_update time");
+        }
+
+        Ok(rrd)
+    }
+
+    /// Load data from a file
+    ///
+    /// Setting `avoid_page_cache` uses
+    /// `fadvise(..,POSIX_FADV_DONTNEED)` to avoid keeping the data in
+    /// the linux page cache.
+    pub fn load(path: &Path, avoid_page_cache: bool) -> Result<Self, std::io::Error> {
+        let mut file = std::fs::File::open(path)?;
+        let buffer_size = file.metadata().map(|m| m.len() as usize + 1).unwrap_or(0);
+        let mut raw = Vec::with_capacity(buffer_size);
+        file.read_to_end(&mut raw)?;
+
+        if avoid_page_cache {
+            nix::fcntl::posix_fadvise(
+                file.as_raw_fd(),
+                0,
+                buffer_size as i64,
+                nix::fcntl::PosixFadviseAdvice::POSIX_FADV_DONTNEED,
+            )
+            .map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err.to_string()))?;
+        }
+
+        match Self::from_raw(&raw) {
+            Ok(rrd) => Ok(rrd),
+            Err(err) => Err(std::io::Error::new(
+                std::io::ErrorKind::Other,
+                err.to_string(),
+            )),
+        }
+    }
+
+    /// Store data into a file (atomic replace file)
+    ///
+    /// Setting `avoid_page_cache` uses
+    /// `fadvise(..,POSIX_FADV_DONTNEED)` to avoid keeping the data in
+    /// the linux page cache.
+    pub fn save(
+        &self,
+        path: &Path,
+        options: CreateOptions,
+        avoid_page_cache: bool,
+    ) -> Result<(), Error> {
+        let (fd, tmp_path) = make_tmp_file(path, options)?;
+        let mut file = unsafe { std::fs::File::from_raw_fd(fd.into_raw_fd()) };
+
+        let mut try_block = || -> Result<(), Error> {
+            let mut data: Vec<u8> = Vec::new();
+            data.extend(PROXMOX_RRD_MAGIC_2_0);
+            serde_cbor::to_writer(&mut data, self)?;
+            file.write_all(&data)?;
+
+            if avoid_page_cache {
+                nix::fcntl::posix_fadvise(
+                    file.as_raw_fd(),
+                    0,
+                    data.len() as i64,
+                    nix::fcntl::PosixFadviseAdvice::POSIX_FADV_DONTNEED,
+                )?;
+            }
+
+            Ok(())
+        };
+
+        match try_block() {
+            Ok(()) => (),
+            error => {
+                let _ = nix::unistd::unlink(&tmp_path);
+                return error;
+            }
+        }
+
+        if let Err(err) = std::fs::rename(&tmp_path, path) {
+            let _ = nix::unistd::unlink(&tmp_path);
+            bail!("Atomic rename failed - {}", err);
+        }
+
+        Ok(())
+    }
+
+    /// Returns the last update time.
+    pub fn last_update(&self) -> f64 {
+        self.source.last_update
+    }
+
+    /// Update the value (in memory)
+    ///
+    /// Note: This does not call [Self::save].
+    pub fn update(&mut self, time: f64, value: f64) {
+        let value = match self.source.compute_new_value(time, value) {
+            Ok(value) => value,
+            Err(err) => {
+                log::error!("rrd update failed: {}", err);
+                return;
+            }
+        };
+
+        let last_update = self.source.last_update;
+        self.source.last_update = time;
+
+        for rra in self.rra_list.iter_mut() {
+            rra.delete_old_slots(time, last_update);
+            rra.compute_new_value(time, last_update, value);
+        }
+    }
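+    // A small in-memory round trip (mirrors the unit tests below):
+    //
+    //     let mut rrd = RRD::new(DST::Gauge, vec![RRA::new(CF::Average, 60, 1440)]);
+    //     rrd.update(60.0, 1.0);
+    //     rrd.update(120.0, 2.0);
+    //     let entry = rrd.extract_data(CF::Average, 60, Some(0), Some(180))?;
+    //     // entry.data is now [None, Some(1.0), Some(2.0), None]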
+    /// Extract data from the archive
+    ///
+    /// This selects the RRA with the specified [CF] and (minimum)
+    /// resolution, and extracts data from `start` to `end`.
+    ///
+    /// `start`: Start time. If not specified, we simply extract 10 data points.
+    /// `end`: End time. Default is to use the current time.
+    pub fn extract_data(
+        &self,
+        cf: CF,
+        resolution: u64,
+        start: Option<u64>,
+        end: Option<u64>,
+    ) -> Result<Entry, Error> {
+        let mut rra: Option<&RRA> = None;
+        for item in self.rra_list.iter() {
+            if item.cf != cf {
+                continue;
+            }
+            if item.resolution > resolution {
+                continue;
+            }
+
+            if let Some(current) = rra {
+                if item.resolution > current.resolution {
+                    rra = Some(item);
+                }
+            } else {
+                rra = Some(item);
+            }
+        }
+
+        match rra {
+            Some(rra) => {
+                let end = end.unwrap_or_else(|| proxmox_time::epoch_f64() as u64);
+                let start = start.unwrap_or_else(|| end.saturating_sub(10 * rra.resolution));
+                Ok(rra.extract_data(start, end, self.source.last_update))
+            }
+            None => bail!("unable to find a suitable RRA ({:?}:{})", cf, resolution),
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn basic_rra_maximum_gauge_test() -> Result<(), Error> {
+        let rra = RRA::new(CF::Maximum, 60, 5);
+        let mut rrd = RRD::new(DST::Gauge, vec![rra]);
+
+        for i in 2..10 {
+            rrd.update((i as f64) * 30.0, i as f64);
+        }
+
+        let Entry {
+            start,
+            resolution,
+            data,
+        } = rrd.extract_data(CF::Maximum, 60, Some(0), Some(5 * 60))?;
+        assert_eq!(start, 0);
+        assert_eq!(resolution, 60);
+        assert_eq!(data, [None, Some(3.0), Some(5.0), Some(7.0), Some(9.0)]);
+
+        Ok(())
+    }
+
+    #[test]
+    fn basic_rra_minimum_gauge_test() -> Result<(), Error> {
+        let rra = RRA::new(CF::Minimum, 60, 5);
+        let mut rrd = RRD::new(DST::Gauge, vec![rra]);
+
+        for i in 2..10 {
+            rrd.update((i as f64) * 30.0, i as f64);
+        }
+
+        let Entry {
+            start,
+            resolution,
+            data,
+        } = rrd.extract_data(CF::Minimum, 60, Some(0), Some(5 * 60))?;
+        assert_eq!(start, 0);
+        assert_eq!(resolution, 60);
+        assert_eq!(data, [None, Some(2.0), Some(4.0), Some(6.0), Some(8.0)]);
+
+        Ok(())
+    }
+
+    #[test]
+    fn basic_rra_last_gauge_test() -> Result<(), Error> {
+        let rra = RRA::new(CF::Last, 60, 5);
+        let mut rrd = RRD::new(DST::Gauge, vec![rra]);
+
+        for i in 2..10 {
+            rrd.update((i as f64) * 30.0, i as f64);
+        }
+
+        assert!(
+            rrd.extract_data(CF::Average, 60, Some(0), Some(5 * 60))
+                .is_err(),
+            "CF::Average should not exist"
+        );
+
+        let Entry {
+            start,
+            resolution,
+            data,
+        } = rrd.extract_data(CF::Last, 60, Some(0), Some(20 * 60))?;
+        assert_eq!(start, 0);
+        assert_eq!(resolution, 60);
+        assert_eq!(data, [None, Some(3.0), Some(5.0), Some(7.0), Some(9.0)]);
+
+        Ok(())
+    }
+
+    #[test]
+    fn basic_rra_average_derive_test() -> Result<(), Error> {
+        let rra = RRA::new(CF::Average, 60, 5);
+        let mut rrd = RRD::new(DST::Derive, vec![rra]);
+
+        for i in 2..10 {
+            rrd.update((i as f64) * 30.0, (i * 60) as f64);
+        }
+
+        let Entry {
+            start,
+            resolution,
+            data,
+        } = rrd.extract_data(CF::Average, 60, Some(60), Some(5 * 60))?;
+        assert_eq!(start, 60);
+        assert_eq!(resolution, 60);
+        assert_eq!(data, [Some(1.0), Some(2.0), Some(2.0), Some(2.0), None]);
+
+        Ok(())
+    }
+
+    #[test]
+    fn basic_rra_average_gauge_test() -> Result<(), Error> {
+        let rra = RRA::new(CF::Average, 60, 5);
+        let mut rrd = RRD::new(DST::Gauge, vec![rra]);
+
+        for i in 2..10 {
+            rrd.update((i as f64) * 30.0, i as f64);
+        }
+
+        let Entry {
+            start,
+            resolution,
+            data,
+        } = rrd.extract_data(CF::Average, 60, Some(60), Some(5 * 60))?;
+        assert_eq!(start, 60);
+        assert_eq!(resolution, 60);
+        assert_eq!(data, [Some(2.5), Some(4.5), Some(6.5), Some(8.5), None]);
+
+        for i in 10..14 {
+            rrd.update((i as f64) * 30.0, i as f64);
+        }
+
+        let Entry {
+            start,
+            resolution,
+            data,
+        } = rrd.extract_data(CF::Average, 60, Some(60), Some(5 * 60))?;
+        assert_eq!(start, 60);
+        assert_eq!(resolution, 60);
+       
assert_eq!(data, [None, Some(4.5), Some(6.5), Some(8.5), Some(10.5)]); + + let Entry { + start, + resolution, + data, + } = rrd.extract_data(CF::Average, 60, Some(3 * 60), Some(8 * 60))?; + assert_eq!(start, 3 * 60); + assert_eq!(resolution, 60); + assert_eq!(data, [Some(6.5), Some(8.5), Some(10.5), Some(12.5), None]); + + // add much newer value (should delete all previous/outdated value) + let i = 100; + rrd.update((i as f64) * 30.0, i as f64); + println!("TEST {:?}", serde_json::to_string_pretty(&rrd)); + + let Entry { + start, + resolution, + data, + } = rrd.extract_data(CF::Average, 60, Some(100 * 30), Some(100 * 30 + 5 * 60))?; + assert_eq!(start, 100 * 30); + assert_eq!(resolution, 60); + assert_eq!(data, [Some(100.0), None, None, None, None]); + + // extract with end time smaller than start time + let Entry { + start, + resolution, + data, + } = rrd.extract_data(CF::Average, 60, Some(100 * 30), Some(60))?; + assert_eq!(start, 100 * 30); + assert_eq!(resolution, 60); + assert_eq!(data, []); + + Ok(()) + } +} diff --git a/proxmox-rrd/src/rrd_v1.rs b/proxmox-rrd/src/rrd_v1.rs new file mode 100644 index 00000000..2f4a25f8 --- /dev/null +++ b/proxmox-rrd/src/rrd_v1.rs @@ -0,0 +1,295 @@ +use std::io::Read; + +use anyhow::Error; +use bitflags::bitflags; + +/// The number of data entries per RRA +pub const RRD_DATA_ENTRIES: usize = 70; + +/// Proxmox RRD file magic number +// openssl::sha::sha256(b"Proxmox Round Robin Database file v1.0")[0..8]; +pub const PROXMOX_RRD_MAGIC_1_0: [u8; 8] = [206, 46, 26, 212, 172, 158, 5, 186]; + +use crate::rrd::{DataSource, CF, DST, RRA, RRD}; + +bitflags! { + /// Flags to specify the data source type and consolidation function + pub struct RRAFlags: u64 { + // Data Source Types + const DST_GAUGE = 1; + const DST_DERIVE = 2; + const DST_COUNTER = 4; + const DST_MASK = 255; // first 8 bits + + // Consolidation Functions + const CF_AVERAGE = 1 << 8; + const CF_MAX = 2 << 8; + const CF_MASK = 255 << 8; + } +} + +/// Round Robin Archive with [RRD_DATA_ENTRIES] data slots. +/// +/// This data structure is used inside [RRD] and directly written to the +/// RRD files. 
+#[repr(C)]
+pub struct RRAv1 {
+    /// Defines the data source type and consolidation function
+    pub flags: RRAFlags,
+    /// Resolution (seconds)
+    pub resolution: u64,
+    /// Last update time (epoch)
+    pub last_update: f64,
+    /// Count values computed inside this update interval
+    pub last_count: u64,
+    /// Stores the last value, used to compute differential value for derive/counters
+    pub counter_value: f64,
+    /// Data slots
+    pub data: [f64; RRD_DATA_ENTRIES],
+}
+
+impl RRAv1 {
+    fn extract_data(&self) -> (u64, u64, Vec<Option<f64>>) {
+        let reso = self.resolution;
+
+        let mut list = Vec::new();
+
+        let rra_end = reso * ((self.last_update as u64) / reso);
+        let rra_start = rra_end - reso * (RRD_DATA_ENTRIES as u64);
+
+        let mut t = rra_start;
+        let mut index = ((t / reso) % (RRD_DATA_ENTRIES as u64)) as usize;
+        for _ in 0..RRD_DATA_ENTRIES {
+            let value = self.data[index];
+            if value.is_nan() {
+                list.push(None);
+            } else {
+                list.push(Some(value));
+            }
+
+            t += reso;
+            index = (index + 1) % RRD_DATA_ENTRIES;
+        }
+
+        (rra_start, reso, list)
+    }
+}
+
+/// Round Robin Database file format with fixed number of [RRA]s
+#[repr(C)]
+// Note: Avoid alignment problems by using 8byte types only
+pub struct RRDv1 {
+    /// The magic number to identify the file type
+    pub magic: [u8; 8],
+    /// Hourly data (average values)
+    pub hour_avg: RRAv1,
+    /// Hourly data (maximum values)
+    pub hour_max: RRAv1,
+    /// Daily data (average values)
+    pub day_avg: RRAv1,
+    /// Daily data (maximum values)
+    pub day_max: RRAv1,
+    /// Weekly data (average values)
+    pub week_avg: RRAv1,
+    /// Weekly data (maximum values)
+    pub week_max: RRAv1,
+    /// Monthly data (average values)
+    pub month_avg: RRAv1,
+    /// Monthly data (maximum values)
+    pub month_max: RRAv1,
+    /// Yearly data (average values)
+    pub year_avg: RRAv1,
+    /// Yearly data (maximum values)
+    pub year_max: RRAv1,
+}
+
+impl RRDv1 {
+    pub fn from_raw(mut raw: &[u8]) -> Result<Self, std::io::Error> {
+        let expected_len = std::mem::size_of::<RRDv1>();
+
+        if raw.len() != expected_len {
+            let msg = format!("wrong data size ({} != {})", raw.len(), expected_len);
+            return Err(std::io::Error::new(std::io::ErrorKind::Other, msg));
+        }
+
+        let mut rrd: RRDv1 = unsafe { std::mem::zeroed() };
+        unsafe {
+            let rrd_slice =
+                std::slice::from_raw_parts_mut(&mut rrd as *mut _ as *mut u8, expected_len);
+            raw.read_exact(rrd_slice)?;
+        }
+
+        if rrd.magic != PROXMOX_RRD_MAGIC_1_0 {
+            let msg = "wrong magic number".to_string();
+            return Err(std::io::Error::new(std::io::ErrorKind::Other, msg));
+        }
+
+        Ok(rrd)
+    }
+
+    pub fn to_rrd_v2(&self) -> Result<RRD, Error> {
+        let mut rra_list = Vec::new();
+
+        // old format v1:
+        //
+        // hour    1 min,    70 points
+        // day     30 min,   70 points
+        // week    3 hours,  70 points
+        // month   12 hours, 70 points
+        // year    1 week,   70 points
+        //
+        // new default for RRD v2:
+        //
+        // day     1 min,        1440 points
+        // month   30 min,       1440 points
+        // year    360 min (6h), 1440 points
+        // decade  1 week,        570 points
+
+        // Linear extrapolation
+        fn extrapolate_data(
+            start: u64,
+            reso: u64,
+            factor: u64,
+            data: Vec<Option<f64>>,
+        ) -> (u64, u64, Vec<Option<f64>>) {
+            let mut new = Vec::new();
+
+            for i in 0..data.len() {
+                let mut next = i + 1;
+                if next >= data.len() {
+                    next = 0
+                };
+                let v = data[i];
+                let v1 = data[next];
+                match (v, v1) {
+                    (Some(v), Some(v1)) => {
+                        let diff = (v1 - v) / (factor as f64);
+                        for j in 0..factor {
+                            new.push(Some(v + diff * (j as f64)));
+                        }
+                    }
+                    (Some(v), None) => {
+                        new.push(Some(v));
+                        for _ in 0..factor - 1 {
+                            new.push(None);
+                        }
+                    }
+                    (None, Some(v1)) => {
+                        for _ in 0..factor - 1 {
+                            new.push(None);
+                        }
+                        new.push(Some(v1));
+                    }
+                    (None, None) => {
+                        for _ in 0..factor {
+                            new.push(None);
+                        }
+                    }
+                }
+            }
+
+            (start, reso / factor, new)
+        }
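+        // e.g. extrapolating [Some(0.0), Some(3.0)] with factor 3 yields
+        // [0.0, 1.0, 2.0, 3.0, 2.0, 1.0]: the last source slot has no
+        // successor, so it wraps around and interpolates back towards the
+        // first point.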
+        // Try to convert to new, higher capacity format
+
+        // compute daily average (merge old self.day_avg and self.hour_avg)
+        let mut day_avg = RRA::new(CF::Average, 60, 1440);
+
+        let (start, reso, data) = self.day_avg.extract_data();
+        let (start, reso, data) = extrapolate_data(start, reso, 30, data);
+        day_avg.insert_data(start, reso, data)?;
+
+        let (start, reso, data) = self.hour_avg.extract_data();
+        day_avg.insert_data(start, reso, data)?;
+
+        // compute daily maximum (merge old self.day_max and self.hour_max)
+        let mut day_max = RRA::new(CF::Maximum, 60, 1440);
+
+        let (start, reso, data) = self.day_max.extract_data();
+        let (start, reso, data) = extrapolate_data(start, reso, 30, data);
+        day_max.insert_data(start, reso, data)?;
+
+        let (start, reso, data) = self.hour_max.extract_data();
+        day_max.insert_data(start, reso, data)?;
+
+        // compute monthly average (merge old self.month_avg,
+        // self.week_avg and self.day_avg)
+        let mut month_avg = RRA::new(CF::Average, 30 * 60, 1440);
+
+        let (start, reso, data) = self.month_avg.extract_data();
+        let (start, reso, data) = extrapolate_data(start, reso, 24, data);
+        month_avg.insert_data(start, reso, data)?;
+
+        let (start, reso, data) = self.week_avg.extract_data();
+        let (start, reso, data) = extrapolate_data(start, reso, 6, data);
+        month_avg.insert_data(start, reso, data)?;
+
+        let (start, reso, data) = self.day_avg.extract_data();
+        month_avg.insert_data(start, reso, data)?;
+
+        // compute monthly maximum (merge old self.month_max,
+        // self.week_max and self.day_max)
+        let mut month_max = RRA::new(CF::Maximum, 30 * 60, 1440);
+
+        let (start, reso, data) = self.month_max.extract_data();
+        let (start, reso, data) = extrapolate_data(start, reso, 24, data);
+        month_max.insert_data(start, reso, data)?;
+
+        let (start, reso, data) = self.week_max.extract_data();
+        let (start, reso, data) = extrapolate_data(start, reso, 6, data);
+        month_max.insert_data(start, reso, data)?;
+
+        let (start, reso, data) = self.day_max.extract_data();
+        month_max.insert_data(start, reso, data)?;
+
+        // compute yearly average (merge old self.year_avg)
+        let mut year_avg = RRA::new(CF::Average, 6 * 3600, 1440);
+
+        let (start, reso, data) = self.year_avg.extract_data();
+        let (start, reso, data) = extrapolate_data(start, reso, 28, data);
+        year_avg.insert_data(start, reso, data)?;
+
+        // compute yearly maximum (merge old self.year_max)
+        let mut year_max = RRA::new(CF::Maximum, 6 * 3600, 1440);
+
+        let (start, reso, data) = self.year_max.extract_data();
+        let (start, reso, data) = extrapolate_data(start, reso, 28, data);
+        year_max.insert_data(start, reso, data)?;
+
+        // compute decade average (merge old self.year_avg)
+        let mut decade_avg = RRA::new(CF::Average, 7 * 86400, 570);
+        let (start, reso, data) = self.year_avg.extract_data();
+        decade_avg.insert_data(start, reso, data)?;
+
+        // compute decade maximum (merge old self.year_max)
+        let mut decade_max = RRA::new(CF::Maximum, 7 * 86400, 570);
+        let (start, reso, data) = self.year_max.extract_data();
+        decade_max.insert_data(start, reso, data)?;
+
+        rra_list.push(day_avg);
+        rra_list.push(day_max);
+        rra_list.push(month_avg);
+        rra_list.push(month_max);
+        rra_list.push(year_avg);
+        rra_list.push(year_max);
+        rra_list.push(decade_avg);
+        rra_list.push(decade_max);
+
+        // use values from hour_avg for source (all RRAv1 must have the same config)
+        let
dst = if self.hour_avg.flags.contains(RRAFlags::DST_COUNTER) { + DST::Counter + } else if self.hour_avg.flags.contains(RRAFlags::DST_DERIVE) { + DST::Derive + } else { + DST::Gauge + }; + + let source = DataSource { + dst, + last_value: f64::NAN, + last_update: self.hour_avg.last_update, // IMPORTANT! + }; + Ok(RRD { source, rra_list }) + } +} diff --git a/proxmox-rrd/tests/file_format_test.rs b/proxmox-rrd/tests/file_format_test.rs new file mode 100644 index 00000000..372a4077 --- /dev/null +++ b/proxmox-rrd/tests/file_format_test.rs @@ -0,0 +1,56 @@ +use std::path::Path; +use std::process::Command; + +use anyhow::{bail, Error}; + +use proxmox_rrd::rrd::RRD; +use proxmox_sys::fs::CreateOptions; + +fn compare_file(fn1: &str, fn2: &str) -> Result<(), Error> { + let status = Command::new("/usr/bin/cmp") + .arg(fn1) + .arg(fn2) + .status() + .expect("failed to execute process"); + + if !status.success() { + bail!("file compare failed"); + } + + Ok(()) +} + +const RRD_V1_FN: &str = "./tests/testdata/cpu.rrd_v1"; +const RRD_V2_FN: &str = "./tests/testdata/cpu.rrd_v2"; + +// make sure we can load and convert RRD v1 +#[test] +fn upgrade_from_rrd_v1() -> Result<(), Error> { + let rrd = RRD::load(Path::new(RRD_V1_FN), true)?; + + const RRD_V2_NEW_FN: &str = "./tests/testdata/cpu.rrd_v2.upgraded"; + let new_path = Path::new(RRD_V2_NEW_FN); + rrd.save(new_path, CreateOptions::new(), true)?; + + let result = compare_file(RRD_V2_FN, RRD_V2_NEW_FN); + let _ = std::fs::remove_file(RRD_V2_NEW_FN); + result?; + + Ok(()) +} + +// make sure we can load and save RRD v2 +#[test] +fn load_and_save_rrd_v2() -> Result<(), Error> { + let rrd = RRD::load(Path::new(RRD_V2_FN), true)?; + + const RRD_V2_NEW_FN: &str = "./tests/testdata/cpu.rrd_v2.saved"; + let new_path = Path::new(RRD_V2_NEW_FN); + rrd.save(new_path, CreateOptions::new(), true)?; + + let result = compare_file(RRD_V2_FN, RRD_V2_NEW_FN); + let _ = std::fs::remove_file(RRD_V2_NEW_FN); + result?; + + Ok(()) +} diff --git a/proxmox-rrd/tests/testdata/cpu.rrd_v1 b/proxmox-rrd/tests/testdata/cpu.rrd_v1 new file mode 100644 index 00000000..99d43d34 Binary files /dev/null and b/proxmox-rrd/tests/testdata/cpu.rrd_v1 differ diff --git a/proxmox-rrd/tests/testdata/cpu.rrd_v2 b/proxmox-rrd/tests/testdata/cpu.rrd_v2 new file mode 100644 index 00000000..5e4dfc77 Binary files /dev/null and b/proxmox-rrd/tests/testdata/cpu.rrd_v2 differ