From 9dcc64b71a207f0c6e58a9c98e1398812b83a882 Mon Sep 17 00:00:00 2001 From: Dietmar Maurer Date: Sat, 16 Oct 2021 12:00:25 +0200 Subject: [PATCH] proxmox-rrd: move JournalState into extra file --- proxmox-rrd/src/cache.rs | 135 ++---------------------------- proxmox-rrd/src/cache/journal.rs | 137 +++++++++++++++++++++++++++++++ 2 files changed, 144 insertions(+), 128 deletions(-) create mode 100644 proxmox-rrd/src/cache/journal.rs diff --git a/proxmox-rrd/src/cache.rs b/proxmox-rrd/src/cache.rs index 7366281a..0b7aa0de 100644 --- a/proxmox-rrd/src/cache.rs +++ b/proxmox-rrd/src/cache.rs @@ -2,20 +2,18 @@ use std::fs::File; use std::path::{Path, PathBuf}; use std::collections::HashMap; use std::sync::{Arc, RwLock}; -use std::io::Write; use std::io::{BufRead, BufReader}; use std::time::SystemTime; -use std::ffi::OsStr; use std::thread::spawn; -use crossbeam_channel::{bounded, Receiver, TryRecvError}; +use crossbeam_channel::{bounded, TryRecvError}; use anyhow::{format_err, bail, Error}; -use nix::fcntl::OFlag; -use proxmox::tools::fs::{atomic_open_or_create_file, create_path, CreateOptions}; +use proxmox::tools::fs::{create_path, CreateOptions}; use crate::rrd::{DST, CF, RRD, RRA}; -const RRD_JOURNAL_NAME: &str = "rrd.journal"; +mod journal; +use journal::*; /// RRD cache - keep RRD data in RAM, but write updates to disk /// @@ -27,7 +25,7 @@ pub struct RRDCache { rrd_map: Arc>, } -struct CacheConfig { +pub(crate) struct CacheConfig { apply_interval: f64, basedir: PathBuf, file_options: CreateOptions, @@ -123,113 +121,6 @@ impl RRDMap { } } -// shared state behind RwLock -struct JournalState { - config: Arc, - journal: File, - last_journal_flush: f64, - journal_applied: bool, - apply_thread_result: Option>>, -} - -struct JournalEntry { - time: f64, - value: f64, - dst: DST, - rel_path: String, -} - -impl JournalState { - - fn new(config: Arc) -> Result { - let journal = JournalState::open_journal_writer(&config)?; - Ok(Self { - config, - journal, - last_journal_flush: 0.0, - journal_applied: false, - apply_thread_result: None, - }) - } - - fn open_journal_reader(&self) -> Result, Error> { - - // fixme : dup self.journal instead?? - let mut journal_path = self.config.basedir.clone(); - journal_path.push(RRD_JOURNAL_NAME); - - let flags = OFlag::O_CLOEXEC|OFlag::O_RDONLY; - let journal = atomic_open_or_create_file( - &journal_path, - flags, - &[], - self.config.file_options.clone(), - )?; - Ok(BufReader::new(journal)) - } - - fn open_journal_writer(config: &CacheConfig) -> Result { - let mut journal_path = config.basedir.clone(); - journal_path.push(RRD_JOURNAL_NAME); - - let flags = OFlag::O_CLOEXEC|OFlag::O_WRONLY|OFlag::O_APPEND; - let journal = atomic_open_or_create_file( - &journal_path, - flags, - &[], - config.file_options.clone(), - )?; - Ok(journal) - } - - fn rotate_journal(&mut self) -> Result<(), Error> { - let mut journal_path = self.config.basedir.clone(); - journal_path.push(RRD_JOURNAL_NAME); - - let mut new_name = journal_path.clone(); - let now = proxmox_time::epoch_i64(); - new_name.set_extension(format!("journal-{:08x}", now)); - std::fs::rename(journal_path, new_name)?; - - self.journal = Self::open_journal_writer(&self.config)?; - Ok(()) - } - - fn remove_old_journals(&self) -> Result<(), Error> { - - let journal_list = self.list_old_journals()?; - - for (_time, _filename, path) in journal_list { - std::fs::remove_file(path)?; - } - - Ok(()) - } - - fn list_old_journals(&self) -> Result, Error> { - let mut list = Vec::new(); - for entry in std::fs::read_dir(&self.config.basedir)? { - let entry = entry?; - let path = entry.path(); - if path.is_file() { - if let Some(stem) = path.file_stem() { - if stem != OsStr::new("rrd") { continue; } - if let Some(extension) = path.extension() { - if let Some(extension) = extension.to_str() { - if let Some(rest) = extension.strip_prefix("journal-") { - if let Ok(time) = u64::from_str_radix(rest, 16) { - list.push((time, format!("rrd.{}", extension), path.to_owned())); - } - } - } - } - } - } - } - list.sort_unstable_by_key(|t| t.0); - Ok(list) - } -} impl RRDCache { @@ -343,19 +234,6 @@ impl RRDCache { Ok(JournalEntry { time, value, dst, rel_path }) } - fn append_journal_entry( - &self, - time: f64, - value: f64, - dst: DST, - rel_path: &str, - ) -> Result<(), Error> { - let mut state = self.state.write().unwrap(); // block other writers - let journal_entry = format!("{}:{}:{}:{}\n", time, value, dst as u8, rel_path); - state.journal.write_all(journal_entry.as_bytes())?; - Ok(()) - } - /// Apply and commit the journal. Should be used at server startup. pub fn apply_journal(&self) -> Result { let state = Arc::clone(&self.state); @@ -414,7 +292,8 @@ impl RRDCache { let journal_applied = self.apply_journal()?; - self.append_journal_entry(time, value, dst, rel_path)?; + self.state.write().unwrap() + .append_journal_entry(time, value, dst, rel_path)?; if journal_applied { self.rrd_map.write().unwrap().update(rel_path, time, value, dst, false)?; diff --git a/proxmox-rrd/src/cache/journal.rs b/proxmox-rrd/src/cache/journal.rs new file mode 100644 index 00000000..e0f7a88b --- /dev/null +++ b/proxmox-rrd/src/cache/journal.rs @@ -0,0 +1,137 @@ +use std::fs::File; +use std::path::PathBuf; +use std::sync::Arc; +use std::io::{Write, BufReader}; +use std::ffi::OsStr; + +use anyhow::Error; +use nix::fcntl::OFlag; +use crossbeam_channel::Receiver; + +use proxmox::tools::fs::atomic_open_or_create_file; + +const RRD_JOURNAL_NAME: &str = "rrd.journal"; + +use crate::rrd::DST; +use crate::cache::CacheConfig; + +// shared state behind RwLock +pub struct JournalState { + config: Arc, + journal: File, + pub last_journal_flush: f64, + pub journal_applied: bool, + pub apply_thread_result: Option>>, +} + +pub struct JournalEntry { + pub time: f64, + pub value: f64, + pub dst: DST, + pub rel_path: String, +} + +impl JournalState { + + pub(crate) fn new(config: Arc) -> Result { + let journal = JournalState::open_journal_writer(&config)?; + Ok(Self { + config, + journal, + last_journal_flush: 0.0, + journal_applied: false, + apply_thread_result: None, + }) + } + + pub fn append_journal_entry( + &mut self, + time: f64, + value: f64, + dst: DST, + rel_path: &str, + ) -> Result<(), Error> { + let journal_entry = format!( + "{}:{}:{}:{}\n", time, value, dst as u8, rel_path); + self.journal.write_all(journal_entry.as_bytes())?; + Ok(()) + } + + pub fn open_journal_reader(&self) -> Result, Error> { + + // fixme : dup self.journal instead?? + let mut journal_path = self.config.basedir.clone(); + journal_path.push(RRD_JOURNAL_NAME); + + let flags = OFlag::O_CLOEXEC|OFlag::O_RDONLY; + let journal = atomic_open_or_create_file( + &journal_path, + flags, + &[], + self.config.file_options.clone(), + )?; + Ok(BufReader::new(journal)) + } + + fn open_journal_writer(config: &CacheConfig) -> Result { + let mut journal_path = config.basedir.clone(); + journal_path.push(RRD_JOURNAL_NAME); + + let flags = OFlag::O_CLOEXEC|OFlag::O_WRONLY|OFlag::O_APPEND; + let journal = atomic_open_or_create_file( + &journal_path, + flags, + &[], + config.file_options.clone(), + )?; + Ok(journal) + } + + pub fn rotate_journal(&mut self) -> Result<(), Error> { + let mut journal_path = self.config.basedir.clone(); + journal_path.push(RRD_JOURNAL_NAME); + + let mut new_name = journal_path.clone(); + let now = proxmox_time::epoch_i64(); + new_name.set_extension(format!("journal-{:08x}", now)); + std::fs::rename(journal_path, new_name)?; + + self.journal = Self::open_journal_writer(&self.config)?; + Ok(()) + } + + pub fn remove_old_journals(&self) -> Result<(), Error> { + + let journal_list = self.list_old_journals()?; + + for (_time, _filename, path) in journal_list { + std::fs::remove_file(path)?; + } + + Ok(()) + } + + pub fn list_old_journals(&self) -> Result, Error> { + let mut list = Vec::new(); + for entry in std::fs::read_dir(&self.config.basedir)? { + let entry = entry?; + let path = entry.path(); + if path.is_file() { + if let Some(stem) = path.file_stem() { + if stem != OsStr::new("rrd") { continue; } + if let Some(extension) = path.extension() { + if let Some(extension) = extension.to_str() { + if let Some(rest) = extension.strip_prefix("journal-") { + if let Ok(time) = u64::from_str_radix(rest, 16) { + list.push((time, format!("rrd.{}", extension), path.to_owned())); + } + } + } + } + } + } + } + list.sort_unstable_by_key(|t| t.0); + Ok(list) + } +}