diff --git a/proxmox-rrd/examples/prrd.rs b/proxmox-rrd/examples/prrd.rs index 5fa120e4..081cef98 100644 --- a/proxmox-rrd/examples/prrd.rs +++ b/proxmox-rrd/examples/prrd.rs @@ -3,24 +3,22 @@ use std::path::PathBuf; use anyhow::{bail, Error}; -use serde::{Serialize, Deserialize}; +use serde::{Deserialize, Serialize}; use serde_json::json; +use proxmox_router::cli::{ + complete_file_name, run_cli_command, CliCommand, CliCommandMap, CliEnvironment, +}; use proxmox_router::RpcEnvironment; -use proxmox_router::cli::{run_cli_command, complete_file_name, CliCommand, CliCommandMap, CliEnvironment}; use proxmox_schema::{api, ApiStringFormat, ApiType, IntegerSchema, Schema, StringSchema}; use proxmox_sys::fs::CreateOptions; use proxmox_rrd::rrd::{CF, DST, RRA, RRD}; -pub const RRA_INDEX_SCHEMA: Schema = IntegerSchema::new( - "Index of the RRA.") - .minimum(0) - .schema(); +pub const RRA_INDEX_SCHEMA: Schema = IntegerSchema::new("Index of the RRA.").minimum(0).schema(); -pub const RRA_CONFIG_STRING_SCHEMA: Schema = StringSchema::new( - "RRA configuration") +pub const RRA_CONFIG_STRING_SCHEMA: Schema = StringSchema::new("RRA configuration") .format(&ApiStringFormat::PropertyString(&RRAConfig::API_SCHEMA)) .schema(); @@ -49,7 +47,6 @@ pub struct RRAConfig { )] /// Dump the RRD file in JSON format pub fn dump_rrd(path: String) -> Result<(), Error> { - let rrd = RRD::load(&PathBuf::from(path), false)?; serde_json::to_writer_pretty(std::io::stdout(), &rrd)?; println!(); @@ -67,14 +64,19 @@ pub fn dump_rrd(path: String) -> Result<(), Error> { )] /// RRD file information pub fn rrd_info(path: String) -> Result<(), Error> { - let rrd = RRD::load(&PathBuf::from(path), false)?; println!("DST: {:?}", rrd.source.dst); for (i, rra) in rrd.rra_list.iter().enumerate() { // use RRAConfig property string format - println!("RRA[{}]: {:?},r={},n={}", i, rra.cf, rra.resolution, rra.data.len()); + println!( + "RRA[{}]: {:?},r={},n={}", + i, + rra.cf, + rra.resolution, + rra.data.len() + ); } Ok(()) @@ -97,15 +99,11 @@ pub fn rrd_info(path: String) -> Result<(), Error> { }, )] /// Update the RRD database -pub fn update_rrd( - path: String, - time: Option, - value: f64, -) -> Result<(), Error> { - +pub fn update_rrd(path: String, time: Option, value: f64) -> Result<(), Error> { let path = PathBuf::from(path); - let time = time.map(|v| v as f64) + let time = time + .map(|v| v as f64) .unwrap_or_else(proxmox_time::epoch_f64); let mut rrd = RRD::load(&path, false)?; @@ -147,7 +145,6 @@ pub fn fetch_rrd( start: Option, end: Option, ) -> Result<(), Error> { - let rrd = RRD::load(&PathBuf::from(path), false)?; let data = rrd.extract_data(cf, resolution, start, end)?; @@ -171,18 +168,14 @@ pub fn fetch_rrd( )] /// Return the Unix timestamp of the first time slot inside the /// specified RRA (slot start time) -pub fn first_update_time( - path: String, - rra_index: usize, -) -> Result<(), Error> { - +pub fn first_update_time(path: String, rra_index: usize) -> Result<(), Error> { let rrd = RRD::load(&PathBuf::from(path), false)?; if rra_index >= rrd.rra_list.len() { bail!("rra-index is out of range"); } let rra = &rrd.rra_list[rra_index]; - let duration = (rra.data.len() as u64)*rra.resolution; + let duration = (rra.data.len() as u64) * rra.resolution; let first = rra.slot_start_time((rrd.source.last_update as u64).saturating_sub(duration)); println!("{}", first); @@ -200,7 +193,6 @@ pub fn first_update_time( )] /// Return the Unix timestamp of the last update pub fn last_update_time(path: String) -> Result<(), Error> { - let rrd = RRD::load(&PathBuf::from(path), false)?; println!("{}", rrd.source.last_update); @@ -218,7 +210,6 @@ pub fn last_update_time(path: String) -> Result<(), Error> { )] /// Return the time and value from the last update pub fn last_update(path: String) -> Result<(), Error> { - let rrd = RRD::load(&PathBuf::from(path), false)?; let result = json!({ @@ -251,18 +242,12 @@ pub fn last_update(path: String) -> Result<(), Error> { }, )] /// Create a new RRD file -pub fn create_rrd( - dst: DST, - path: String, - rra: Vec, -) -> Result<(), Error> { - +pub fn create_rrd(dst: DST, path: String, rra: Vec) -> Result<(), Error> { let mut rra_list = Vec::new(); for item in rra.iter() { - let rra: RRAConfig = serde_json::from_value( - RRAConfig::API_SCHEMA.parse_property_string(item)? - )?; + let rra: RRAConfig = + serde_json::from_value(RRAConfig::API_SCHEMA.parse_property_string(item)?)?; println!("GOT {:?}", rra); rra_list.push(RRA::new(rra.cf, rra.r, rra.n as usize)); } @@ -293,12 +278,7 @@ pub fn create_rrd( }, )] /// Resize. Change the number of data slots for the specified RRA. -pub fn resize_rrd( - path: String, - rra_index: usize, - slots: i64, -) -> Result<(), Error> { - +pub fn resize_rrd(path: String, rra_index: usize, slots: i64) -> Result<(), Error> { let path = PathBuf::from(&path); let mut rrd = RRD::load(&path, false)?; @@ -315,12 +295,12 @@ pub fn resize_rrd( bail!("numer of new slots is too small ('{}' < 1)", new_slots); } - if new_slots > 1024*1024 { + if new_slots > 1024 * 1024 { bail!("numer of new slots is too big ('{}' > 1M)", new_slots); } let rra_end = rra.slot_end_time(rrd.source.last_update as u64); - let rra_start = rra_end - rra.resolution*(rra.data.len() as u64); + let rra_start = rra_end - rra.resolution * (rra.data.len() as u64); let (start, reso, data) = rra.extract_data(rra_start, rra_end, rrd.source.last_update); let mut new_rra = RRA::new(rra.cf, rra.resolution, new_slots as usize); @@ -336,7 +316,6 @@ pub fn resize_rrd( } fn main() -> Result<(), Error> { - let uid = nix::unistd::Uid::current(); let username = match nix::unistd::User::from_uid(uid)? { @@ -349,57 +328,56 @@ fn main() -> Result<(), Error> { "create", CliCommand::new(&API_METHOD_CREATE_RRD) .arg_param(&["path"]) - .completion_cb("path", complete_file_name) + .completion_cb("path", complete_file_name), ) .insert( "dump", CliCommand::new(&API_METHOD_DUMP_RRD) .arg_param(&["path"]) - .completion_cb("path", complete_file_name) - ) + .completion_cb("path", complete_file_name), + ) .insert( "fetch", CliCommand::new(&API_METHOD_FETCH_RRD) .arg_param(&["path"]) - .completion_cb("path", complete_file_name) - ) + .completion_cb("path", complete_file_name), + ) .insert( "first", CliCommand::new(&API_METHOD_FIRST_UPDATE_TIME) .arg_param(&["path"]) - .completion_cb("path", complete_file_name) + .completion_cb("path", complete_file_name), ) .insert( "info", CliCommand::new(&API_METHOD_RRD_INFO) .arg_param(&["path"]) - .completion_cb("path", complete_file_name) + .completion_cb("path", complete_file_name), ) .insert( "last", CliCommand::new(&API_METHOD_LAST_UPDATE_TIME) .arg_param(&["path"]) - .completion_cb("path", complete_file_name) + .completion_cb("path", complete_file_name), ) .insert( "lastupdate", CliCommand::new(&API_METHOD_LAST_UPDATE) .arg_param(&["path"]) - .completion_cb("path", complete_file_name) + .completion_cb("path", complete_file_name), ) .insert( "resize", CliCommand::new(&API_METHOD_RESIZE_RRD) .arg_param(&["path"]) - .completion_cb("path", complete_file_name) + .completion_cb("path", complete_file_name), ) .insert( "update", CliCommand::new(&API_METHOD_UPDATE_RRD) .arg_param(&["path"]) - .completion_cb("path", complete_file_name) - ) - ; + .completion_cb("path", complete_file_name), + ); let mut rpcenv = CliEnvironment::new(); rpcenv.set_auth_id(Some(format!("{}@pam", username))); @@ -407,5 +385,4 @@ fn main() -> Result<(), Error> { run_cli_command(cmd_def, rpcenv, None); Ok(()) - } diff --git a/proxmox-rrd/src/cache.rs b/proxmox-rrd/src/cache.rs index ec90cd85..90e4e470 100644 --- a/proxmox-rrd/src/cache.rs +++ b/proxmox-rrd/src/cache.rs @@ -1,18 +1,18 @@ +use std::collections::BTreeSet; use std::fs::File; +use std::io::{BufRead, BufReader}; +use std::os::unix::io::AsRawFd; use std::path::{Path, PathBuf}; use std::sync::{Arc, RwLock}; -use std::io::{BufRead, BufReader}; -use std::time::SystemTime; use std::thread::spawn; -use std::os::unix::io::AsRawFd; -use std::collections::BTreeSet; +use std::time::SystemTime; +use anyhow::{bail, format_err, Error}; use crossbeam_channel::{bounded, TryRecvError}; -use anyhow::{format_err, bail, Error}; use proxmox_sys::fs::{create_path, CreateOptions}; -use crate::rrd::{DST, CF, RRD, RRA}; +use crate::rrd::{CF, DST, RRA, RRD}; mod journal; use journal::*; @@ -37,9 +37,7 @@ pub(crate) struct CacheConfig { dir_options: CreateOptions, } - impl RRDCache { - /// Creates a new instance /// /// `basedir`: All files are stored relative to this path. @@ -66,8 +64,12 @@ impl RRDCache { let file_options = file_options.unwrap_or_else(CreateOptions::new); let dir_options = dir_options.unwrap_or_else(CreateOptions::new); - create_path(&basedir, Some(dir_options.clone()), Some(dir_options.clone())) - .map_err(|err: Error| format_err!("unable to create rrdb stat dir - {}", err))?; + create_path( + &basedir, + Some(dir_options.clone()), + Some(dir_options.clone()), + ) + .map_err(|err: Error| format_err!("unable to create rrdb stat dir - {}", err))?; let config = Arc::new(CacheConfig { basedir, @@ -130,7 +132,6 @@ impl RRDCache { let state = Arc::clone(&self.state); let rrd_map = Arc::clone(&self.rrd_map); - let mut state_guard = self.state.write().unwrap(); let journal_applied = state_guard.journal_applied; @@ -160,7 +161,9 @@ impl RRDCache { let now = proxmox_time::epoch_f64(); let wants_commit = (now - state_guard.last_journal_flush) > self.config.apply_interval; - if journal_applied && !wants_commit { return Ok(journal_applied); } + if journal_applied && !wants_commit { + return Ok(journal_applied); + } state_guard.last_journal_flush = proxmox_time::epoch_f64(); @@ -176,7 +179,6 @@ impl RRDCache { Ok(journal_applied) } - /// Update data in RAM and write file back to disk (journal) pub fn update_value( &self, @@ -185,14 +187,18 @@ impl RRDCache { value: f64, dst: DST, ) -> Result<(), Error> { - let journal_applied = self.apply_journal()?; - self.state.write().unwrap() + self.state + .write() + .unwrap() .append_journal_entry(time, value, dst, rel_path)?; if journal_applied { - self.rrd_map.write().unwrap().update(rel_path, time, value, dst, false)?; + self.rrd_map + .write() + .unwrap() + .update(rel_path, time, value, dst, false)?; } Ok(()) @@ -212,19 +218,19 @@ impl RRDCache { start: Option, end: Option, ) -> Result>)>, Error> { - self.rrd_map.read().unwrap() + self.rrd_map + .read() + .unwrap() .extract_cached_data(base, name, cf, resolution, start, end) } } - fn apply_and_commit_journal_thread( config: Arc, state: Arc>, rrd_map: Arc>, commit_only: bool, ) -> Result<(), Error> { - if commit_only { state.write().unwrap().rotate_journal()?; // start new journal, keep old one } else { @@ -234,7 +240,11 @@ fn apply_and_commit_journal_thread( match apply_journal_impl(Arc::clone(&state), Arc::clone(&rrd_map)) { Ok(entries) => { let elapsed = start_time.elapsed().unwrap().as_secs_f64(); - log::info!("applied rrd journal ({} entries in {:.3} seconds)", entries, elapsed); + log::info!( + "applied rrd journal ({} entries in {:.3} seconds)", + entries, + elapsed + ); } Err(err) => bail!("apply rrd journal failed - {}", err), } @@ -246,8 +256,11 @@ fn apply_and_commit_journal_thread( match commit_journal_impl(config, state, rrd_map) { Ok(rrd_file_count) => { let elapsed = start_time.elapsed().unwrap().as_secs_f64(); - log::info!("rrd journal successfully committed ({} files in {:.3} seconds)", - rrd_file_count, elapsed); + log::info!( + "rrd journal successfully committed ({} files in {:.3} seconds)", + rrd_file_count, + elapsed + ); } Err(err) => bail!("rrd journal commit failed: {}", err), } @@ -261,7 +274,6 @@ fn apply_journal_lines( reader: &mut BufReader, lock_read_line: bool, ) -> Result { - let mut linenr = 0; loop { @@ -274,20 +286,30 @@ fn apply_journal_lines( reader.read_line(&mut line)? }; - if len == 0 { break; } + if len == 0 { + break; + } let entry: JournalEntry = match line.parse() { Ok(entry) => entry, Err(err) => { log::warn!( "unable to parse rrd journal '{}' line {} (skip) - {}", - journal_name, linenr, err, + journal_name, + linenr, + err, ); continue; // skip unparsable lines } }; - rrd_map.write().unwrap().update(&entry.rel_path, entry.time, entry.value, entry.dst, true)?; + rrd_map.write().unwrap().update( + &entry.rel_path, + entry.time, + entry.value, + entry.dst, + true, + )?; } Ok(linenr) } @@ -296,7 +318,6 @@ fn apply_journal_impl( state: Arc>, rrd_map: Arc>, ) -> Result { - let mut lines = 0; // Apply old journals first @@ -343,7 +364,6 @@ fn apply_journal_impl( state_guard.journal_applied = true; } - Ok(lines) } @@ -353,7 +373,7 @@ fn fsync_file_or_dir(path: &Path) -> Result<(), Error> { Ok(()) } -pub(crate)fn fsync_file_and_parent(path: &Path) -> Result<(), Error> { +pub(crate) fn fsync_file_and_parent(path: &Path) -> Result<(), Error> { let file = std::fs::File::open(path)?; nix::unistd::fsync(file.as_raw_fd())?; if let Some(parent) = path.parent() { @@ -376,7 +396,6 @@ fn commit_journal_impl( state: Arc>, rrd_map: Arc>, ) -> Result { - let files = rrd_map.read().unwrap().file_list(); let mut rrd_file_count = 0; diff --git a/proxmox-rrd/src/cache/journal.rs b/proxmox-rrd/src/cache/journal.rs index fbc8773c..7c260e1e 100644 --- a/proxmox-rrd/src/cache/journal.rs +++ b/proxmox-rrd/src/cache/journal.rs @@ -1,21 +1,21 @@ -use std::fs::File; -use std::path::PathBuf; -use std::sync::Arc; -use std::io::{Write, BufReader}; use std::ffi::OsStr; +use std::fs::File; +use std::io::{BufReader, Write}; use std::os::unix::io::AsRawFd; +use std::path::PathBuf; use std::str::FromStr; +use std::sync::Arc; use anyhow::{bail, format_err, Error}; -use nix::fcntl::OFlag; use crossbeam_channel::Receiver; +use nix::fcntl::OFlag; use proxmox_sys::fs::atomic_open_or_create_file; const RRD_JOURNAL_NAME: &str = "rrd.journal"; -use crate::rrd::DST; use crate::cache::CacheConfig; +use crate::rrd::DST; // shared state behind RwLock pub struct JournalState { @@ -36,20 +36,22 @@ pub struct JournalEntry { impl FromStr for JournalEntry { type Err = Error; - fn from_str(line: &str) -> Result { - - let line = line.trim(); + fn from_str(line: &str) -> Result { + let line = line.trim(); let parts: Vec<&str> = line.splitn(4, ':').collect(); if parts.len() != 4 { bail!("wrong numper of components"); } - let time: f64 = parts[0].parse() + let time: f64 = parts[0] + .parse() .map_err(|_| format_err!("unable to parse time"))?; - let value: f64 = parts[1].parse() + let value: f64 = parts[1] + .parse() .map_err(|_| format_err!("unable to parse value"))?; - let dst: u8 = parts[2].parse() + let dst: u8 = parts[2] + .parse() .map_err(|_| format_err!("unable to parse data source type"))?; let dst = match dst { @@ -60,8 +62,13 @@ impl FromStr for JournalEntry { let rel_path = parts[3].to_string(); - Ok(JournalEntry { time, value, dst, rel_path }) - } + Ok(JournalEntry { + time, + value, + dst, + rel_path, + }) + } } pub struct JournalFileInfo { @@ -71,7 +78,6 @@ pub struct JournalFileInfo { } impl JournalState { - pub(crate) fn new(config: Arc) -> Result { let journal = JournalState::open_journal_writer(&config)?; Ok(Self { @@ -95,19 +101,17 @@ impl JournalState { dst: DST, rel_path: &str, ) -> Result<(), Error> { - let journal_entry = format!( - "{}:{}:{}:{}\n", time, value, dst as u8, rel_path); + let journal_entry = format!("{}:{}:{}:{}\n", time, value, dst as u8, rel_path); self.journal.write_all(journal_entry.as_bytes())?; Ok(()) } pub fn open_journal_reader(&self) -> Result, Error> { - // fixme : dup self.journal instead?? let mut journal_path = self.config.basedir.clone(); journal_path.push(RRD_JOURNAL_NAME); - let flags = OFlag::O_CLOEXEC|OFlag::O_RDONLY; + let flags = OFlag::O_CLOEXEC | OFlag::O_RDONLY; let journal = atomic_open_or_create_file( &journal_path, flags, @@ -122,7 +126,7 @@ impl JournalState { let mut journal_path = config.basedir.clone(); journal_path.push(RRD_JOURNAL_NAME); - let flags = OFlag::O_CLOEXEC|OFlag::O_WRONLY|OFlag::O_APPEND; + let flags = OFlag::O_CLOEXEC | OFlag::O_WRONLY | OFlag::O_APPEND; let journal = atomic_open_or_create_file( &journal_path, flags, @@ -151,7 +155,6 @@ impl JournalState { } pub fn remove_old_journals(&self) -> Result<(), Error> { - let journal_list = self.list_old_journals()?; for entry in journal_list { @@ -167,7 +170,9 @@ impl JournalState { let entry = entry?; let path = entry.path(); - if !path.is_file() { continue; } + if !path.is_file() { + continue; + } match path.file_stem() { None => continue, diff --git a/proxmox-rrd/src/cache/rrd_map.rs b/proxmox-rrd/src/cache/rrd_map.rs index 6577fb2e..56dde2e6 100644 --- a/proxmox-rrd/src/cache/rrd_map.rs +++ b/proxmox-rrd/src/cache/rrd_map.rs @@ -1,6 +1,6 @@ +use std::collections::HashMap; use std::path::Path; use std::sync::Arc; -use std::collections::HashMap; use anyhow::{bail, Error}; @@ -17,7 +17,6 @@ pub struct RRDMap { } impl RRDMap { - pub(crate) fn new( config: Arc, load_rrd_cb: fn(path: &Path, rel_path: &str, dst: DST) -> RRD, @@ -71,7 +70,7 @@ impl RRDMap { } pub fn flush_rrd_file(&self, rel_path: &str) -> Result<(), Error> { - if let Some(rrd) = self.map.get(rel_path) { + if let Some(rrd) = self.map.get(rel_path) { let mut path = self.config.basedir.clone(); path.push(rel_path); rrd.save(&path, self.config.file_options.clone(), true) diff --git a/proxmox-rrd/src/rrd.rs b/proxmox-rrd/src/rrd.rs index 1d5d665f..41af6242 100644 --- a/proxmox-rrd/src/rrd.rs +++ b/proxmox-rrd/src/rrd.rs @@ -11,15 +11,15 @@ //! * Plattform independent (big endian f64, hopefully a standard format?) //! * Arbitrary number of RRAs (dynamically changeable) -use std::path::Path; use std::io::{Read, Write}; use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd}; +use std::path::Path; use anyhow::{bail, format_err, Error}; -use serde::{Serialize, Deserialize}; +use serde::{Deserialize, Serialize}; -use proxmox_sys::fs::{make_tmp_file, CreateOptions}; use proxmox_schema::api; +use proxmox_sys::fs::{make_tmp_file, CreateOptions}; use crate::rrd_v1; @@ -69,7 +69,6 @@ pub struct DataSource { } impl DataSource { - /// Create a new Instance pub fn new(dst: DST) -> Self { Self { @@ -111,15 +110,13 @@ impl DataSource { value - self.last_value }; self.last_value = value; - value = diff/time_diff; + value = diff / time_diff; } else { self.last_value = value; } Ok(value) } - - } #[derive(Serialize, Deserialize)] @@ -136,7 +133,6 @@ pub struct RRA { } impl RRA { - /// Creates a new instance pub fn new(cf: CF, resolution: u64, points: usize) -> Self { Self { @@ -181,7 +177,10 @@ impl RRA { if let Some(v) = item { self.data[index] = v; } - index += 1; if index >= self.data.len() { index = 0; } + index += 1; + if index >= self.data.len() { + index = 0; + } } Ok(()) } @@ -192,15 +191,18 @@ impl RRA { let reso = self.resolution; let num_entries = self.data.len() as u64; - let min_time = epoch.saturating_sub(num_entries*reso); + let min_time = epoch.saturating_sub(num_entries * reso); let min_time = self.slot_end_time(min_time); - let mut t = last_update.saturating_sub(num_entries*reso); + let mut t = last_update.saturating_sub(num_entries * reso); let mut index = self.slot(t); for _ in 0..num_entries { t += reso; - index += 1; if index >= self.data.len() { index = 0; } + index += 1; + if index >= self.data.len() { + index = 0; + } if t < min_time { self.data[index] = f64::NAN; } else { @@ -233,12 +235,24 @@ impl RRA { self.last_count = 1; } else { let new_value = match self.cf { - CF::Maximum => if last_value > value { last_value } else { value }, - CF::Minimum => if last_value < value { last_value } else { value }, + CF::Maximum => { + if last_value > value { + last_value + } else { + value + } + } + CF::Minimum => { + if last_value < value { + last_value + } else { + value + } + } CF::Last => value, CF::Average => { - (last_value*(self.last_count as f64))/(new_count as f64) - + value/(new_count as f64) + (last_value * (self.last_count as f64)) / (new_count as f64) + + value / (new_count as f64) } }; self.data[index] = new_value; @@ -264,12 +278,14 @@ impl RRA { let mut list = Vec::new(); let rrd_end = self.slot_end_time(last_update); - let rrd_start = rrd_end.saturating_sub(reso*num_entries); + let rrd_start = rrd_end.saturating_sub(reso * num_entries); let mut t = start; let mut index = self.slot(t); for _ in 0..num_entries { - if t > end { break; }; + if t > end { + break; + }; if t < rrd_start || t >= rrd_end { list.push(None); } else { @@ -281,7 +297,10 @@ impl RRA { } } t += reso; - index += 1; if index >= self.data.len() { index = 0; } + index += 1; + if index >= self.data.len() { + index = 0; + } } (start, reso, list) @@ -298,17 +317,11 @@ pub struct RRD { } impl RRD { - /// Creates a new Instance pub fn new(dst: DST, rra_list: Vec) -> RRD { - let source = DataSource::new(dst); - RRD { - source, - rra_list, - } - + RRD { source, rra_list } } fn from_raw(raw: &[u8]) -> Result { @@ -340,7 +353,6 @@ impl RRD { /// `fadvise(..,POSIX_FADV_DONTNEED)` to avoid keeping the data in /// the linux page cache. pub fn load(path: &Path, avoid_page_cache: bool) -> Result { - let mut file = std::fs::File::open(path)?; let buffer_size = file.metadata().map(|m| m.len() as usize + 1).unwrap_or(0); let mut raw = Vec::with_capacity(buffer_size); @@ -352,12 +364,16 @@ impl RRD { 0, buffer_size as i64, nix::fcntl::PosixFadviseAdvice::POSIX_FADV_DONTNEED, - ).map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err.to_string()))?; + ) + .map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err.to_string()))?; } match Self::from_raw(&raw) { Ok(rrd) => Ok(rrd), - Err(err) => Err(std::io::Error::new(std::io::ErrorKind::Other, err.to_string())), + Err(err) => Err(std::io::Error::new( + std::io::ErrorKind::Other, + err.to_string(), + )), } } @@ -366,13 +382,12 @@ impl RRD { /// Setting `avoid_page_cache` uses /// `fadvise(..,POSIX_FADV_DONTNEED)` to avoid keeping the data in /// the linux page cache. - pub fn save( + pub fn save( &self, path: &Path, options: CreateOptions, avoid_page_cache: bool, ) -> Result<(), Error> { - let (fd, tmp_path) = make_tmp_file(&path, options)?; let mut file = unsafe { std::fs::File::from_raw_fd(fd.into_raw_fd()) }; @@ -419,7 +434,6 @@ impl RRD { /// /// Note: This does not call [Self::save]. pub fn update(&mut self, time: f64, value: f64) { - let value = match self.source.compute_new_value(time, value) { Ok(value) => value, Err(err) => { @@ -451,11 +465,14 @@ impl RRD { start: Option, end: Option, ) -> Result<(u64, u64, Vec>), Error> { - let mut rra: Option<&RRA> = None; for item in self.rra_list.iter() { - if item.cf != cf { continue; } - if item.resolution > resolution { continue; } + if item.cf != cf { + continue; + } + if item.resolution > resolution { + continue; + } if let Some(current) = rra { if item.resolution > current.resolution { @@ -469,16 +486,14 @@ impl RRD { match rra { Some(rra) => { let end = end.unwrap_or_else(|| proxmox_time::epoch_f64() as u64); - let start = start.unwrap_or_else(|| end.saturating_sub(10*rra.resolution)); + let start = start.unwrap_or_else(|| end.saturating_sub(10 * rra.resolution)); Ok(rra.extract_data(start, end, self.source.last_update)) } None => bail!("unable to find RRA suitable ({:?}:{})", cf, resolution), } } - } - #[cfg(test)] mod tests { use super::*; @@ -489,10 +504,10 @@ mod tests { let mut rrd = RRD::new(DST::Gauge, vec![rra]); for i in 2..10 { - rrd.update((i as f64)*30.0, i as f64); + rrd.update((i as f64) * 30.0, i as f64); } - let (start, reso, data) = rrd.extract_data(CF::Maximum, 60, Some(0), Some(5*60))?; + let (start, reso, data) = rrd.extract_data(CF::Maximum, 60, Some(0), Some(5 * 60))?; assert_eq!(start, 0); assert_eq!(reso, 60); assert_eq!(data, [None, Some(3.0), Some(5.0), Some(7.0), Some(9.0)]); @@ -506,10 +521,10 @@ mod tests { let mut rrd = RRD::new(DST::Gauge, vec![rra]); for i in 2..10 { - rrd.update((i as f64)*30.0, i as f64); + rrd.update((i as f64) * 30.0, i as f64); } - let (start, reso, data) = rrd.extract_data(CF::Minimum, 60, Some(0), Some(5*60))?; + let (start, reso, data) = rrd.extract_data(CF::Minimum, 60, Some(0), Some(5 * 60))?; assert_eq!(start, 0); assert_eq!(reso, 60); assert_eq!(data, [None, Some(2.0), Some(4.0), Some(6.0), Some(8.0)]); @@ -523,12 +538,16 @@ mod tests { let mut rrd = RRD::new(DST::Gauge, vec![rra]); for i in 2..10 { - rrd.update((i as f64)*30.0, i as f64); + rrd.update((i as f64) * 30.0, i as f64); } - assert!(rrd.extract_data(CF::Average, 60, Some(0), Some(5*60)).is_err(), "CF::Average should not exist"); + assert!( + rrd.extract_data(CF::Average, 60, Some(0), Some(5 * 60)) + .is_err(), + "CF::Average should not exist" + ); - let (start, reso, data) = rrd.extract_data(CF::Last, 60, Some(0), Some(20*60))?; + let (start, reso, data) = rrd.extract_data(CF::Last, 60, Some(0), Some(20 * 60))?; assert_eq!(start, 0); assert_eq!(reso, 60); assert_eq!(data, [None, Some(3.0), Some(5.0), Some(7.0), Some(9.0)]); @@ -542,10 +561,10 @@ mod tests { let mut rrd = RRD::new(DST::Derive, vec![rra]); for i in 2..10 { - rrd.update((i as f64)*30.0, (i*60) as f64); + rrd.update((i as f64) * 30.0, (i * 60) as f64); } - let (start, reso, data) = rrd.extract_data(CF::Average, 60, Some(60), Some(5*60))?; + let (start, reso, data) = rrd.extract_data(CF::Average, 60, Some(60), Some(5 * 60))?; assert_eq!(start, 60); assert_eq!(reso, 60); assert_eq!(data, [Some(1.0), Some(2.0), Some(2.0), Some(2.0), None]); @@ -559,40 +578,42 @@ mod tests { let mut rrd = RRD::new(DST::Gauge, vec![rra]); for i in 2..10 { - rrd.update((i as f64)*30.0, i as f64); + rrd.update((i as f64) * 30.0, i as f64); } - let (start, reso, data) = rrd.extract_data(CF::Average, 60, Some(60), Some(5*60))?; + let (start, reso, data) = rrd.extract_data(CF::Average, 60, Some(60), Some(5 * 60))?; assert_eq!(start, 60); assert_eq!(reso, 60); assert_eq!(data, [Some(2.5), Some(4.5), Some(6.5), Some(8.5), None]); for i in 10..14 { - rrd.update((i as f64)*30.0, i as f64); + rrd.update((i as f64) * 30.0, i as f64); } - let (start, reso, data) = rrd.extract_data(CF::Average, 60, Some(60), Some(5*60))?; + let (start, reso, data) = rrd.extract_data(CF::Average, 60, Some(60), Some(5 * 60))?; assert_eq!(start, 60); assert_eq!(reso, 60); assert_eq!(data, [None, Some(4.5), Some(6.5), Some(8.5), Some(10.5)]); - let (start, reso, data) = rrd.extract_data(CF::Average, 60, Some(3*60), Some(8*60))?; - assert_eq!(start, 3*60); + let (start, reso, data) = rrd.extract_data(CF::Average, 60, Some(3 * 60), Some(8 * 60))?; + assert_eq!(start, 3 * 60); assert_eq!(reso, 60); assert_eq!(data, [Some(6.5), Some(8.5), Some(10.5), Some(12.5), None]); // add much newer vaule (should delete all previous/outdated value) - let i = 100; rrd.update((i as f64)*30.0, i as f64); + let i = 100; + rrd.update((i as f64) * 30.0, i as f64); println!("TEST {:?}", serde_json::to_string_pretty(&rrd)); - let (start, reso, data) = rrd.extract_data(CF::Average, 60, Some(100*30), Some(100*30 + 5*60))?; - assert_eq!(start, 100*30); + let (start, reso, data) = + rrd.extract_data(CF::Average, 60, Some(100 * 30), Some(100 * 30 + 5 * 60))?; + assert_eq!(start, 100 * 30); assert_eq!(reso, 60); assert_eq!(data, [Some(100.0), None, None, None, None]); // extract with end time smaller than start time - let (start, reso, data) = rrd.extract_data(CF::Average, 60, Some(100*30), Some(60))?; - assert_eq!(start, 100*30); + let (start, reso, data) = rrd.extract_data(CF::Average, 60, Some(100 * 30), Some(60))?; + assert_eq!(start, 100 * 30); assert_eq!(reso, 60); assert_eq!(data, []); diff --git a/proxmox-rrd/src/rrd_v1.rs b/proxmox-rrd/src/rrd_v1.rs index 7e4b97c2..a1e7bf8e 100644 --- a/proxmox-rrd/src/rrd_v1.rs +++ b/proxmox-rrd/src/rrd_v1.rs @@ -8,11 +8,11 @@ pub const RRD_DATA_ENTRIES: usize = 70; /// Proxmox RRD file magic number // openssl::sha::sha256(b"Proxmox Round Robin Database file v1.0")[0..8]; -pub const PROXMOX_RRD_MAGIC_1_0: [u8; 8] = [206, 46, 26, 212, 172, 158, 5, 186]; +pub const PROXMOX_RRD_MAGIC_1_0: [u8; 8] = [206, 46, 26, 212, 172, 158, 5, 186]; -use crate::rrd::{RRD, RRA, CF, DST, DataSource}; +use crate::rrd::{DataSource, CF, DST, RRA, RRD}; -bitflags!{ +bitflags! { /// Flags to specify the data soure type and consolidation function pub struct RRAFlags: u64 { // Data Source Types @@ -49,19 +49,16 @@ pub struct RRAv1 { } impl RRAv1 { - - fn extract_data( - &self, - ) -> (u64, u64, Vec>) { + fn extract_data(&self) -> (u64, u64, Vec>) { let reso = self.resolution; let mut list = Vec::new(); - let rra_end = reso*((self.last_update as u64)/reso); - let rra_start = rra_end - reso*(RRD_DATA_ENTRIES as u64); + let rra_end = reso * ((self.last_update as u64) / reso); + let rra_start = rra_end - reso * (RRD_DATA_ENTRIES as u64); let mut t = rra_start; - let mut index = ((t/reso) % (RRD_DATA_ENTRIES as u64)) as usize; + let mut index = ((t / reso) % (RRD_DATA_ENTRIES as u64)) as usize; for _ in 0..RRD_DATA_ENTRIES { let value = self.data[index]; if value.is_nan() { @@ -70,7 +67,8 @@ impl RRAv1 { list.push(Some(value)); } - t += reso; index = (index + 1) % RRD_DATA_ENTRIES; + t += reso; + index = (index + 1) % RRD_DATA_ENTRIES; } (rra_start, reso, list) @@ -106,9 +104,7 @@ pub struct RRDv1 { } impl RRDv1 { - pub fn from_raw(mut raw: &[u8]) -> Result { - let expected_len = std::mem::size_of::(); if raw.len() != expected_len { @@ -118,7 +114,8 @@ impl RRDv1 { let mut rrd: RRDv1 = unsafe { std::mem::zeroed() }; unsafe { - let rrd_slice = std::slice::from_raw_parts_mut(&mut rrd as *mut _ as *mut u8, expected_len); + let rrd_slice = + std::slice::from_raw_parts_mut(&mut rrd as *mut _ as *mut u8, expected_len); raw.read_exact(rrd_slice)?; } @@ -131,7 +128,6 @@ impl RRDv1 { } pub fn to_rrd_v2(&self) -> Result { - let mut rra_list = Vec::new(); // old format v1: @@ -150,30 +146,36 @@ impl RRDv1 { // decade 1 week, 570 points // Linear extrapolation - fn extrapolate_data(start: u64, reso: u64, factor: u64, data: Vec>) -> (u64, u64, Vec>) { - + fn extrapolate_data( + start: u64, + reso: u64, + factor: u64, + data: Vec>, + ) -> (u64, u64, Vec>) { let mut new = Vec::new(); for i in 0..data.len() { let mut next = i + 1; - if next >= data.len() { next = 0 }; + if next >= data.len() { + next = 0 + }; let v = data[i]; let v1 = data[next]; match (v, v1) { (Some(v), Some(v1)) => { - let diff = (v1 - v)/(factor as f64); + let diff = (v1 - v) / (factor as f64); for j in 0..factor { - new.push(Some(v + diff*(j as f64))); + new.push(Some(v + diff * (j as f64))); } } (Some(v), None) => { new.push(Some(v)); - for _ in 0..factor-1 { + for _ in 0..factor - 1 { new.push(None); } } (None, Some(v1)) => { - for _ in 0..factor-1 { + for _ in 0..factor - 1 { new.push(None); } new.push(Some(v1)); @@ -186,7 +188,7 @@ impl RRDv1 { } } - (start, reso/factor, new) + (start, reso / factor, new) } // Try to convert to new, higher capacity format @@ -213,7 +215,7 @@ impl RRDv1 { // compute montly average (merge old self.month_avg, // self.week_avg and self.day_avg) - let mut month_avg = RRA::new(CF::Average, 30*60, 1440); + let mut month_avg = RRA::new(CF::Average, 30 * 60, 1440); let (start, reso, data) = self.month_avg.extract_data(); let (start, reso, data) = extrapolate_data(start, reso, 24, data); @@ -228,7 +230,7 @@ impl RRDv1 { // compute montly maximum (merge old self.month_max, // self.week_max and self.day_max) - let mut month_max = RRA::new(CF::Maximum, 30*60, 1440); + let mut month_max = RRA::new(CF::Maximum, 30 * 60, 1440); let (start, reso, data) = self.month_max.extract_data(); let (start, reso, data) = extrapolate_data(start, reso, 24, data); @@ -242,26 +244,26 @@ impl RRDv1 { month_max.insert_data(start, reso, data)?; // compute yearly average (merge old self.year_avg) - let mut year_avg = RRA::new(CF::Average, 6*3600, 1440); + let mut year_avg = RRA::new(CF::Average, 6 * 3600, 1440); let (start, reso, data) = self.year_avg.extract_data(); let (start, reso, data) = extrapolate_data(start, reso, 28, data); year_avg.insert_data(start, reso, data)?; // compute yearly maximum (merge old self.year_avg) - let mut year_max = RRA::new(CF::Maximum, 6*3600, 1440); + let mut year_max = RRA::new(CF::Maximum, 6 * 3600, 1440); let (start, reso, data) = self.year_max.extract_data(); let (start, reso, data) = extrapolate_data(start, reso, 28, data); year_max.insert_data(start, reso, data)?; // compute decade average (merge old self.year_avg) - let mut decade_avg = RRA::new(CF::Average, 7*86400, 570); + let mut decade_avg = RRA::new(CF::Average, 7 * 86400, 570); let (start, reso, data) = self.year_avg.extract_data(); decade_avg.insert_data(start, reso, data)?; // compute decade maximum (merge old self.year_max) - let mut decade_max = RRA::new(CF::Maximum, 7*86400, 570); + let mut decade_max = RRA::new(CF::Maximum, 7 * 86400, 570); let (start, reso, data) = self.year_max.extract_data(); decade_max.insert_data(start, reso, data)?; @@ -286,11 +288,8 @@ impl RRDv1 { let source = DataSource { dst, last_value: f64::NAN, - last_update: self.hour_avg.last_update, // IMPORTANT! + last_update: self.hour_avg.last_update, // IMPORTANT! }; - Ok(RRD { - source, - rra_list, - }) + Ok(RRD { source, rra_list }) } } diff --git a/proxmox-rrd/tests/file_format_test.rs b/proxmox-rrd/tests/file_format_test.rs index 81e49ca3..372a4077 100644 --- a/proxmox-rrd/tests/file_format_test.rs +++ b/proxmox-rrd/tests/file_format_test.rs @@ -7,7 +7,6 @@ use proxmox_rrd::rrd::RRD; use proxmox_sys::fs::CreateOptions; fn compare_file(fn1: &str, fn2: &str) -> Result<(), Error> { - let status = Command::new("/usr/bin/cmp") .arg(fn1) .arg(fn2) @@ -27,7 +26,6 @@ const RRD_V2_FN: &str = "./tests/testdata/cpu.rrd_v2"; // make sure we can load and convert RRD v1 #[test] fn upgrade_from_rrd_v1() -> Result<(), Error> { - let rrd = RRD::load(Path::new(RRD_V1_FN), true)?; const RRD_V2_NEW_FN: &str = "./tests/testdata/cpu.rrd_v2.upgraded"; @@ -44,7 +42,6 @@ fn upgrade_from_rrd_v1() -> Result<(), Error> { // make sure we can load and save RRD v2 #[test] fn load_and_save_rrd_v2() -> Result<(), Error> { - let rrd = RRD::load(Path::new(RRD_V2_FN), true)?; const RRD_V2_NEW_FN: &str = "./tests/testdata/cpu.rrd_v2.saved";