From 3275f1ac1615121e50f28e11203577e2b1f74ab1 Mon Sep 17 00:00:00 2001 From: Dietmar Maurer Date: Thu, 14 Oct 2021 16:41:08 +0200 Subject: [PATCH] proxmox-rrd: log journal apply/flush times, split apply and flush We need to apply the journal only once. --- proxmox-rrd/src/cache.rs | 76 ++++++++++++++++++++++++++++++---------- 1 file changed, 58 insertions(+), 18 deletions(-) diff --git a/proxmox-rrd/src/cache.rs b/proxmox-rrd/src/cache.rs index 6cac5849..4c7f05f0 100644 --- a/proxmox-rrd/src/cache.rs +++ b/proxmox-rrd/src/cache.rs @@ -5,6 +5,7 @@ use std::sync::RwLock; use std::io::Write; use std::io::{BufRead, BufReader}; use std::os::unix::io::AsRawFd; +use std::time::SystemTime; use anyhow::{format_err, bail, Error}; use nix::fcntl::OFlag; @@ -33,6 +34,7 @@ struct RRDCacheState { rrd_map: HashMap, journal: File, last_journal_flush: f64, + journal_applied: bool, } struct JournalEntry { @@ -83,6 +85,7 @@ impl RRDCache { journal, rrd_map: HashMap::new(), last_journal_flush: 0.0, + journal_applied: false, }; Ok(Self { @@ -171,18 +174,46 @@ impl RRDCache { Ok(()) } - /// Apply journal. Should be used at server startup. + /// Apply and commit the journal. Should be used at server startup. pub fn apply_journal(&self) -> Result<(), Error> { let mut state = self.state.write().unwrap(); // block writers - self.apply_journal_locked(&mut state) + self.apply_and_commit_journal_locked(&mut state) } - fn apply_journal_locked(&self, state: &mut RRDCacheState) -> Result<(), Error> { - - log::info!("applying rrd journal"); + fn apply_and_commit_journal_locked(&self, state: &mut RRDCacheState) -> Result<(), Error> { state.last_journal_flush = proxmox_time::epoch_f64(); + if !state.journal_applied { + let start_time = SystemTime::now(); + log::debug!("applying rrd journal"); + + match self.apply_journal_locked(state) { + Ok(entries) => { + let elapsed = start_time.elapsed()?.as_secs_f64(); + log::info!("applied rrd journal ({} entries in {:.3} seconds)", entries, elapsed); + } + Err(err) => bail!("apply rrd journal failed - {}", err), + } + } + + let start_time = SystemTime::now(); + log::debug!("commit rrd journal"); + + match self.commit_journal_locked(state) { + Ok(rrd_file_count) => { + let elapsed = start_time.elapsed()?.as_secs_f64(); + log::info!("rrd journal successfully committed ({} files in {:.3} seconds)", + rrd_file_count, elapsed); + } + Err(err) => bail!("rrd journal commit failed: {}", err), + } + + Ok(()) + } + + fn apply_journal_locked(&self, state: &mut RRDCacheState) -> Result { + let mut journal_path = self.basedir.clone(); journal_path.push(RRD_JOURNAL_NAME); @@ -234,10 +265,22 @@ impl RRDCache { } } + + // We need to apply the journal only once, because further updates + // are always directly applied. + state.journal_applied = true; + + Ok(linenr) + } + + fn commit_journal_locked(&self, state: &mut RRDCacheState) -> Result { + // save all RRDs + let mut rrd_file_count = 0; let mut errors = 0; for (rel_path, rrd) in state.rrd_map.iter() { + rrd_file_count += 1; let mut path = self.basedir.clone(); path.push(&rel_path); if let Err(err) = rrd.save(&path, self.file_options.clone()) { @@ -246,17 +289,16 @@ impl RRDCache { } } - // if everything went ok, commit the journal - - if errors == 0 { - nix::unistd::ftruncate(state.journal.as_raw_fd(), 0) - .map_err(|err| format_err!("unable to truncate journal - {}", err))?; - log::info!("rrd journal successfully committed"); - } else { - log::error!("errors during rrd flush - unable to commit rrd journal"); + if errors != 0 { + bail!("errors during rrd flush - unable to commit rrd journal"); } - Ok(()) + // if everything went ok, commit the journal + + nix::unistd::ftruncate(state.journal.as_raw_fd(), 0) + .map_err(|err| format_err!("unable to truncate journal - {}", err))?; + + Ok(rrd_file_count) } /// Update data in RAM and write file back to disk (journal) @@ -270,10 +312,8 @@ impl RRDCache { let mut state = self.state.write().unwrap(); // block other writers - if (time - state.last_journal_flush) > self.apply_interval { - if let Err(err) = self.apply_journal_locked(&mut state) { - log::error!("apply journal failed: {}", err); - } + if !state.journal_applied || (time - state.last_journal_flush) > self.apply_interval { + self.apply_and_commit_journal_locked(&mut state)?; } Self::append_journal_entry(&mut state, time, value, dst, rel_path)?;