From 30b4800f4f6bc227087ab8371c15a66cc0953c84 Mon Sep 17 00:00:00 2001 From: Dietmar Maurer Date: Fri, 15 Oct 2021 12:26:33 +0200 Subject: [PATCH] proxmox-rrd: implement non blocking journal Do not block while applying the journal. --- proxmox-rrd/Cargo.toml | 1 + proxmox-rrd/src/cache.rs | 434 ++++++++++++++++++++++++++++----------- 2 files changed, 320 insertions(+), 115 deletions(-) diff --git a/proxmox-rrd/Cargo.toml b/proxmox-rrd/Cargo.toml index 31473962..900b8fef 100644 --- a/proxmox-rrd/Cargo.toml +++ b/proxmox-rrd/Cargo.toml @@ -11,6 +11,7 @@ proxmox-router = "1.1" [dependencies] anyhow = "1.0" bitflags = "1.2.1" +crossbeam-channel = "0.5" log = "0.4" nix = "0.19.1" serde = { version = "1.0", features = ["derive"] } diff --git a/proxmox-rrd/src/cache.rs b/proxmox-rrd/src/cache.rs index bf8486a0..7366281a 100644 --- a/proxmox-rrd/src/cache.rs +++ b/proxmox-rrd/src/cache.rs @@ -1,12 +1,13 @@ use std::fs::File; use std::path::{Path, PathBuf}; use std::collections::HashMap; -use std::sync::RwLock; +use std::sync::{Arc, RwLock}; use std::io::Write; use std::io::{BufRead, BufReader}; -use std::os::unix::io::AsRawFd; use std::time::SystemTime; - +use std::ffi::OsStr; +use std::thread::spawn; +use crossbeam_channel::{bounded, Receiver, TryRecvError}; use anyhow::{format_err, bail, Error}; use nix::fcntl::OFlag; @@ -21,23 +22,37 @@ const RRD_JOURNAL_NAME: &str = "rrd.journal"; /// This cache is designed to run as single instance (no concurrent /// access from other processes). pub struct RRDCache { + config: Arc, + state: Arc>, + rrd_map: Arc>, +} + +struct CacheConfig { apply_interval: f64, basedir: PathBuf, file_options: CreateOptions, - state: RwLock, - rrd_map: RwLock, + dir_options: CreateOptions, } struct RRDMap { - basedir: PathBuf, - file_options: CreateOptions, - dir_options: CreateOptions, + config: Arc, map: HashMap, load_rrd_cb: fn(path: &Path, rel_path: &str, dst: DST) -> RRD, } impl RRDMap { + fn new( + config: Arc, + load_rrd_cb: fn(path: &Path, rel_path: &str, dst: DST) -> RRD, + ) -> Self { + Self { + config, + map: HashMap::new(), + load_rrd_cb, + } + } + fn update( &mut self, rel_path: &str, @@ -51,9 +66,13 @@ impl RRDMap { rrd.update(time, value); } } else { - let mut path = self.basedir.clone(); + let mut path = self.config.basedir.clone(); path.push(rel_path); - create_path(path.parent().unwrap(), Some(self.dir_options.clone()), Some(self.dir_options.clone()))?; + create_path( + path.parent().unwrap(), + Some(self.config.dir_options.clone()), + Some(self.config.dir_options.clone()), + )?; let mut rrd = (self.load_rrd_cb)(&path, rel_path, dst); @@ -72,10 +91,10 @@ impl RRDMap { for (rel_path, rrd) in self.map.iter() { rrd_file_count += 1; - let mut path = self.basedir.clone(); + let mut path = self.config.basedir.clone(); path.push(&rel_path); - if let Err(err) = rrd.save(&path, self.file_options.clone()) { + if let Err(err) = rrd.save(&path, self.config.file_options.clone()) { errors += 1; log::error!("unable to save {:?}: {}", path, err); } @@ -106,9 +125,11 @@ impl RRDMap { // shared state behind RwLock struct JournalState { + config: Arc, journal: File, last_journal_flush: f64, journal_applied: bool, + apply_thread_result: Option>>, } struct JournalEntry { @@ -118,6 +139,98 @@ struct JournalEntry { rel_path: String, } +impl JournalState { + + fn new(config: Arc) -> Result { + let journal = JournalState::open_journal_writer(&config)?; + Ok(Self { + config, + journal, + last_journal_flush: 0.0, + journal_applied: false, + apply_thread_result: None, + }) + } + + fn open_journal_reader(&self) -> Result, Error> { + + // fixme : dup self.journal instead?? + let mut journal_path = self.config.basedir.clone(); + journal_path.push(RRD_JOURNAL_NAME); + + let flags = OFlag::O_CLOEXEC|OFlag::O_RDONLY; + let journal = atomic_open_or_create_file( + &journal_path, + flags, + &[], + self.config.file_options.clone(), + )?; + Ok(BufReader::new(journal)) + } + + fn open_journal_writer(config: &CacheConfig) -> Result { + let mut journal_path = config.basedir.clone(); + journal_path.push(RRD_JOURNAL_NAME); + + let flags = OFlag::O_CLOEXEC|OFlag::O_WRONLY|OFlag::O_APPEND; + let journal = atomic_open_or_create_file( + &journal_path, + flags, + &[], + config.file_options.clone(), + )?; + Ok(journal) + } + + fn rotate_journal(&mut self) -> Result<(), Error> { + let mut journal_path = self.config.basedir.clone(); + journal_path.push(RRD_JOURNAL_NAME); + + let mut new_name = journal_path.clone(); + let now = proxmox_time::epoch_i64(); + new_name.set_extension(format!("journal-{:08x}", now)); + std::fs::rename(journal_path, new_name)?; + + self.journal = Self::open_journal_writer(&self.config)?; + Ok(()) + } + + fn remove_old_journals(&self) -> Result<(), Error> { + + let journal_list = self.list_old_journals()?; + + for (_time, _filename, path) in journal_list { + std::fs::remove_file(path)?; + } + + Ok(()) + } + + fn list_old_journals(&self) -> Result, Error> { + let mut list = Vec::new(); + for entry in std::fs::read_dir(&self.config.basedir)? { + let entry = entry?; + let path = entry.path(); + if path.is_file() { + if let Some(stem) = path.file_stem() { + if stem != OsStr::new("rrd") { continue; } + if let Some(extension) = path.extension() { + if let Some(extension) = extension.to_str() { + if let Some(rest) = extension.strip_prefix("journal-") { + if let Ok(time) = u64::from_str_radix(rest, 16) { + list.push((time, format!("rrd.{}", extension), path.to_owned())); + } + } + } + } + } + } + } + list.sort_unstable_by_key(|t| t.0); + Ok(list) + } +} + impl RRDCache { /// Creates a new instance @@ -149,33 +262,21 @@ impl RRDCache { create_path(&basedir, Some(dir_options.clone()), Some(dir_options.clone())) .map_err(|err: Error| format_err!("unable to create rrdb stat dir - {}", err))?; - let mut journal_path = basedir.clone(); - journal_path.push(RRD_JOURNAL_NAME); - - let flags = OFlag::O_CLOEXEC|OFlag::O_WRONLY|OFlag::O_APPEND; - let journal = atomic_open_or_create_file(&journal_path, flags, &[], file_options.clone())?; - - let state = JournalState { - journal, - last_journal_flush: 0.0, - journal_applied: false, - }; - - let rrd_map = RRDMap { + let config = Arc::new(CacheConfig { basedir: basedir.clone(), file_options: file_options.clone(), dir_options: dir_options, - map: HashMap::new(), - load_rrd_cb, - }; + apply_interval, + }); + + let state = JournalState::new(Arc::clone(&config))?; + let rrd_map = RRDMap::new(Arc::clone(&config), load_rrd_cb); Ok(Self { - basedir, - file_options, - apply_interval, - state: RwLock::new(state), - rrd_map: RwLock::new(rrd_map), - }) + config: Arc::clone(&config), + state: Arc::new(RwLock::new(state)), + rrd_map: Arc::new(RwLock::new(rrd_map)), + }) } /// Create a new RRD as used by the proxmox backup server @@ -243,102 +344,64 @@ impl RRDCache { } fn append_journal_entry( - state: &mut JournalState, + &self, time: f64, value: f64, dst: DST, rel_path: &str, ) -> Result<(), Error> { + let mut state = self.state.write().unwrap(); // block other writers let journal_entry = format!("{}:{}:{}:{}\n", time, value, dst as u8, rel_path); state.journal.write_all(journal_entry.as_bytes())?; Ok(()) } /// Apply and commit the journal. Should be used at server startup. - pub fn apply_journal(&self) -> Result<(), Error> { - let mut state = self.state.write().unwrap(); // block writers - self.apply_and_commit_journal_locked(&mut state) - } + pub fn apply_journal(&self) -> Result { + let state = Arc::clone(&self.state); + let rrd_map = Arc::clone(&self.rrd_map); - fn apply_and_commit_journal_locked(&self, state: &mut JournalState) -> Result<(), Error> { + let mut state_guard = self.state.write().unwrap(); + let journal_applied = state_guard.journal_applied; + let now = proxmox_time::epoch_f64(); + let wants_commit = (now - state_guard.last_journal_flush) > self.config.apply_interval; - state.last_journal_flush = proxmox_time::epoch_f64(); + if journal_applied && !wants_commit { return Ok(journal_applied); } - if !state.journal_applied { - let start_time = SystemTime::now(); - log::debug!("applying rrd journal"); - - match self.apply_journal_locked(state) { - Ok(entries) => { - let elapsed = start_time.elapsed()?.as_secs_f64(); - log::info!("applied rrd journal ({} entries in {:.3} seconds)", entries, elapsed); + if let Some(ref recv) = state_guard.apply_thread_result { + match recv.try_recv() { + Ok(Ok(())) => { + // finished without errors, OK + } + Ok(Err(err)) => { + // finished with errors, log them + log::error!("{}", err); + } + Err(TryRecvError::Empty) => { + // still running + return Ok(journal_applied); + } + Err(TryRecvError::Disconnected) => { + // crashed, start again + log::error!("apply journal thread crashed - try again"); } - Err(err) => bail!("apply rrd journal failed - {}", err), } } - let start_time = SystemTime::now(); - log::debug!("commit rrd journal"); + state_guard.last_journal_flush = proxmox_time::epoch_f64(); - match self.commit_journal_locked(state) { - Ok(rrd_file_count) => { - let elapsed = start_time.elapsed()?.as_secs_f64(); - log::info!("rrd journal successfully committed ({} files in {:.3} seconds)", - rrd_file_count, elapsed); - } - Err(err) => bail!("rrd journal commit failed: {}", err), - } + let (sender, receiver) = bounded(1); + state_guard.apply_thread_result = Some(receiver); - Ok(()) + spawn(move || { + let result = apply_and_commit_journal_thread(state, rrd_map, journal_applied) + .map_err(|err| err.to_string()); + sender.send(result).unwrap(); + }); + + Ok(journal_applied) } - fn apply_journal_locked(&self, state: &mut JournalState) -> Result { - - let mut journal_path = self.basedir.clone(); - journal_path.push(RRD_JOURNAL_NAME); - - let flags = OFlag::O_CLOEXEC|OFlag::O_RDONLY; - let journal = atomic_open_or_create_file(&journal_path, flags, &[], self.file_options.clone())?; - let mut journal = BufReader::new(journal); - - // fixme: apply blocked to avoid too many calls to self.rrd_map.write() ?? - let mut linenr = 0; - loop { - linenr += 1; - let mut line = String::new(); - let len = journal.read_line(&mut line)?; - if len == 0 { break; } - - let entry = match Self::parse_journal_line(&line) { - Ok(entry) => entry, - Err(err) => { - log::warn!("unable to parse rrd journal line {} (skip) - {}", linenr, err); - continue; // skip unparsable lines - } - }; - - self.rrd_map.write().unwrap().update(&entry.rel_path, entry.time, entry.value, entry.dst, true)?; - } - - // We need to apply the journal only once, because further updates - // are always directly applied. - state.journal_applied = true; - - Ok(linenr) - } - - fn commit_journal_locked(&self, state: &mut JournalState) -> Result { - - // save all RRDs - we only need a read lock here - let rrd_file_count = self.rrd_map.read().unwrap().flush_rrd_files()?; - - // if everything went ok, commit the journal - - nix::unistd::ftruncate(state.journal.as_raw_fd(), 0) - .map_err(|err| format_err!("unable to truncate journal - {}", err))?; - - Ok(rrd_file_count) - } /// Update data in RAM and write file back to disk (journal) pub fn update_value( @@ -349,16 +412,14 @@ impl RRDCache { dst: DST, ) -> Result<(), Error> { - let mut state = self.state.write().unwrap(); // block other writers + let journal_applied = self.apply_journal()?; - if !state.journal_applied || (time - state.last_journal_flush) > self.apply_interval { - self.apply_and_commit_journal_locked(&mut state)?; + self.append_journal_entry(time, value, dst, rel_path)?; + + if journal_applied { + self.rrd_map.write().unwrap().update(rel_path, time, value, dst, false)?; } - Self::append_journal_entry(&mut state, time, value, dst, rel_path)?; - - self.rrd_map.write().unwrap().update(rel_path, time, value, dst, false)?; - Ok(()) } @@ -380,3 +441,146 @@ impl RRDCache { .extract_cached_data(base, name, cf, resolution, start, end) } } + + +fn apply_and_commit_journal_thread( + state: Arc>, + rrd_map: Arc>, + commit_only: bool, +) -> Result<(), Error> { + + if commit_only { + state.write().unwrap().rotate_journal()?; // start new journal, keep old one + } else { + let start_time = SystemTime::now(); + log::debug!("applying rrd journal"); + + match apply_journal_impl(Arc::clone(&state), Arc::clone(&rrd_map)) { + Ok(entries) => { + let elapsed = start_time.elapsed().unwrap().as_secs_f64(); + log::info!("applied rrd journal ({} entries in {:.3} seconds)", entries, elapsed); + } + Err(err) => bail!("apply rrd journal failed - {}", err), + } + } + + let start_time = SystemTime::now(); + log::debug!("commit rrd journal"); + + match commit_journal_impl(state, rrd_map) { + Ok(rrd_file_count) => { + let elapsed = start_time.elapsed().unwrap().as_secs_f64(); + log::info!("rrd journal successfully committed ({} files in {:.3} seconds)", + rrd_file_count, elapsed); + } + Err(err) => bail!("rrd journal commit failed: {}", err), + } + Ok(()) +} + +fn apply_journal_lines( + state: Arc>, + rrd_map: Arc>, + journal_name: &str, // used for logging + reader: &mut BufReader, + lock_read_line: bool, +) -> Result { + + let mut linenr = 0; + + loop { + linenr += 1; + let mut line = String::new(); + let len = if lock_read_line { + let _lock = state.read().unwrap(); // make sure we read entire lines + reader.read_line(&mut line)? + } else { + reader.read_line(&mut line)? + }; + + if len == 0 { break; } + + let entry = match RRDCache::parse_journal_line(&line) { + Ok(entry) => entry, + Err(err) => { + log::warn!( + "unable to parse rrd journal '{}' line {} (skip) - {}", + journal_name, linenr, err, + ); + continue; // skip unparsable lines + } + }; + + rrd_map.write().unwrap().update(&entry.rel_path, entry.time, entry.value, entry.dst, true)?; + } + Ok(linenr) +} + +fn apply_journal_impl( + state: Arc>, + rrd_map: Arc>, +) -> Result { + + let mut lines = 0; + + // Apply old journals first + let journal_list = state.read().unwrap().list_old_journals()?; + + for (_time, filename, path) in journal_list { + log::info!("apply old journal log {}", filename); + let file = std::fs::OpenOptions::new().read(true).open(path)?; + let mut reader = BufReader::new(file); + lines += apply_journal_lines( + Arc::clone(&state), + Arc::clone(&rrd_map), + &filename, + &mut reader, + false, + )?; + } + + let mut journal = state.read().unwrap().open_journal_reader()?; + + lines += apply_journal_lines( + Arc::clone(&state), + Arc::clone(&rrd_map), + "rrd.journal", + &mut journal, + true, + )?; + + { + let mut state_guard = state.write().unwrap(); // block other writers + + lines += apply_journal_lines( + Arc::clone(&state), + Arc::clone(&rrd_map), + "rrd.journal", + &mut journal, + false, + )?; + + state_guard.rotate_journal()?; // start new journal, keep old one + + // We need to apply the journal only once, because further updates + // are always directly applied. + state_guard.journal_applied = true; + } + + + Ok(lines) +} + +fn commit_journal_impl( + state: Arc>, + rrd_map: Arc>, +) -> Result { + + // save all RRDs - we only need a read lock here + let rrd_file_count = rrd_map.read().unwrap().flush_rrd_files()?; + + // if everything went ok, remove the old journal files + state.write().unwrap().remove_old_journals()?; + + Ok(rrd_file_count) +}