diff --git a/src/tape/mod.rs b/src/tape/mod.rs index e4eeb3bd..f9309ec0 100644 --- a/src/tape/mod.rs +++ b/src/tape/mod.rs @@ -45,6 +45,9 @@ pub use chunk_archive::*; mod snapshot_archive; pub use snapshot_archive::*; +mod pool_writer; +pub use pool_writer::*; + /// Directory path where we store all tape status information pub const TAPE_STATUS_DIR: &str = "/var/lib/proxmox-backup/tape"; diff --git a/src/tape/pool_writer.rs b/src/tape/pool_writer.rs new file mode 100644 index 00000000..ce118445 --- /dev/null +++ b/src/tape/pool_writer.rs @@ -0,0 +1,392 @@ +use std::collections::HashSet; +use std::path::Path; + +use anyhow::{bail, Error}; + +use proxmox::{ + tools::Uuid, + api::section_config::SectionConfigData, +}; + +use crate::{ + backup::{ + DataStore, + }, + tape::{ + TAPE_STATUS_DIR, + MAX_CHUNK_ARCHIVE_SIZE, + COMMIT_BLOCK_SIZE, + TapeDriver, + TapeWrite, + ChunkArchiveWriter, + SnapshotReader, + SnapshotChunkIterator, + MediaPool, + MediaId, + MediaCatalog, + MediaSetCatalog, + tape_write_snapshot_archive, + request_and_load_media, + }, +}; + + +struct PoolWriterState { + drive: Box, + catalog: MediaCatalog, + // tell if we already moved to EOM + at_eom: bool, + // bytes written after the last tape fush/sync + bytes_written: usize, +} + +impl PoolWriterState { + + fn commit(&mut self) -> Result<(), Error> { + self.drive.sync()?; // sync all data to the tape + self.catalog.commit()?; // then commit the catalog + self.bytes_written = 0; + Ok(()) + } +} + +/// Helper to manage a backup job, writing several tapes of a pool +pub struct PoolWriter { + pool: MediaPool, + drive_name: String, + status: Option, + media_set_catalog: MediaSetCatalog, +} + +impl PoolWriter { + + pub fn new(mut pool: MediaPool, drive_name: &str) -> Result { + + let current_time = proxmox::tools::time::epoch_i64(); + + pool.start_write_session(current_time)?; + + let mut media_set_catalog = MediaSetCatalog::new(); + + // load all catalogs read-only at start + for media_uuid in pool.current_media_list()? { + let media_catalog = MediaCatalog::open( + Path::new(TAPE_STATUS_DIR), + &media_uuid, + false, + false, + )?; + media_set_catalog.append_catalog(media_catalog)?; + } + + Ok(Self { + pool, + drive_name: drive_name.to_string(), + status: None, + media_set_catalog, + }) + } + + pub fn pool(&mut self) -> &mut MediaPool { + &mut self.pool + } + + /// Set media status to FULL (persistent - stores pool status) + pub fn set_media_status_full(&mut self, uuid: &Uuid) -> Result<(), Error> { + self.pool.set_media_status_full(&uuid)?; + Ok(()) + } + + pub fn contains_snapshot(&self, snapshot: &str) -> bool { + if let Some(PoolWriterState { ref catalog, .. }) = self.status { + if catalog.contains_snapshot(snapshot) { + return true; + } + } + self.media_set_catalog.contains_snapshot(snapshot) + } + + /// commit changes to tape and catalog + /// + /// This is done automatically during a backupsession, but needs to + /// be called explicitly before dropping the PoolWriter + pub fn commit(&mut self) -> Result<(), Error> { + if let Some(ref mut status) = self.status { + status.commit()?; + } + Ok(()) + } + + /// Load a writable media into the drive + pub fn load_writable_media(&mut self) -> Result { + let last_media_uuid = match self.status { + Some(PoolWriterState { ref catalog, .. }) => Some(catalog.uuid().clone()), + None => None, + }; + + let current_time = proxmox::tools::time::epoch_i64(); + let media_uuid = self.pool.alloc_writable_media(current_time)?; + + let media = self.pool.lookup_media(&media_uuid).unwrap(); + + let media_changed = match last_media_uuid { + Some(ref last_media_uuid) => last_media_uuid != &media_uuid, + None => true, + }; + + if !media_changed { + return Ok(media_uuid); + } + + // remove read-only catalog (we store a writable version in status) + self.media_set_catalog.remove_catalog(&media_uuid); + + if let Some(PoolWriterState {mut drive, catalog, .. }) = self.status.take() { + self.media_set_catalog.append_catalog(catalog)?; + drive.eject_media()?; + } + + let (drive_config, _digest) = crate::config::drive::config()?; + let (drive, catalog) = drive_load_and_label_media(&drive_config, &self.drive_name, &media.id())?; + self.status = Some(PoolWriterState { drive, catalog, at_eom: false, bytes_written: 0 }); + + Ok(media_uuid) + } + + /// uuid of currently loaded BackupMedia + pub fn current_media_uuid(&self) -> Result<&Uuid, Error> { + match self.status { + Some(PoolWriterState { ref catalog, ..}) => Ok(catalog.uuid()), + None => bail!("PoolWriter - no media loaded"), + } + } + + /// Move to EOM (if not aleady there), then creates a new snapshot + /// archive writing specified files (as .pxar) into it. On + /// success, this return 'Ok(true)' and the media catalog gets + /// updated. + + /// Please note that this may fail when there is not enough space + /// on the media (return value 'Ok(false, _)'). In that case, the + /// archive is marked incomplete, and we do not use it. The caller + /// should mark the media as full and try again using another + /// media. + pub fn append_snapshot_archive( + &mut self, + snapshot_reader: &SnapshotReader, + ) -> Result<(bool, usize), Error> { + + let status = match self.status { + Some(ref mut status) => status, + None => bail!("PoolWriter - no media loaded"), + }; + + if !status.at_eom { + status.drive.move_to_eom()?; + status.at_eom = true; + } + + let current_file_number = status.drive.current_file_number()?; + if current_file_number < 2 { + bail!("got strange file position number from drive ({})", current_file_number); + } + + let (done, bytes_written) = { + let mut writer: Box = status.drive.write_file()?; + + match tape_write_snapshot_archive(writer.as_mut(), snapshot_reader)? { + Some(content_uuid) => { + status.catalog.register_snapshot( + content_uuid, + current_file_number, + &snapshot_reader.snapshot().to_string(), + )?; + (true, writer.bytes_written()) + } + None => (false, writer.bytes_written()), + } + }; + + status.bytes_written += bytes_written; + + let request_sync = if status.bytes_written >= COMMIT_BLOCK_SIZE { true } else { false }; + + if !done || request_sync { + status.commit()?; + } + + Ok((done, bytes_written)) + } + + /// Move to EOM (if not aleady there), then creates a new chunk + /// archive and writes chunks from 'chunk_iter'. This stops when + /// it detect LEOM or when we reach max archive size + /// (4GB). Written chunks are registered in the media catalog. + pub fn append_chunk_archive( + &mut self, + datastore: &DataStore, + chunk_iter: &mut std::iter::Peekable, + ) -> Result<(bool, usize), Error> { + + let status = match self.status { + Some(ref mut status) => status, + None => bail!("PoolWriter - no media loaded"), + }; + + if !status.at_eom { + status.drive.move_to_eom()?; + status.at_eom = true; + } + + let current_file_number = status.drive.current_file_number()?; + if current_file_number < 2 { + bail!("got strange file position number from drive ({})", current_file_number); + } + let writer = status.drive.write_file()?; + + let (saved_chunks, content_uuid, leom, bytes_written) = write_chunk_archive( + writer, + datastore, + chunk_iter, + &self.media_set_catalog, + &status.catalog, + MAX_CHUNK_ARCHIVE_SIZE, + )?; + + status.bytes_written += bytes_written; + + let request_sync = if status.bytes_written >= COMMIT_BLOCK_SIZE { true } else { false }; + + // register chunks in media_catalog + status.catalog.start_chunk_archive(content_uuid, current_file_number)?; + for digest in saved_chunks { + status.catalog.register_chunk(&digest)?; + } + status.catalog.end_chunk_archive()?; + + if leom || request_sync { + status.commit()?; + } + + Ok((leom, bytes_written)) + } +} + +/// write up to of chunks +fn write_chunk_archive<'a>( + writer: Box, + datastore: &DataStore, + chunk_iter: &mut std::iter::Peekable, + media_set_catalog: &MediaSetCatalog, + media_catalog: &MediaCatalog, + max_size: usize, +) -> Result<(Vec<[u8;32]>, Uuid, bool, usize), Error> { + + let (mut writer, content_uuid) = ChunkArchiveWriter::new(writer, true)?; + + let mut chunk_index: HashSet<[u8;32]> = HashSet::new(); + + // we want to get the chunk list in correct order + let mut chunk_list: Vec<[u8;32]> = Vec::new(); + + let mut leom = false; + + loop { + let digest = match chunk_iter.next() { + None => break, + Some(digest) => digest?, + }; + if media_catalog.contains_chunk(&digest) + || chunk_index.contains(&digest) + || media_set_catalog.contains_chunk(&digest) + { + continue; + } + + let blob = datastore.load_chunk(&digest)?; + println!("CHUNK {} size {}", proxmox::tools::digest_to_hex(&digest), blob.raw_size()); + + match writer.try_write_chunk(&digest, &blob) { + Ok(true) => { + chunk_index.insert(digest); + chunk_list.push(digest); + } + Ok(false) => { + leom = true; + break; + } + Err(err) => bail!("write chunk failed - {}", err), + } + + if writer.bytes_written() > max_size { + println!("Chunk Archive max size reached, closing archive"); + break; + } + } + + writer.finish()?; + + Ok((chunk_list, content_uuid, leom, writer.bytes_written())) +} + +// Requests and load 'media' into the drive. Then compare the media +// set label. If the tabe is empty, or the existing set label does not +// match the expected media set, overwrite the media set label. +fn drive_load_and_label_media( + drive_config: &SectionConfigData, + drive_name: &str, + media_id: &MediaId, +) -> Result<(Box, MediaCatalog), Error> { + + let (mut tmp_drive, info) = + request_and_load_media(&drive_config, &drive_name, &media_id.label)?; + + let media_catalog; + + let new_set = match media_id.media_set_label { + None => { + bail!("got media without media set - internal error"); + } + Some(ref set) => set, + }; + + let status_path = Path::new(TAPE_STATUS_DIR); + + match &info.media_set_label { + None => { + println!("wrinting new media set label"); + tmp_drive.write_media_set_label(new_set)?; + + let info = MediaId { + label: info.label, + media_set_label: Some(new_set.clone()), + }; + media_catalog = MediaCatalog::overwrite(status_path, &info, true)?; + } + Some(media_set_label) => { + if new_set.uuid == media_set_label.uuid { + if new_set.seq_nr != media_set_label.seq_nr { + bail!("got media with wrong media sequence number ({} != {}", + new_set.seq_nr,media_set_label.seq_nr); + } + media_catalog = MediaCatalog::open(status_path, &media_id.label.uuid, true, false)?; + } else { + println!("wrinting new media set label (overwrite '{}/{}')", + media_set_label.uuid.to_string(), media_set_label.seq_nr); + + tmp_drive.write_media_set_label(new_set)?; + + let info = MediaId { + label: info.label, + media_set_label: Some(new_set.clone()), + }; + media_catalog = MediaCatalog::overwrite(status_path, &info, true)?; + } + } + } + + // todo: verify last content/media_catalog somehow? + tmp_drive.move_to_eom()?; + + Ok((tmp_drive, media_catalog)) +}