diff --git a/pbs-datastore/src/dynamic_index.rs b/pbs-datastore/src/dynamic_index.rs new file mode 100644 index 00000000..28b71d57 --- /dev/null +++ b/pbs-datastore/src/dynamic_index.rs @@ -0,0 +1,508 @@ +use std::fs::File; +use std::io::{BufWriter, Seek, SeekFrom, Write}; +use std::os::unix::io::AsRawFd; +use std::path::{Path, PathBuf}; +use std::sync::Arc; + +use anyhow::{bail, format_err, Error}; + +use proxmox::tools::io::ReadExt; +use proxmox::tools::uuid::Uuid; +use proxmox::tools::mmap::Mmap; + +use pbs_tools::process_locker::ProcessLockSharedGuard; + +use crate::Chunker; +use crate::chunk_stat::ChunkStat; +use crate::chunk_store::ChunkStore; +use crate::data_blob::{DataBlob, DataChunkBuilder}; +use crate::file_formats; +use crate::index::{IndexFile, ChunkReadInfo}; + +/// Header format definition for dynamic index files (`.dixd`) +#[repr(C)] +pub struct DynamicIndexHeader { + pub magic: [u8; 8], + pub uuid: [u8; 16], + pub ctime: i64, + /// Sha256 over the index ``SHA256(offset1||digest1||offset2||digest2||...)`` + pub index_csum: [u8; 32], + reserved: [u8; 4032], // overall size is one page (4096 bytes) +} +proxmox::static_assert_size!(DynamicIndexHeader, 4096); +// TODO: Once non-Copy unions are stabilized, use: +// union DynamicIndexHeader { +// reserved: [u8; 4096], +// pub data: DynamicIndexHeaderData, +// } + +impl DynamicIndexHeader { + /// Convenience method to allocate a zero-initialized header struct. + pub fn zeroed() -> Box { + unsafe { + Box::from_raw(std::alloc::alloc_zeroed(std::alloc::Layout::new::()) as *mut Self) + } + } + + pub fn as_bytes(&self) -> &[u8] { + unsafe { + std::slice::from_raw_parts( + self as *const Self as *const u8, + std::mem::size_of::(), + ) + } + } +} + +#[derive(Clone, Debug)] +#[repr(C)] +pub struct DynamicEntry { + end_le: u64, + digest: [u8; 32], +} + +impl DynamicEntry { + #[inline] + pub fn end(&self) -> u64 { + u64::from_le(self.end_le) + } +} + +pub struct DynamicIndexReader { + _file: File, + pub size: usize, + index: Mmap, + pub uuid: [u8; 16], + pub ctime: i64, + pub index_csum: [u8; 32], +} + +impl DynamicIndexReader { + pub fn open(path: &Path) -> Result { + File::open(path) + .map_err(Error::from) + .and_then(Self::new) + .map_err(|err| format_err!("Unable to open dynamic index {:?} - {}", path, err)) + } + + pub fn index(&self) -> &[DynamicEntry] { + &self.index + } + + pub fn new(mut file: std::fs::File) -> Result { + // FIXME: This is NOT OUR job! Check the callers of this method and remove this! + file.seek(SeekFrom::Start(0))?; + + let header_size = std::mem::size_of::(); + + let rawfd = file.as_raw_fd(); + let stat = match nix::sys::stat::fstat(rawfd) { + Ok(stat) => stat, + Err(err) => bail!("fstat failed - {}", err), + }; + + let size = stat.st_size as usize; + + if size < header_size { + bail!("index too small ({})", stat.st_size); + } + + let header: Box = unsafe { file.read_host_value_boxed()? }; + + if header.magic != file_formats::DYNAMIC_SIZED_CHUNK_INDEX_1_0 { + bail!("got unknown magic number"); + } + + let ctime = proxmox::tools::time::epoch_i64(); + + let index_size = stat.st_size as usize - header_size; + let index_count = index_size / 40; + if index_count * 40 != index_size { + bail!("got unexpected file size"); + } + + let index = unsafe { + Mmap::map_fd( + rawfd, + header_size as u64, + index_count, + nix::sys::mman::ProtFlags::PROT_READ, + nix::sys::mman::MapFlags::MAP_PRIVATE, + )? + }; + + Ok(Self { + _file: file, + size, + index, + ctime, + uuid: header.uuid, + index_csum: header.index_csum, + }) + } + + #[inline] + #[allow(clippy::cast_ptr_alignment)] + pub fn chunk_end(&self, pos: usize) -> u64 { + if pos >= self.index.len() { + panic!("chunk index out of range"); + } + self.index[pos].end() + } + + #[inline] + fn chunk_digest(&self, pos: usize) -> &[u8; 32] { + if pos >= self.index.len() { + panic!("chunk index out of range"); + } + &self.index[pos].digest + } + + pub fn binary_search( + &self, + start_idx: usize, + start: u64, + end_idx: usize, + end: u64, + offset: u64, + ) -> Result { + if (offset >= end) || (offset < start) { + bail!("offset out of range"); + } + + if end_idx == start_idx { + return Ok(start_idx); // found + } + let middle_idx = (start_idx + end_idx) / 2; + let middle_end = self.chunk_end(middle_idx); + + if offset < middle_end { + self.binary_search(start_idx, start, middle_idx, middle_end, offset) + } else { + self.binary_search(middle_idx + 1, middle_end, end_idx, end, offset) + } + } +} + +impl IndexFile for DynamicIndexReader { + fn index_count(&self) -> usize { + self.index.len() + } + + fn index_digest(&self, pos: usize) -> Option<&[u8; 32]> { + if pos >= self.index.len() { + None + } else { + Some(unsafe { &*(self.chunk_digest(pos).as_ptr() as *const [u8; 32]) }) + } + } + + fn index_bytes(&self) -> u64 { + if self.index.is_empty() { + 0 + } else { + self.chunk_end(self.index.len() - 1) + } + } + + fn compute_csum(&self) -> ([u8; 32], u64) { + let mut csum = openssl::sha::Sha256::new(); + let mut chunk_end = 0; + for pos in 0..self.index_count() { + let info = self.chunk_info(pos).unwrap(); + chunk_end = info.range.end; + csum.update(&chunk_end.to_le_bytes()); + csum.update(&info.digest); + } + let csum = csum.finish(); + (csum, chunk_end) + } + + fn chunk_info(&self, pos: usize) -> Option { + if pos >= self.index.len() { + return None; + } + let start = if pos == 0 { 0 } else { self.index[pos - 1].end() }; + + let end = self.index[pos].end(); + + Some(ChunkReadInfo { + range: start..end, + digest: self.index[pos].digest, + }) + } + + fn index_ctime(&self) -> i64 { + self.ctime + } + + fn index_size(&self) -> usize { + self.size as usize + } + + fn chunk_from_offset(&self, offset: u64) -> Option<(usize, u64)> { + let end_idx = self.index.len() - 1; + let end = self.chunk_end(end_idx); + let found_idx = self.binary_search(0, 0, end_idx, end, offset); + let found_idx = match found_idx { + Ok(i) => i, + Err(_) => return None + }; + + let found_start = if found_idx == 0 { + 0 + } else { + self.chunk_end(found_idx - 1) + }; + + Some((found_idx, offset - found_start)) + } +} + +/// Create dynamic index files (`.dixd`) +pub struct DynamicIndexWriter { + store: Arc, + _lock: ProcessLockSharedGuard, + writer: BufWriter, + closed: bool, + filename: PathBuf, + tmp_filename: PathBuf, + csum: Option, + pub uuid: [u8; 16], + pub ctime: i64, +} + +impl Drop for DynamicIndexWriter { + fn drop(&mut self) { + let _ = std::fs::remove_file(&self.tmp_filename); // ignore errors + } +} + +impl DynamicIndexWriter { + pub fn create(store: Arc, path: &Path) -> Result { + let shared_lock = store.try_shared_lock()?; + + let full_path = store.relative_path(path); + let mut tmp_path = full_path.clone(); + tmp_path.set_extension("tmp_didx"); + + let file = std::fs::OpenOptions::new() + .create(true) + .truncate(true) + .read(true) + .write(true) + .open(&tmp_path)?; + + let mut writer = BufWriter::with_capacity(1024 * 1024, file); + + let ctime = proxmox::tools::time::epoch_i64(); + + let uuid = Uuid::generate(); + + let mut header = DynamicIndexHeader::zeroed(); + header.magic = file_formats::DYNAMIC_SIZED_CHUNK_INDEX_1_0; + header.ctime = i64::to_le(ctime); + header.uuid = *uuid.as_bytes(); + // header.index_csum = [0u8; 32]; + writer.write_all(header.as_bytes())?; + + let csum = Some(openssl::sha::Sha256::new()); + + Ok(Self { + store, + _lock: shared_lock, + writer, + closed: false, + filename: full_path, + tmp_filename: tmp_path, + ctime, + uuid: *uuid.as_bytes(), + csum, + }) + } + + // fixme: use add_chunk instead? + pub fn insert_chunk(&self, chunk: &DataBlob, digest: &[u8; 32]) -> Result<(bool, u64), Error> { + self.store.insert_chunk(chunk, digest) + } + + pub fn close(&mut self) -> Result<[u8; 32], Error> { + if self.closed { + bail!( + "cannot close already closed archive index file {:?}", + self.filename + ); + } + + self.closed = true; + + self.writer.flush()?; + + let csum_offset = proxmox::offsetof!(DynamicIndexHeader, index_csum); + self.writer.seek(SeekFrom::Start(csum_offset as u64))?; + + let csum = self.csum.take().unwrap(); + let index_csum = csum.finish(); + + self.writer.write_all(&index_csum)?; + self.writer.flush()?; + + if let Err(err) = std::fs::rename(&self.tmp_filename, &self.filename) { + bail!("Atomic rename file {:?} failed - {}", self.filename, err); + } + + Ok(index_csum) + } + + // fixme: rename to add_digest + pub fn add_chunk(&mut self, offset: u64, digest: &[u8; 32]) -> Result<(), Error> { + if self.closed { + bail!( + "cannot write to closed dynamic index file {:?}", + self.filename + ); + } + + let offset_le: &[u8; 8] = unsafe { &std::mem::transmute::(offset.to_le()) }; + + if let Some(ref mut csum) = self.csum { + csum.update(offset_le); + csum.update(digest); + } + + self.writer.write_all(offset_le)?; + self.writer.write_all(digest)?; + Ok(()) + } +} + +/// Writer which splits a binary stream into dynamic sized chunks +/// +/// And store the resulting chunk list into the index file. +pub struct DynamicChunkWriter { + index: DynamicIndexWriter, + closed: bool, + chunker: Chunker, + stat: ChunkStat, + chunk_offset: usize, + last_chunk: usize, + chunk_buffer: Vec, +} + +impl DynamicChunkWriter { + pub fn new(index: DynamicIndexWriter, chunk_size: usize) -> Self { + Self { + index, + closed: false, + chunker: Chunker::new(chunk_size), + stat: ChunkStat::new(0), + chunk_offset: 0, + last_chunk: 0, + chunk_buffer: Vec::with_capacity(chunk_size * 4), + } + } + + pub fn stat(&self) -> &ChunkStat { + &self.stat + } + + pub fn close(&mut self) -> Result<(), Error> { + if self.closed { + return Ok(()); + } + + self.closed = true; + + self.write_chunk_buffer()?; + + self.index.close()?; + + self.stat.size = self.chunk_offset as u64; + + // add size of index file + self.stat.size += + (self.stat.chunk_count * 40 + std::mem::size_of::()) as u64; + + Ok(()) + } + + fn write_chunk_buffer(&mut self) -> Result<(), Error> { + let chunk_size = self.chunk_buffer.len(); + + if chunk_size == 0 { + return Ok(()); + } + + let expected_chunk_size = self.chunk_offset - self.last_chunk; + if expected_chunk_size != self.chunk_buffer.len() { + bail!("wrong chunk size {} != {}", expected_chunk_size, chunk_size); + } + + self.stat.chunk_count += 1; + + self.last_chunk = self.chunk_offset; + + let (chunk, digest) = DataChunkBuilder::new(&self.chunk_buffer) + .compress(true) + .build()?; + + match self.index.insert_chunk(&chunk, &digest) { + Ok((is_duplicate, compressed_size)) => { + self.stat.compressed_size += compressed_size; + if is_duplicate { + self.stat.duplicate_chunks += 1; + } else { + self.stat.disk_size += compressed_size; + } + + println!( + "ADD CHUNK {:016x} {} {}% {} {}", + self.chunk_offset, + chunk_size, + (compressed_size * 100) / (chunk_size as u64), + is_duplicate, + proxmox::tools::digest_to_hex(&digest) + ); + self.index.add_chunk(self.chunk_offset as u64, &digest)?; + self.chunk_buffer.truncate(0); + Ok(()) + } + Err(err) => { + self.chunk_buffer.truncate(0); + Err(err) + } + } + } +} + +impl Write for DynamicChunkWriter { + fn write(&mut self, data: &[u8]) -> std::result::Result { + let chunker = &mut self.chunker; + + let pos = chunker.scan(data); + + if pos > 0 { + self.chunk_buffer.extend_from_slice(&data[0..pos]); + self.chunk_offset += pos; + + if let Err(err) = self.write_chunk_buffer() { + return Err(std::io::Error::new( + std::io::ErrorKind::Other, + err.to_string(), + )); + } + Ok(pos) + } else { + self.chunk_offset += data.len(); + self.chunk_buffer.extend_from_slice(data); + Ok(data.len()) + } + } + + fn flush(&mut self) -> std::result::Result<(), std::io::Error> { + Err(std::io::Error::new( + std::io::ErrorKind::Other, + "please use close() instead of flush()", + )) + } +} diff --git a/src/backup/fixed_index.rs b/pbs-datastore/src/fixed_index.rs similarity index 97% rename from src/backup/fixed_index.rs rename to pbs-datastore/src/fixed_index.rs index 518a5b0d..47c69043 100644 --- a/src/backup/fixed_index.rs +++ b/pbs-datastore/src/fixed_index.rs @@ -7,15 +7,17 @@ use std::io::{Seek, SeekFrom}; use anyhow::{bail, format_err, Error}; -use pbs_datastore::chunk_stat::ChunkStat; -use pbs_datastore::chunk_store::ChunkStore; -use pbs_datastore::data_blob::ChunkInfo; -use pbs_datastore::index::{ChunkReadInfo, IndexFile}; use pbs_tools::process_locker::ProcessLockSharedGuard; use proxmox::tools::io::ReadExt; use proxmox::tools::Uuid; +use crate::chunk_stat::ChunkStat; +use crate::chunk_store::ChunkStore; +use crate::data_blob::ChunkInfo; +use crate::file_formats; +use crate::index::{ChunkReadInfo, IndexFile}; + /// Header format definition for fixed index files (`.fidx`) #[repr(C)] pub struct FixedIndexHeader { @@ -81,7 +83,7 @@ impl FixedIndexReader { let header: Box = unsafe { file.read_host_value_boxed()? }; - if header.magic != super::FIXED_SIZED_CHUNK_INDEX_1_0 { + if header.magic != file_formats::FIXED_SIZED_CHUNK_INDEX_1_0 { bail!("got unknown magic number"); } @@ -286,7 +288,7 @@ impl FixedIndexWriter { let buffer = vec![0u8; header_size]; let header = unsafe { &mut *(buffer.as_ptr() as *mut FixedIndexHeader) }; - header.magic = super::FIXED_SIZED_CHUNK_INDEX_1_0; + header.magic = file_formats::FIXED_SIZED_CHUNK_INDEX_1_0; header.ctime = i64::to_le(ctime); header.size = u64::to_le(size as u64); header.chunk_size = u64::to_le(chunk_size as u64); diff --git a/pbs-datastore/src/lib.rs b/pbs-datastore/src/lib.rs index 3a5b15a9..f0b9f57d 100644 --- a/pbs-datastore/src/lib.rs +++ b/pbs-datastore/src/lib.rs @@ -195,9 +195,13 @@ pub mod file_formats; pub mod index; pub mod key_derivation; pub mod manifest; +pub mod prune; pub mod read_chunk; pub mod task; +pub mod dynamic_index; +pub mod fixed_index; + pub use backup_info::{BackupDir, BackupGroup, BackupInfo}; pub use checksum_reader::ChecksumReader; pub use checksum_writer::ChecksumWriter; diff --git a/src/backup/prune.rs b/pbs-datastore/src/prune.rs similarity index 99% rename from src/backup/prune.rs rename to pbs-datastore/src/prune.rs index dd038055..d0d8ca16 100644 --- a/src/backup/prune.rs +++ b/pbs-datastore/src/prune.rs @@ -1,7 +1,8 @@ -use anyhow::{Error}; use std::collections::{HashMap, HashSet}; use std::path::PathBuf; +use anyhow::{Error}; + use super::BackupInfo; enum PruneMark { Keep, KeepPartial, Remove } diff --git a/src/backup/datastore.rs b/src/backup/datastore.rs index 412e9f88..d47c412b 100644 --- a/src/backup/datastore.rs +++ b/src/backup/datastore.rs @@ -12,22 +12,26 @@ use lazy_static::lazy_static; use proxmox::tools::fs::{replace_file, file_read_optional_string, CreateOptions, open_file_locked}; +use pbs_api_types::upid::UPID; +use pbs_api_types::{Authid, GarbageCollectionStatus}; use pbs_datastore::{task_log, task_warn}; +use pbs_datastore::DataBlob; +use pbs_datastore::backup_info::{BackupGroup, BackupDir}; +use pbs_datastore::chunk_store::ChunkStore; +use pbs_datastore::dynamic_index::{DynamicIndexReader, DynamicIndexWriter}; +use pbs_datastore::fixed_index::{FixedIndexReader, FixedIndexWriter}; +use pbs_datastore::index::IndexFile; +use pbs_datastore::manifest::{ + MANIFEST_BLOB_NAME, MANIFEST_LOCK_NAME, CLIENT_LOG_BLOB_NAME, + ArchiveType, BackupManifest, + archive_type, +}; use pbs_datastore::task::TaskState; use pbs_tools::format::HumanByte; use pbs_tools::fs::{lock_dir_noblock, DirLockGuard}; -use super::backup_info::{BackupGroup, BackupDir}; -use super::chunk_store::ChunkStore; -use super::dynamic_index::{DynamicIndexReader, DynamicIndexWriter}; -use super::fixed_index::{FixedIndexReader, FixedIndexWriter}; -use super::manifest::{MANIFEST_BLOB_NAME, MANIFEST_LOCK_NAME, CLIENT_LOG_BLOB_NAME, BackupManifest}; -use super::index::*; -use super::{DataBlob, ArchiveType, archive_type}; use crate::config::datastore::{self, DataStoreConfig}; use crate::tools; -use crate::api2::types::{Authid, GarbageCollectionStatus}; -use crate::server::UPID; lazy_static! { static ref DATASTORE_MAP: Mutex>> = Mutex::new(HashMap::new()); diff --git a/src/backup/dynamic_index.rs b/src/backup/dynamic_index.rs index 4bba6a29..f13d1391 100644 --- a/src/backup/dynamic_index.rs +++ b/src/backup/dynamic_index.rs @@ -1,263 +1,16 @@ -use std::fs::File; -use std::io::{self, BufWriter, Seek, SeekFrom, Write}; +use std::io::{self, Seek, SeekFrom}; use std::ops::Range; -use std::os::unix::io::AsRawFd; -use std::path::{Path, PathBuf}; use std::sync::{Arc, Mutex}; use std::task::Context; use std::pin::Pin; use anyhow::{bail, format_err, Error}; -use proxmox::tools::io::ReadExt; -use proxmox::tools::uuid::Uuid; -use proxmox::tools::mmap::Mmap; use pxar::accessor::{MaybeReady, ReadAt, ReadAtOperation}; -use pbs_datastore::Chunker; -use pbs_datastore::index::{IndexFile, ChunkReadInfo}; -use pbs_datastore::chunk_stat::ChunkStat; -use pbs_datastore::data_blob::{DataBlob, DataChunkBuilder}; -use pbs_datastore::chunk_store::ChunkStore; +use pbs_datastore::dynamic_index::DynamicIndexReader; use pbs_datastore::read_chunk::ReadChunk; -use pbs_tools::process_locker::ProcessLockSharedGuard; - -/// Header format definition for dynamic index files (`.dixd`) -#[repr(C)] -pub struct DynamicIndexHeader { - pub magic: [u8; 8], - pub uuid: [u8; 16], - pub ctime: i64, - /// Sha256 over the index ``SHA256(offset1||digest1||offset2||digest2||...)`` - pub index_csum: [u8; 32], - reserved: [u8; 4032], // overall size is one page (4096 bytes) -} -proxmox::static_assert_size!(DynamicIndexHeader, 4096); -// TODO: Once non-Copy unions are stabilized, use: -// union DynamicIndexHeader { -// reserved: [u8; 4096], -// pub data: DynamicIndexHeaderData, -// } - -impl DynamicIndexHeader { - /// Convenience method to allocate a zero-initialized header struct. - pub fn zeroed() -> Box { - unsafe { - Box::from_raw(std::alloc::alloc_zeroed(std::alloc::Layout::new::()) as *mut Self) - } - } - - pub fn as_bytes(&self) -> &[u8] { - unsafe { - std::slice::from_raw_parts( - self as *const Self as *const u8, - std::mem::size_of::(), - ) - } - } -} - -#[derive(Clone, Debug)] -#[repr(C)] -pub struct DynamicEntry { - end_le: u64, - digest: [u8; 32], -} - -impl DynamicEntry { - #[inline] - pub fn end(&self) -> u64 { - u64::from_le(self.end_le) - } -} - -pub struct DynamicIndexReader { - _file: File, - pub size: usize, - index: Mmap, - pub uuid: [u8; 16], - pub ctime: i64, - pub index_csum: [u8; 32], -} - -impl DynamicIndexReader { - pub fn open(path: &Path) -> Result { - File::open(path) - .map_err(Error::from) - .and_then(Self::new) - .map_err(|err| format_err!("Unable to open dynamic index {:?} - {}", path, err)) - } - - pub fn new(mut file: std::fs::File) -> Result { - // FIXME: This is NOT OUR job! Check the callers of this method and remove this! - file.seek(SeekFrom::Start(0))?; - - let header_size = std::mem::size_of::(); - - let rawfd = file.as_raw_fd(); - let stat = match nix::sys::stat::fstat(rawfd) { - Ok(stat) => stat, - Err(err) => bail!("fstat failed - {}", err), - }; - - let size = stat.st_size as usize; - - if size < header_size { - bail!("index too small ({})", stat.st_size); - } - - let header: Box = unsafe { file.read_host_value_boxed()? }; - - if header.magic != super::DYNAMIC_SIZED_CHUNK_INDEX_1_0 { - bail!("got unknown magic number"); - } - - let ctime = proxmox::tools::time::epoch_i64(); - - let index_size = stat.st_size as usize - header_size; - let index_count = index_size / 40; - if index_count * 40 != index_size { - bail!("got unexpected file size"); - } - - let index = unsafe { - Mmap::map_fd( - rawfd, - header_size as u64, - index_count, - nix::sys::mman::ProtFlags::PROT_READ, - nix::sys::mman::MapFlags::MAP_PRIVATE, - )? - }; - - Ok(Self { - _file: file, - size, - index, - ctime, - uuid: header.uuid, - index_csum: header.index_csum, - }) - } - - #[inline] - #[allow(clippy::cast_ptr_alignment)] - fn chunk_end(&self, pos: usize) -> u64 { - if pos >= self.index.len() { - panic!("chunk index out of range"); - } - self.index[pos].end() - } - - #[inline] - fn chunk_digest(&self, pos: usize) -> &[u8; 32] { - if pos >= self.index.len() { - panic!("chunk index out of range"); - } - &self.index[pos].digest - } - - // TODO: can we use std::slice::binary_search with Mmap now? - fn binary_search( - &self, - start_idx: usize, - start: u64, - end_idx: usize, - end: u64, - offset: u64, - ) -> Result { - if (offset >= end) || (offset < start) { - bail!("offset out of range"); - } - - if end_idx == start_idx { - return Ok(start_idx); // found - } - let middle_idx = (start_idx + end_idx) / 2; - let middle_end = self.chunk_end(middle_idx); - - if offset < middle_end { - self.binary_search(start_idx, start, middle_idx, middle_end, offset) - } else { - self.binary_search(middle_idx + 1, middle_end, end_idx, end, offset) - } - } -} - -impl IndexFile for DynamicIndexReader { - fn index_count(&self) -> usize { - self.index.len() - } - - fn index_digest(&self, pos: usize) -> Option<&[u8; 32]> { - if pos >= self.index.len() { - None - } else { - Some(unsafe { &*(self.chunk_digest(pos).as_ptr() as *const [u8; 32]) }) - } - } - - fn index_bytes(&self) -> u64 { - if self.index.is_empty() { - 0 - } else { - self.chunk_end(self.index.len() - 1) - } - } - - fn compute_csum(&self) -> ([u8; 32], u64) { - let mut csum = openssl::sha::Sha256::new(); - let mut chunk_end = 0; - for pos in 0..self.index_count() { - let info = self.chunk_info(pos).unwrap(); - chunk_end = info.range.end; - csum.update(&chunk_end.to_le_bytes()); - csum.update(&info.digest); - } - let csum = csum.finish(); - (csum, chunk_end) - } - - fn chunk_info(&self, pos: usize) -> Option { - if pos >= self.index.len() { - return None; - } - let start = if pos == 0 { 0 } else { self.index[pos - 1].end() }; - - let end = self.index[pos].end(); - - Some(ChunkReadInfo { - range: start..end, - digest: self.index[pos].digest, - }) - } - - fn index_ctime(&self) -> i64 { - self.ctime - } - - fn index_size(&self) -> usize { - self.size as usize - } - - fn chunk_from_offset(&self, offset: u64) -> Option<(usize, u64)> { - let end_idx = self.index.len() - 1; - let end = self.chunk_end(end_idx); - let found_idx = self.binary_search(0, 0, end_idx, end, offset); - let found_idx = match found_idx { - Ok(i) => i, - Err(_) => return None - }; - - let found_start = if found_idx == 0 { - 0 - } else { - self.chunk_end(found_idx - 1) - }; - - Some((found_idx, offset - found_start)) - } -} +use pbs_datastore::index::IndexFile; struct CachedChunk { range: Range, @@ -358,7 +111,7 @@ impl crate::tools::BufferedRead for BufferedDynamicReader { // optimization for sequential read if buffer_len > 0 - && ((self.buffered_chunk_idx + 1) < index.index.len()) + && ((self.buffered_chunk_idx + 1) < index.index().len()) && (offset >= (self.buffered_chunk_start + (self.read_buffer.len() as u64))) { let next_idx = self.buffered_chunk_idx + 1; @@ -374,7 +127,7 @@ impl crate::tools::BufferedRead for BufferedDynamicReader { || (offset < self.buffered_chunk_start) || (offset >= (self.buffered_chunk_start + (self.read_buffer.len() as u64))) { - let end_idx = index.index.len() - 1; + let end_idx = index.index().len() - 1; let end = index.chunk_end(end_idx); let idx = index.binary_search(0, 0, end_idx, end, offset)?; self.buffer_chunk(idx)?; @@ -474,252 +227,3 @@ impl ReadAt for LocalDynamicReadAt { panic!("LocalDynamicReadAt::start_read_at returned Pending"); } } - - -/// Create dynamic index files (`.dixd`) -pub struct DynamicIndexWriter { - store: Arc, - _lock: ProcessLockSharedGuard, - writer: BufWriter, - closed: bool, - filename: PathBuf, - tmp_filename: PathBuf, - csum: Option, - pub uuid: [u8; 16], - pub ctime: i64, -} - -impl Drop for DynamicIndexWriter { - fn drop(&mut self) { - let _ = std::fs::remove_file(&self.tmp_filename); // ignore errors - } -} - -impl DynamicIndexWriter { - pub fn create(store: Arc, path: &Path) -> Result { - let shared_lock = store.try_shared_lock()?; - - let full_path = store.relative_path(path); - let mut tmp_path = full_path.clone(); - tmp_path.set_extension("tmp_didx"); - - let file = std::fs::OpenOptions::new() - .create(true) - .truncate(true) - .read(true) - .write(true) - .open(&tmp_path)?; - - let mut writer = BufWriter::with_capacity(1024 * 1024, file); - - let ctime = proxmox::tools::time::epoch_i64(); - - let uuid = Uuid::generate(); - - let mut header = DynamicIndexHeader::zeroed(); - header.magic = super::DYNAMIC_SIZED_CHUNK_INDEX_1_0; - header.ctime = i64::to_le(ctime); - header.uuid = *uuid.as_bytes(); - // header.index_csum = [0u8; 32]; - writer.write_all(header.as_bytes())?; - - let csum = Some(openssl::sha::Sha256::new()); - - Ok(Self { - store, - _lock: shared_lock, - writer, - closed: false, - filename: full_path, - tmp_filename: tmp_path, - ctime, - uuid: *uuid.as_bytes(), - csum, - }) - } - - // fixme: use add_chunk instead? - pub fn insert_chunk(&self, chunk: &DataBlob, digest: &[u8; 32]) -> Result<(bool, u64), Error> { - self.store.insert_chunk(chunk, digest) - } - - pub fn close(&mut self) -> Result<[u8; 32], Error> { - if self.closed { - bail!( - "cannot close already closed archive index file {:?}", - self.filename - ); - } - - self.closed = true; - - self.writer.flush()?; - - let csum_offset = proxmox::offsetof!(DynamicIndexHeader, index_csum); - self.writer.seek(SeekFrom::Start(csum_offset as u64))?; - - let csum = self.csum.take().unwrap(); - let index_csum = csum.finish(); - - self.writer.write_all(&index_csum)?; - self.writer.flush()?; - - if let Err(err) = std::fs::rename(&self.tmp_filename, &self.filename) { - bail!("Atomic rename file {:?} failed - {}", self.filename, err); - } - - Ok(index_csum) - } - - // fixme: rename to add_digest - pub fn add_chunk(&mut self, offset: u64, digest: &[u8; 32]) -> Result<(), Error> { - if self.closed { - bail!( - "cannot write to closed dynamic index file {:?}", - self.filename - ); - } - - let offset_le: &[u8; 8] = unsafe { &std::mem::transmute::(offset.to_le()) }; - - if let Some(ref mut csum) = self.csum { - csum.update(offset_le); - csum.update(digest); - } - - self.writer.write_all(offset_le)?; - self.writer.write_all(digest)?; - Ok(()) - } -} - -/// Writer which splits a binary stream into dynamic sized chunks -/// -/// And store the resulting chunk list into the index file. -pub struct DynamicChunkWriter { - index: DynamicIndexWriter, - closed: bool, - chunker: Chunker, - stat: ChunkStat, - chunk_offset: usize, - last_chunk: usize, - chunk_buffer: Vec, -} - -impl DynamicChunkWriter { - pub fn new(index: DynamicIndexWriter, chunk_size: usize) -> Self { - Self { - index, - closed: false, - chunker: Chunker::new(chunk_size), - stat: ChunkStat::new(0), - chunk_offset: 0, - last_chunk: 0, - chunk_buffer: Vec::with_capacity(chunk_size * 4), - } - } - - pub fn stat(&self) -> &ChunkStat { - &self.stat - } - - pub fn close(&mut self) -> Result<(), Error> { - if self.closed { - return Ok(()); - } - - self.closed = true; - - self.write_chunk_buffer()?; - - self.index.close()?; - - self.stat.size = self.chunk_offset as u64; - - // add size of index file - self.stat.size += - (self.stat.chunk_count * 40 + std::mem::size_of::()) as u64; - - Ok(()) - } - - fn write_chunk_buffer(&mut self) -> Result<(), Error> { - let chunk_size = self.chunk_buffer.len(); - - if chunk_size == 0 { - return Ok(()); - } - - let expected_chunk_size = self.chunk_offset - self.last_chunk; - if expected_chunk_size != self.chunk_buffer.len() { - bail!("wrong chunk size {} != {}", expected_chunk_size, chunk_size); - } - - self.stat.chunk_count += 1; - - self.last_chunk = self.chunk_offset; - - let (chunk, digest) = DataChunkBuilder::new(&self.chunk_buffer) - .compress(true) - .build()?; - - match self.index.insert_chunk(&chunk, &digest) { - Ok((is_duplicate, compressed_size)) => { - self.stat.compressed_size += compressed_size; - if is_duplicate { - self.stat.duplicate_chunks += 1; - } else { - self.stat.disk_size += compressed_size; - } - - println!( - "ADD CHUNK {:016x} {} {}% {} {}", - self.chunk_offset, - chunk_size, - (compressed_size * 100) / (chunk_size as u64), - is_duplicate, - proxmox::tools::digest_to_hex(&digest) - ); - self.index.add_chunk(self.chunk_offset as u64, &digest)?; - self.chunk_buffer.truncate(0); - Ok(()) - } - Err(err) => { - self.chunk_buffer.truncate(0); - Err(err) - } - } - } -} - -impl Write for DynamicChunkWriter { - fn write(&mut self, data: &[u8]) -> std::result::Result { - let chunker = &mut self.chunker; - - let pos = chunker.scan(data); - - if pos > 0 { - self.chunk_buffer.extend_from_slice(&data[0..pos]); - self.chunk_offset += pos; - - if let Err(err) = self.write_chunk_buffer() { - return Err(std::io::Error::new( - std::io::ErrorKind::Other, - err.to_string(), - )); - } - Ok(pos) - } else { - self.chunk_offset += data.len(); - self.chunk_buffer.extend_from_slice(data); - Ok(data.len()) - } - } - - fn flush(&mut self) -> std::result::Result<(), std::io::Error> { - Err(std::io::Error::new( - std::io::ErrorKind::Other, - "please use close() instead of flush()", - )) - } -} diff --git a/src/backup/mod.rs b/src/backup/mod.rs index 5b597265..b5c8867b 100644 --- a/src/backup/mod.rs +++ b/src/backup/mod.rs @@ -1,146 +1,4 @@ -//! This module implements the data storage and access layer. -//! -//! # Data formats -//! -//! PBS splits large files into chunks, and stores them deduplicated using -//! a content addressable storage format. -//! -//! Backup snapshots are stored as folders containing a manifest file and -//! potentially one or more index or blob files. -//! -//! The manifest contains hashes of all other files and can be signed by -//! the client. -//! -//! Blob files contain data directly. They are used for config files and -//! the like. -//! -//! Index files are used to reconstruct an original file. They contain a -//! list of SHA256 checksums. The `DynamicIndex*` format is able to deal -//! with dynamic chunk sizes (CT and host backups), whereas the -//! `FixedIndex*` format is an optimization to store a list of equal sized -//! chunks (VMs, whole block devices). -//! -//! A chunk is defined as a binary blob, which is stored inside a -//! [ChunkStore](struct.ChunkStore.html) instead of the backup directory -//! directly, and can be addressed by its SHA256 digest. -//! -//! -//! # Garbage Collection (GC) -//! -//! Deleting backups is as easy as deleting the corresponding .idx files. -//! However, this does not free up any storage, because those files just -//! contain references to chunks. -//! -//! To free up some storage, we run a garbage collection process at -//! regular intervals. The collector uses a mark and sweep approach. In -//! the first phase, it scans all .idx files to mark used chunks. The -//! second phase then removes all unmarked chunks from the store. -//! -//! The locking mechanisms mentioned below make sure that we are the only -//! process running GC. We still want to be able to create backups during -//! GC, so there may be multiple backup threads/tasks running, either -//! started before GC, or while GC is running. -//! -//! ## `atime` based GC -//! -//! The idea here is to mark chunks by updating the `atime` (access -//! timestamp) on the chunk file. This is quite simple and does not need -//! additional RAM. -//! -//! One minor problem is that recent Linux versions use the `relatime` -//! mount flag by default for performance reasons (and we want that). When -//! enabled, `atime` data is written to the disk only if the file has been -//! modified since the `atime` data was last updated (`mtime`), or if the -//! file was last accessed more than a certain amount of time ago (by -//! default 24h). So we may only delete chunks with `atime` older than 24 -//! hours. -//! -//! Another problem arises from running backups. The mark phase does not -//! find any chunks from those backups, because there is no .idx file for -//! them (created after the backup). Chunks created or touched by those -//! backups may have an `atime` as old as the start time of those backups. -//! Please note that the backup start time may predate the GC start time. -//! So we may only delete chunks older than the start time of those -//! running backup jobs, which might be more than 24h back (this is the -//! reason why ProcessLocker exclusive locks only have to be exclusive -//! between processes, since within one we can determine the age of the -//! oldest shared lock). -//! -//! ## Store `marks` in RAM using a HASH -//! -//! Might be better. Under investigation. -//! -//! -//! # Locking -//! -//! Since PBS allows multiple potentially interfering operations at the -//! same time (e.g. garbage collect, prune, multiple backup creations -//! (only in separate groups), forget, ...), these need to lock against -//! each other in certain scenarios. There is no overarching global lock -//! though, instead always the finest grained lock possible is used, -//! because running these operations concurrently is treated as a feature -//! on its own. -//! -//! ## Inter-process Locking -//! -//! We need to be able to restart the proxmox-backup service daemons, so -//! that we can update the software without rebooting the host. But such -//! restarts must not abort running backup jobs, so we need to keep the -//! old service running until those jobs are finished. This implies that -//! we need some kind of locking for modifying chunks and indices in the -//! ChunkStore. -//! -//! Please note that it is perfectly valid to have multiple -//! parallel ChunkStore writers, even when they write the same chunk -//! (because the chunk would have the same name and the same data, and -//! writes are completed atomically via a rename). The only problem is -//! garbage collection, because we need to avoid deleting chunks which are -//! still referenced. -//! -//! To do this we use the -//! [ProcessLocker](../tools/struct.ProcessLocker.html). -//! -//! ### ChunkStore-wide -//! -//! * Create Index Files: -//! -//! Acquire shared lock for ChunkStore. -//! -//! Note: When creating .idx files, we create a temporary .tmp file, -//! then do an atomic rename. -//! -//! * Garbage Collect: -//! -//! Acquire exclusive lock for ChunkStore. If we have -//! already a shared lock for the ChunkStore, try to upgrade that -//! lock. -//! -//! Exclusive locks only work _between processes_. It is valid to have an -//! exclusive and one or more shared locks held within one process. Writing -//! chunks within one process is synchronized using the gc_mutex. -//! -//! On server restart, we stop any running GC in the old process to avoid -//! having the exclusive lock held for too long. -//! -//! ## Locking table -//! -//! Below table shows all operations that play a role in locking, and which -//! mechanisms are used to make their concurrent usage safe. -//! -//! | starting >
v during | read index file | create index file | GC mark | GC sweep | update manifest | forget | prune | create backup | verify | reader api | -//! |-|-|-|-|-|-|-|-|-|-|-| -//! | **read index file** | / | / | / | / | / | mmap stays valid, oldest_shared_lock prevents GC | see forget column | / | / | / | -//! | **create index file** | / | / | / | / | / | / | / | /, happens at the end, after all chunks are touched | /, only happens without a manifest | / | -//! | **GC mark** | / | Datastore process-lock shared | gc_mutex, exclusive ProcessLocker | gc_mutex | /, GC only cares about index files, not manifests | tells GC about removed chunks | see forget column | /, index files don’t exist yet | / | / | -//! | **GC sweep** | / | Datastore process-lock shared | gc_mutex, exclusive ProcessLocker | gc_mutex | / | /, chunks already marked | see forget column | chunks get touched; chunk_store.mutex; oldest PL lock | / | / | -//! | **update manifest** | / | / | / | / | update_manifest lock | update_manifest lock, remove dir under lock | see forget column | /, “write manifest” happens at the end | /, can call “write manifest”, see that column | / | -//! | **forget** | / | / | removed_during_gc mutex is held during unlink | marking done, doesn’t matter if forgotten now | update_manifest lock, forget waits for lock | /, unlink is atomic | causes forget to fail, but that’s OK | running backup has snapshot flock | /, potentially detects missing folder | shared snap flock | -//! | **prune** | / | / | see forget row | see forget row | see forget row | causes warn in prune, but no error | see forget column | running and last non-running can’t be pruned | see forget row | shared snap flock | -//! | **create backup** | / | only time this happens, thus has snapshot flock | / | chunks get touched; chunk_store.mutex; oldest PL lock | no lock, but cannot exist beforehand | snapshot flock, can’t be forgotten | running and last non-running can’t be pruned | snapshot group flock, only one running per group | /, won’t be verified since manifest missing | / | -//! | **verify** | / | / | / | / | see “update manifest” row | /, potentially detects missing folder | see forget column | / | /, but useless (“update manifest” protects itself) | / | -//! | **reader api** | / | / | / | /, open snap can’t be forgotten, so ref must exist | / | prevented by shared snap flock | prevented by shared snap flock | / | / | /, lock is shared |! -//! * / = no interaction -//! * shared/exclusive from POV of 'starting' process +//! Server/client-specific parts for what's otherwise in pbs-datastore. use anyhow::{bail, Error}; @@ -216,6 +74,13 @@ pub use pbs_datastore::key_derivation; pub use pbs_datastore::key_derivation::*; pub use pbs_datastore::manifest; pub use pbs_datastore::manifest::*; +pub use pbs_datastore::prune; +pub use pbs_datastore::prune::*; + +pub use pbs_datastore::dynamic_index::*; +pub use pbs_datastore::fixed_index; +pub use pbs_datastore::fixed_index::*; + pub use pbs_datastore::read_chunk::*; mod chunk_stream; @@ -225,15 +90,10 @@ pub use chunk_stream::*; mod read_chunk; pub use read_chunk::*; -mod fixed_index; -pub use fixed_index::*; - +// Split mod dynamic_index; pub use dynamic_index::*; -mod prune; -pub use prune::*; - mod datastore; pub use datastore::*;