diff --git a/proxmox-rrd/src/cache.rs b/proxmox-rrd/src/cache.rs index 4c7f05f0..54fbe378 100644 --- a/proxmox-rrd/src/cache.rs +++ b/proxmox-rrd/src/cache.rs @@ -24,14 +24,88 @@ pub struct RRDCache { apply_interval: f64, basedir: PathBuf, file_options: CreateOptions, - dir_options: CreateOptions, state: RwLock, - load_rrd_cb: fn(cache: &RRDCache, path: &Path, rel_path: &str, dst: DST) -> RRD, + rrd_map: RwLock, +} + +struct RRDMap { + basedir: PathBuf, + file_options: CreateOptions, + dir_options: CreateOptions, + map: HashMap, + load_rrd_cb: fn(path: &Path, rel_path: &str, dst: DST) -> RRD, +} + +impl RRDMap { + + fn update( + &mut self, + rel_path: &str, + time: f64, + value: f64, + dst: DST, + new_only: bool, + ) -> Result<(), Error> { + if let Some(rrd) = self.map.get_mut(rel_path) { + if !new_only || time > rrd.last_update() { + rrd.update(time, value); + } + } else { + let mut path = self.basedir.clone(); + path.push(rel_path); + create_path(path.parent().unwrap(), Some(self.dir_options.clone()), Some(self.dir_options.clone()))?; + + let mut rrd = (self.load_rrd_cb)(&path, rel_path, dst); + + if !new_only || time > rrd.last_update() { + rrd.update(time, value); + } + self.map.insert(rel_path.to_string(), rrd); + } + Ok(()) + } + + fn flush_rrd_files(&self) -> Result { + let mut rrd_file_count = 0; + + let mut errors = 0; + for (rel_path, rrd) in self.map.iter() { + rrd_file_count += 1; + + let mut path = self.basedir.clone(); + path.push(&rel_path); + + if let Err(err) = rrd.save(&path, self.file_options.clone()) { + errors += 1; + log::error!("unable to save {:?}: {}", path, err); + } + } + + if errors != 0 { + bail!("errors during rrd flush - unable to commit rrd journal"); + } + + Ok(rrd_file_count) + } + + fn extract_cached_data( + &self, + base: &str, + name: &str, + cf: CF, + resolution: u64, + start: Option, + end: Option, + ) -> Result>)>, Error> { + match self.map.get(&format!("{}/{}", base, name)) { + Some(rrd) => Ok(Some(rrd.extract_data(cf, resolution, start, end)?)), + None => Ok(None), + } + } } // shared state behind RwLock struct RRDCacheState { - rrd_map: HashMap, journal: File, last_journal_flush: f64, journal_applied: bool, @@ -65,7 +139,7 @@ impl RRDCache { file_options: Option, dir_options: Option, apply_interval: f64, - load_rrd_cb: fn(cache: &RRDCache, path: &Path, rel_path: &str, dst: DST) -> RRD, + load_rrd_cb: fn(path: &Path, rel_path: &str, dst: DST) -> RRD, ) -> Result { let basedir = basedir.as_ref().to_owned(); @@ -83,19 +157,25 @@ impl RRDCache { let state = RRDCacheState { journal, - rrd_map: HashMap::new(), last_journal_flush: 0.0, journal_applied: false, }; + let rrd_map = RRDMap { + basedir: basedir.clone(), + file_options: file_options.clone(), + dir_options: dir_options, + map: HashMap::new(), + load_rrd_cb, + }; + Ok(Self { basedir, file_options, - dir_options, apply_interval, - load_rrd_cb, state: RwLock::new(state), - }) + rrd_map: RwLock::new(rrd_map), + }) } /// Create a new RRD as used by the proxmox backup server @@ -221,17 +301,7 @@ impl RRDCache { let journal = atomic_open_or_create_file(&journal_path, flags, &[], self.file_options.clone())?; let mut journal = BufReader::new(journal); - let mut last_update_map = HashMap::new(); - - let mut get_last_update = |rel_path: &str, rrd: &RRD| { - if let Some(time) = last_update_map.get(rel_path) { - return *time; - } - let last_update = rrd.last_update(); - last_update_map.insert(rel_path.to_string(), last_update); - last_update - }; - + // fixme: apply blocked to avoid too many calls to self.rrd_map.write() ?? let mut linenr = 0; loop { linenr += 1; @@ -247,25 +317,9 @@ impl RRDCache { } }; - if let Some(rrd) = state.rrd_map.get_mut(&entry.rel_path) { - if entry.time > get_last_update(&entry.rel_path, &rrd) { - rrd.update(entry.time, entry.value); - } - } else { - let mut path = self.basedir.clone(); - path.push(&entry.rel_path); - create_path(path.parent().unwrap(), Some(self.dir_options.clone()), Some(self.dir_options.clone()))?; - - let mut rrd = (self.load_rrd_cb)(&self, &path, &entry.rel_path, entry.dst); - - if entry.time > get_last_update(&entry.rel_path, &rrd) { - rrd.update(entry.time, entry.value); - } - state.rrd_map.insert(entry.rel_path.clone(), rrd); - } + self.rrd_map.write().unwrap().update(&entry.rel_path, entry.time, entry.value, entry.dst, true)?; } - // We need to apply the journal only once, because further updates // are always directly applied. state.journal_applied = true; @@ -275,23 +329,8 @@ impl RRDCache { fn commit_journal_locked(&self, state: &mut RRDCacheState) -> Result { - // save all RRDs - let mut rrd_file_count = 0; - - let mut errors = 0; - for (rel_path, rrd) in state.rrd_map.iter() { - rrd_file_count += 1; - let mut path = self.basedir.clone(); - path.push(&rel_path); - if let Err(err) = rrd.save(&path, self.file_options.clone()) { - errors += 1; - log::error!("unable to save {:?}: {}", path, err); - } - } - - if errors != 0 { - bail!("errors during rrd flush - unable to commit rrd journal"); - } + // save all RRDs - we only need a read lock here + let rrd_file_count = self.rrd_map.read().unwrap().flush_rrd_files()?; // if everything went ok, commit the journal @@ -318,18 +357,7 @@ impl RRDCache { Self::append_journal_entry(&mut state, time, value, dst, rel_path)?; - if let Some(rrd) = state.rrd_map.get_mut(rel_path) { - rrd.update(time, value); - } else { - let mut path = self.basedir.clone(); - path.push(rel_path); - create_path(path.parent().unwrap(), Some(self.dir_options.clone()), Some(self.dir_options.clone()))?; - - let mut rrd = (self.load_rrd_cb)(&self, &path, rel_path, dst); - - rrd.update(time, value); - state.rrd_map.insert(rel_path.into(), rrd); - } + self.rrd_map.write().unwrap().update(rel_path, time, value, dst, false)?; Ok(()) } @@ -348,12 +376,7 @@ impl RRDCache { start: Option, end: Option, ) -> Result>)>, Error> { - - let state = self.state.read().unwrap(); - - match state.rrd_map.get(&format!("{}/{}", base, name)) { - Some(rrd) => Ok(Some(rrd.extract_data(cf, resolution, start, end)?)), - None => Ok(None), - } + self.rrd_map.read().unwrap() + .extract_cached_data(base, name, cf, resolution, start, end) } }