From 6b809ff59b7cb4f74e2719a3175fdae3f52a0fc2 Mon Sep 17 00:00:00 2001
From: Dietmar Maurer <dietmar@proxmox.com>
Date: Tue, 1 Sep 2020 11:17:13 +0200
Subject: [PATCH] src/backup/verify.rs: use separate thread to load data

---
 src/api2/admin/datastore.rs |  11 +-
 src/backup/verify.rs        | 199 ++++++++++++++++++++++++------------
 2 files changed, 138 insertions(+), 72 deletions(-)

diff --git a/src/api2/admin/datastore.rs b/src/api2/admin/datastore.rs
index 156ebf83..f0f59ab7 100644
--- a/src/api2/admin/datastore.rs
+++ b/src/api2/admin/datastore.rs
@@ -1,6 +1,7 @@
 use std::collections::{HashSet, HashMap};
 use std::ffi::OsStr;
 use std::os::unix::ffi::OsStrExt;
+use std::sync::{Arc, Mutex};
 
 use anyhow::{bail, format_err, Error};
 use futures::*;
@@ -513,17 +514,17 @@ pub fn verify(
         to_stdout,
         move |worker| {
             let failed_dirs = if let Some(backup_dir) = backup_dir {
-                let mut verified_chunks = HashSet::with_capacity(1024*16);
-                let mut corrupt_chunks = HashSet::with_capacity(64);
+                let verified_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024*16)));
+                let corrupt_chunks = Arc::new(Mutex::new(HashSet::with_capacity(64)));
                 let mut res = Vec::new();
-                if !verify_backup_dir(&datastore, &backup_dir, &mut verified_chunks, &mut corrupt_chunks, &worker)? {
+                if !verify_backup_dir(datastore, &backup_dir, verified_chunks, corrupt_chunks, worker.clone())? {
                     res.push(backup_dir.to_string());
                 }
                 res
             } else if let Some(backup_group) = backup_group {
-                verify_backup_group(&datastore, &backup_group, &worker)?
+                verify_backup_group(datastore, &backup_group, worker.clone())?
             } else {
-                verify_all_backups(&datastore, &worker)?
+                verify_all_backups(datastore, worker.clone())?
             };
             if failed_dirs.len() > 0 {
                 worker.log("Failed to verify following snapshots:");
diff --git a/src/backup/verify.rs b/src/backup/verify.rs
index dc8be22c..fd103a44 100644
--- a/src/backup/verify.rs
+++ b/src/backup/verify.rs
@@ -1,4 +1,7 @@
 use std::collections::HashSet;
+use std::sync::{Arc, Mutex};
+use std::sync::atomic::{Ordering, AtomicUsize};
+use std::time::Instant;
 
 use anyhow::{bail, format_err, Error};
 
@@ -6,12 +9,12 @@ use crate::server::WorkerTask;
 use crate::api2::types::*;
 
 use super::{
-    DataStore, BackupGroup, BackupDir, BackupInfo, IndexFile,
+    DataStore, DataBlob, BackupGroup, BackupDir, BackupInfo, IndexFile,
     CryptMode,
     FileInfo, ArchiveType, archive_type,
 };
 
-fn verify_blob(datastore: &DataStore, backup_dir: &BackupDir, info: &FileInfo) -> Result<(), Error> {
+fn verify_blob(datastore: Arc<DataStore>, backup_dir: &BackupDir, info: &FileInfo) -> Result<(), Error> {
 
     let blob = datastore.load_blob(backup_dir, &info.filename)?;
 
@@ -36,48 +39,96 @@ fn verify_blob(datastore: &DataStore, backup_dir: &BackupDir, info: &FileInfo) -
     }
 }
 
+// We use a separate thread to read/load chunks, so that we can do
+// load and verify in parallel to increase performance.
+fn chunk_reader_thread(
+    datastore: Arc<DataStore>,
+    index: Box<dyn IndexFile + Send>,
+    verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
+    corrupt_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
+    errors: Arc<AtomicUsize>,
+    worker: Arc<WorkerTask>,
+) -> std::sync::mpsc::Receiver<(DataBlob, [u8;32], u64)> {
+
+    let (sender, receiver) = std::sync::mpsc::sync_channel(3); // buffer up to 3 chunks
+
+    std::thread::spawn(move|| {
+        for pos in 0..index.index_count() {
+            let info = index.chunk_info(pos).unwrap();
+            let size = info.range.end - info.range.start;
+
+            if verified_chunks.lock().unwrap().contains(&info.digest) {
+                continue; // already verified
+            }
+
+            if corrupt_chunks.lock().unwrap().contains(&info.digest) {
+                let digest_str = proxmox::tools::digest_to_hex(&info.digest);
+                worker.log(format!("chunk {} was marked as corrupt", digest_str));
+                errors.fetch_add(1, Ordering::SeqCst);
+                continue;
+            }
+
+            match datastore.load_chunk(&info.digest) {
+                Err(err) => {
+                    corrupt_chunks.lock().unwrap().insert(info.digest);
+                    worker.log(format!("can't verify chunk, load failed - {}", err));
+                    errors.fetch_add(1, Ordering::SeqCst);
+                    continue;
+                }
+                Ok(chunk) => {
+                    if sender.send((chunk, info.digest, size)).is_err() {
+                        break; // receiver gone - simply stop
+                    }
+                }
+            }
+        }
+    });
+
+    receiver
+}
+
 fn verify_index_chunks(
-    datastore: &DataStore,
-    index: Box<dyn IndexFile>,
-    verified_chunks: &mut HashSet<[u8;32]>,
-    corrupt_chunks: &mut HashSet<[u8; 32]>,
+    datastore: Arc<DataStore>,
+    index: Box<dyn IndexFile + Send>,
+    verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
+    corrupt_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
     crypt_mode: CryptMode,
-    worker: &WorkerTask,
+    worker: Arc<WorkerTask>,
 ) -> Result<(), Error> {
 
-    let mut errors = 0;
-    for pos in 0..index.index_count() {
+    let errors = Arc::new(AtomicUsize::new(0));
+
+    let start_time = Instant::now();
+
+    let chunk_channel = chunk_reader_thread(
+        datastore,
+        index,
+        verified_chunks.clone(),
+        corrupt_chunks.clone(),
+        errors.clone(),
+        worker.clone(),
+    );
+
+    let mut read_bytes = 0;
+    let mut decoded_bytes = 0;
+
+    loop {
 
         worker.fail_on_abort()?;
 
-        let info = index.chunk_info(pos).unwrap();
-
-        if verified_chunks.contains(&info.digest) {
-            continue; // already verified
-        }
-
-        if corrupt_chunks.contains(&info.digest) {
-            let digest_str = proxmox::tools::digest_to_hex(&info.digest);
-            worker.log(format!("chunk {} was marked as corrupt", digest_str));
-            errors += 1;
-            continue;
-        }
-
-        let chunk = match datastore.load_chunk(&info.digest) {
-            Err(err) => {
-                corrupt_chunks.insert(info.digest);
-                worker.log(format!("can't verify chunk, load failed - {}", err));
-                errors += 1;
-                continue;
-            },
-            Ok(chunk) => chunk,
+        let (chunk, digest, size) = match chunk_channel.recv() {
+            Ok(tuple) => tuple,
+            Err(std::sync::mpsc::RecvError) => break,
         };
 
+        read_bytes += chunk.raw_size();
+        decoded_bytes += size;
+
         let chunk_crypt_mode = match chunk.crypt_mode() {
             Err(err) => {
-                corrupt_chunks.insert(info.digest);
+                corrupt_chunks.lock().unwrap().insert(digest);
                 worker.log(format!("can't verify chunk, unknown CryptMode - {}", err));
-                errors += 1;
+                errors.fetch_add(1, Ordering::SeqCst);
                 continue;
             },
             Ok(mode) => mode,
@@ -89,21 +140,32 @@ fn verify_index_chunks(
                 chunk_crypt_mode,
                 crypt_mode
             ));
-            errors += 1;
+            errors.fetch_add(1, Ordering::SeqCst);
         }
 
-        let size = info.range.end - info.range.start;
-
-        if let Err(err) = chunk.verify_unencrypted(size as usize, &info.digest) {
-            corrupt_chunks.insert(info.digest);
+        if let Err(err) = chunk.verify_unencrypted(size as usize, &digest) {
+            corrupt_chunks.lock().unwrap().insert(digest);
             worker.log(format!("{}", err));
-            errors += 1;
+            errors.fetch_add(1, Ordering::SeqCst);
         } else {
-            verified_chunks.insert(info.digest);
+            verified_chunks.lock().unwrap().insert(digest);
         }
     }
 
-    if errors > 0 {
+    let elapsed = start_time.elapsed().as_secs_f64();
+
+    let read_bytes_mib = (read_bytes as f64)/(1024.0*1024.0);
+    let decoded_bytes_mib = (decoded_bytes as f64)/(1024.0*1024.0);
+
+    let read_speed = read_bytes_mib/elapsed;
+    let decode_speed = decoded_bytes_mib/elapsed;
+
+    let error_count = errors.load(Ordering::SeqCst);
+
+    worker.log(format!("  verified {:.2}/{:.2} Mib in {:.2} seconds, speed {:.2}/{:.2} Mib/s ({} errors)",
+        read_bytes_mib, decoded_bytes_mib, elapsed, read_speed, decode_speed, error_count));
+
+    if errors.load(Ordering::SeqCst) > 0 {
         bail!("chunks could not be verified");
     }
 
@@ -111,12 +173,12 @@ fn verify_index_chunks(
 }
 
 fn verify_fixed_index(
-    datastore: &DataStore,
+    datastore: Arc<DataStore>,
     backup_dir: &BackupDir,
     info: &FileInfo,
-    verified_chunks: &mut HashSet<[u8;32]>,
-    corrupt_chunks: &mut HashSet<[u8;32]>,
-    worker: &WorkerTask,
+    verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
+    corrupt_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
+    worker: Arc<WorkerTask>,
 ) -> Result<(), Error> {
 
     let mut path = backup_dir.relative_path();
@@ -137,12 +199,12 @@ fn verify_fixed_index(
 }
 
 fn verify_dynamic_index(
-    datastore: &DataStore,
+    datastore: Arc<DataStore>,
     backup_dir: &BackupDir,
     info: &FileInfo,
-    verified_chunks: &mut HashSet<[u8;32]>,
-    corrupt_chunks: &mut HashSet<[u8;32]>,
-    worker: &WorkerTask,
+    verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
+    corrupt_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
+    worker: Arc<WorkerTask>,
 ) -> Result<(), Error> {
 
     let mut path = backup_dir.relative_path();
@@ -172,11 +234,11 @@ fn verify_dynamic_index(
 /// - Ok(false) if there were verification errors
 /// - Err(_) if task was aborted
 pub fn verify_backup_dir(
-    datastore: &DataStore,
+    datastore: Arc<DataStore>,
     backup_dir: &BackupDir,
-    verified_chunks: &mut HashSet<[u8;32]>,
-    corrupt_chunks: &mut HashSet<[u8;32]>,
-    worker: &WorkerTask
+    verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
+    corrupt_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
+    worker: Arc<WorkerTask>
 ) -> Result<bool, Error> {
 
     let mut manifest = match datastore.load_manifest(&backup_dir) {
@@ -198,23 +260,23 @@ pub fn verify_backup_dir(
             match archive_type(&info.filename)? {
                 ArchiveType::FixedIndex =>
                     verify_fixed_index(
-                        &datastore,
+                        datastore.clone(),
                         &backup_dir,
                         info,
-                        verified_chunks,
-                        corrupt_chunks,
-                        worker
+                        verified_chunks.clone(),
+                        corrupt_chunks.clone(),
+                        worker.clone(),
                     ),
                 ArchiveType::DynamicIndex =>
                     verify_dynamic_index(
-                        &datastore,
+                        datastore.clone(),
                         &backup_dir,
                         info,
-                        verified_chunks,
-                        corrupt_chunks,
-                        worker
+                        verified_chunks.clone(),
+                        corrupt_chunks.clone(),
+                        worker.clone(),
                     ),
-                ArchiveType::Blob => verify_blob(&datastore, &backup_dir, info),
+                ArchiveType::Blob => verify_blob(datastore.clone(), &backup_dir, info),
             }
         });
 
@@ -247,7 +309,7 @@ pub fn verify_backup_dir(
 /// Returns
 /// - Ok(failed_dirs) where failed_dirs had verification errors
 /// - Err(_) if task was aborted
-pub fn verify_backup_group(datastore: &DataStore, group: &BackupGroup, worker: &WorkerTask) -> Result<Vec<String>, Error> {
+pub fn verify_backup_group(datastore: Arc<DataStore>, group: &BackupGroup, worker: Arc<WorkerTask>) -> Result<Vec<String>, Error> {
 
     let mut errors = Vec::new();
     let mut list = match group.list_backups(&datastore.base_path()) {
@@ -260,12 +322,15 @@ pub fn verify_backup_group(datastore: &DataStore, group: &BackupGroup, worker: &
 
     worker.log(format!("verify group {}:{}", datastore.name(), group));
 
-    let mut verified_chunks = HashSet::with_capacity(1024*16); // start with 16384 chunks (up to 65GB)
-    let mut corrupt_chunks = HashSet::with_capacity(64); // start with 64 chunks since we assume there are few corrupt ones
+    // start with 16384 chunks (up to 65GB)
+    let verified_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024*16)));
+
+    // start with 64 chunks since we assume there are few corrupt ones
+    let corrupt_chunks = Arc::new(Mutex::new(HashSet::with_capacity(64)));
 
     BackupInfo::sort_list(&mut list, false); // newest first
     for info in list {
-        if !verify_backup_dir(datastore, &info.backup_dir, &mut verified_chunks, &mut corrupt_chunks, worker)?{
+        if !verify_backup_dir(datastore.clone(), &info.backup_dir, verified_chunks.clone(), corrupt_chunks.clone(), worker.clone())?{
             errors.push(info.backup_dir.to_string());
         }
     }
@@ -280,7 +345,7 @@ pub fn verify_backup_group(datastore: &DataStore, group: &BackupGroup, worker: &
 /// Returns
 /// - Ok(failed_dirs) where failed_dirs had verification errors
 /// - Err(_) if task was aborted
-pub fn verify_all_backups(datastore: &DataStore, worker: &WorkerTask) -> Result<Vec<String>, Error> {
+pub fn verify_all_backups(datastore: Arc<DataStore>, worker: Arc<WorkerTask>) -> Result<Vec<String>, Error> {
 
     let mut errors = Vec::new();
 
@@ -297,7 +362,7 @@ pub fn verify_all_backups(datastore: &DataStore, worker: &WorkerTask) -> Result<
     worker.log(format!("verify datastore {}", datastore.name()));
 
     for group in list {
-        let mut group_errors = verify_backup_group(datastore, &group, worker)?;
+        let mut group_errors = verify_backup_group(datastore.clone(), &group, worker.clone())?;
         errors.append(&mut group_errors);
     }
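
A note on the pattern: the speedup comes from a bounded producer/consumer
handoff over std::sync::mpsc::sync_channel(3). A dedicated reader thread loads
chunks at most a few ahead while the calling thread verifies them, and a failed
send() tells the reader the receiver is gone. Below is a minimal self-contained
sketch of just that pattern; load_chunk() and verify_chunk() are hypothetical
stand-ins for DataStore::load_chunk() and DataBlob::verify_unencrypted(), not
the proxmox-backup API.

use std::sync::mpsc::{sync_channel, Receiver};
use std::thread;

// Hypothetical stand-in for an expensive disk read (DataStore::load_chunk).
fn load_chunk(pos: usize) -> Vec<u8> {
    vec![(pos % 251) as u8; 4096]
}

// Hypothetical stand-in for an expensive digest check
// (DataBlob::verify_unencrypted).
fn verify_chunk(data: &[u8]) -> bool {
    !data.is_empty()
}

// Spawn a reader that stays at most 3 chunks ahead of the verifier,
// mirroring the sync_channel(3) in the patch. Dropping the returned
// Receiver makes the next send() fail, which stops the thread.
fn reader_thread(chunk_count: usize) -> Receiver<(usize, Vec<u8>)> {
    let (sender, receiver) = sync_channel(3);
    thread::spawn(move || {
        for pos in 0..chunk_count {
            let chunk = load_chunk(pos); // runs in parallel with verification
            if sender.send((pos, chunk)).is_err() {
                break; // receiver gone - simply stop
            }
        }
    });
    receiver
}

fn main() {
    let mut errors = 0;
    // The Receiver iterator yields chunks as they become available; the
    // loop ends once the reader finishes and drops its sender.
    for (pos, chunk) in reader_thread(16) {
        if !verify_chunk(&chunk) {
            eprintln!("chunk {} failed verification", pos);
            errors += 1;
        }
    }
    println!("done, {} errors", errors);
}

The channel capacity is what bounds memory use: with unverified chunks up to a
few MiB each, a small buffer keeps only a handful of chunks alive at once while
still hiding most of the read latency behind the verification work.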