diff --git a/src/tape/helpers/blocked_reader.rs b/src/tape/helpers/blocked_reader.rs new file mode 100644 index 00000000..5ee090dc --- /dev/null +++ b/src/tape/helpers/blocked_reader.rs @@ -0,0 +1,309 @@ +use std::io::Read; + +use crate::tape::{ + TapeRead, + tape_device_read_block, + file_formats::{ + PROXMOX_TAPE_BLOCK_HEADER_MAGIC_1_0, + BlockHeader, + BlockHeaderFlags, + }, +}; + +/// Read a block stream generated by 'BlockWriter'. +/// +/// This class implements 'TapeRead'. It always read whole blocks from +/// the underlying reader, and does additional error checks: +/// +/// - check magic number (detect streams not written by 'BlockWriter') +/// - check block size +/// - check block sequence numbers +/// +/// The reader consumes the EOF mark after the data stream (if read to +/// the end of the stream). +pub struct BlockedReader { + reader: R, + buffer: Box, + seq_nr: u32, + found_end_marker: bool, + incomplete: bool, + got_eod: bool, + read_error: bool, + read_pos: usize, +} + +impl BlockedReader { + + /// Create a new BlockedReader instance. + /// + /// This tries to read the first block, and returns None if we are + /// at EOT. + pub fn open(mut reader: R) -> Result, std::io::Error> { + + let mut buffer = BlockHeader::new(); + + if !Self::read_block_frame(&mut buffer, &mut reader)? { + return Ok(None); + } + + let (_size, found_end_marker) = Self::check_buffer(&buffer, 0)?; + + let mut incomplete = false; + if found_end_marker { + incomplete = buffer.flags.contains(BlockHeaderFlags::INCOMPLETE); + } + Ok(Some(Self { + reader, + buffer, + found_end_marker, + incomplete, + seq_nr: 1, + got_eod: false, + read_error: false, + read_pos: 0, + })) + } + + fn check_buffer(buffer: &BlockHeader, seq_nr: u32) -> Result<(usize, bool), std::io::Error> { + + if buffer.magic != PROXMOX_TAPE_BLOCK_HEADER_MAGIC_1_0 { + proxmox::io_bail!("detected tape block with wrong magic number - not written by proxmox tape"); + } + + if seq_nr != buffer.seq_nr() { + proxmox::io_bail!( + "detected tape block with wrong seqence number ({} != {})", + seq_nr, buffer.seq_nr()) + } + + let size = buffer.size(); + let found_end_marker = buffer.flags.contains(BlockHeaderFlags::END_OF_STREAM); + + if size > buffer.payload.len() { + proxmox::io_bail!("detected tape block with wrong payload size ({} > {}", size, buffer.payload.len()); + } else if size == 0 { + if !found_end_marker{ + proxmox::io_bail!("detected tape block with zero payload size"); + } + } + + + Ok((size, found_end_marker)) + } + + fn read_block_frame(buffer: &mut BlockHeader, reader: &mut R) -> Result { + + let data = unsafe { + std::slice::from_raw_parts_mut( + (buffer as *mut BlockHeader) as *mut u8, + BlockHeader::SIZE, + ) + }; + + tape_device_read_block(reader, data) + } + + fn read_block(&mut self) -> Result { + + if !Self::read_block_frame(&mut self.buffer, &mut self.reader)? { + self.got_eod = true; + self.read_pos = self.buffer.payload.len(); + if !self.found_end_marker { + proxmox::io_bail!("detected tape stream without end marker"); + } + return Ok(0); // EOD + } + + let (size, found_end_marker) = Self::check_buffer(&self.buffer, self.seq_nr)?; + self.seq_nr += 1; + + if found_end_marker { // consume EOF mark + self.found_end_marker = true; + self.incomplete = self.buffer.flags.contains(BlockHeaderFlags::INCOMPLETE); + let mut tmp_buf = [0u8; 512]; // use a small buffer for testing EOF + if tape_device_read_block(&mut self.reader, &mut tmp_buf)? { + proxmox::io_bail!("detected tape block after stream end marker"); + } else { + self.got_eod = true; + } + } + + self.read_pos = 0; + + Ok(size) + } +} + +impl TapeRead for BlockedReader { + + fn is_incomplete(&self) -> Result { + if !self.got_eod { + proxmox::io_bail!("is_incomplete failed: EOD not reached"); + } + if !self.found_end_marker { + proxmox::io_bail!("is_incomplete failed: no end marker found"); + } + + Ok(self.incomplete) + } + + fn has_end_marker(&self) -> Result { + if !self.got_eod { + proxmox::io_bail!("has_end_marker failed: EOD not reached"); + } + + Ok(self.found_end_marker) + } +} + +impl Read for BlockedReader { + + fn read(&mut self, buffer: &mut [u8]) -> Result { + + if self.read_error { + proxmox::io_bail!("detected read after error - internal error"); + } + + let mut buffer_size = self.buffer.size(); + let mut rest = (buffer_size as isize) - (self.read_pos as isize); + + if rest <= 0 && !self.got_eod { // try to refill buffer + buffer_size = match self.read_block() { + Ok(len) => len, + err => { + self.read_error = true; + return err; + } + }; + rest = buffer_size as isize; + } + + if rest <= 0 { + return Ok(0); + } else { + let copy_len = if (buffer.len() as isize) < rest { + buffer.len() + } else { + rest as usize + }; + buffer[..copy_len].copy_from_slice( + &self.buffer.payload[self.read_pos..(self.read_pos + copy_len)]); + self.read_pos += copy_len; + return Ok(copy_len); + } + } +} + +#[cfg(test)] +mod test { + use std::io::Read; + use anyhow::Error; + use crate::tape::{ + TapeWrite, + file_formats::PROXMOX_TAPE_BLOCK_SIZE, + helpers::{ + BlockedReader, + BlockedWriter, + }, + }; + + fn write_and_verify(data: &[u8]) -> Result<(), Error> { + + let mut tape_data = Vec::new(); + + let mut writer = BlockedWriter::new(&mut tape_data); + + writer.write_all(data)?; + + writer.finish(false)?; + + assert_eq!( + tape_data.len(), + ((data.len() + PROXMOX_TAPE_BLOCK_SIZE)/PROXMOX_TAPE_BLOCK_SIZE) + *PROXMOX_TAPE_BLOCK_SIZE + ); + + let reader = &mut &tape_data[..]; + let mut reader = BlockedReader::open(reader)?.unwrap(); + + let mut read_data = Vec::with_capacity(PROXMOX_TAPE_BLOCK_SIZE); + reader.read_to_end(&mut read_data)?; + + assert_eq!(data.len(), read_data.len()); + + assert_eq!(data, &read_data[..]); + + Ok(()) + } + + #[test] + fn empty_stream() -> Result<(), Error> { + write_and_verify(b"") + } + + #[test] + fn small_data() -> Result<(), Error> { + write_and_verify(b"ABC") + } + + #[test] + fn large_data() -> Result<(), Error> { + let data = proxmox::sys::linux::random_data(1024*1024*5)?; + write_and_verify(&data) + } + + #[test] + fn no_data() -> Result<(), Error> { + let tape_data = Vec::new(); + let reader = &mut &tape_data[..]; + let reader = BlockedReader::open(reader)?; + assert!(reader.is_none()); + + Ok(()) + } + + #[test] + fn no_end_marker() -> Result<(), Error> { + let mut tape_data = Vec::new(); + { + let mut writer = BlockedWriter::new(&mut tape_data); + // write at least one block + let data = proxmox::sys::linux::random_data(PROXMOX_TAPE_BLOCK_SIZE)?; + writer.write_all(&data)?; + // but do not call finish here + } + let reader = &mut &tape_data[..]; + let mut reader = BlockedReader::open(reader)?.unwrap(); + + let mut data = Vec::with_capacity(PROXMOX_TAPE_BLOCK_SIZE); + assert!(reader.read_to_end(&mut data).is_err()); + + Ok(()) + } + + #[test] + fn small_read_buffer() -> Result<(), Error> { + let mut tape_data = Vec::new(); + + let mut writer = BlockedWriter::new(&mut tape_data); + + writer.write_all(b"ABC")?; + + writer.finish(false)?; + + let reader = &mut &tape_data[..]; + let mut reader = BlockedReader::open(reader)?.unwrap(); + + let mut buf = [0u8; 1]; + assert_eq!(reader.read(&mut buf)?, 1, "wrong byte count"); + assert_eq!(&buf, b"A"); + assert_eq!(reader.read(&mut buf)?, 1, "wrong byte count"); + assert_eq!(&buf, b"B"); + assert_eq!(reader.read(&mut buf)?, 1, "wrong byte count"); + assert_eq!(&buf, b"C"); + assert_eq!(reader.read(&mut buf)?, 0, "wrong byte count"); + assert_eq!(reader.read(&mut buf)?, 0, "wrong byte count"); + + Ok(()) + } +} diff --git a/src/tape/helpers/blocked_writer.rs b/src/tape/helpers/blocked_writer.rs new file mode 100644 index 00000000..c51beedc --- /dev/null +++ b/src/tape/helpers/blocked_writer.rs @@ -0,0 +1,124 @@ +use std::io::Write; + +use proxmox::tools::vec; + +use crate::tape::{ + TapeWrite, + tape_device_write_block, + file_formats::{ + BlockHeader, + BlockHeaderFlags, + }, +}; + +/// Assemble and write blocks of data +/// +/// This type implement 'TapeWrite'. Data written is assembled to +/// equally sized blocks (see 'BlockHeader'), which are then written +/// to the underlying writer. +pub struct BlockedWriter { + writer: W, + buffer: Box, + buffer_pos: usize, + seq_nr: u32, + logical_end_of_media: bool, + bytes_written: usize, +} + +impl BlockedWriter { + + /// Allow access to underlying writer + pub fn writer_ref_mut(&mut self) -> &mut W { + &mut self.writer + } + + /// Creates a new instance. + pub fn new(writer: W) -> Self { + Self { + writer, + buffer: BlockHeader::new(), + buffer_pos: 0, + seq_nr: 0, + logical_end_of_media: false, + bytes_written: 0, + } + } + + fn write_block(buffer: &BlockHeader, writer: &mut W) -> Result { + + let data = unsafe { + std::slice::from_raw_parts( + (buffer as *const BlockHeader) as *const u8, + BlockHeader::SIZE, + ) + }; + tape_device_write_block(writer, data) + } + + fn write(&mut self, data: &[u8]) -> Result { + + if data.is_empty() { return Ok(0); } + + let rest = self.buffer.payload.len() - self.buffer_pos; + let bytes = if data.len() < rest { data.len() } else { rest }; + self.buffer.payload[self.buffer_pos..(self.buffer_pos+bytes)] + .copy_from_slice(&data[..bytes]); + + let rest = rest - bytes; + + if rest == 0 { + self.buffer.flags = BlockHeaderFlags::empty(); + self.buffer.set_size(self.buffer.payload.len()); + self.buffer.set_seq_nr(self.seq_nr); + self.seq_nr += 1; + let leom = Self::write_block(&self.buffer, &mut self.writer)?; + if leom { self.logical_end_of_media = true; } + self.buffer_pos = 0; + self.bytes_written += BlockHeader::SIZE; + + } else { + self.buffer_pos = self.buffer_pos + bytes; + } + + Ok(bytes) + } + +} + +impl TapeWrite for BlockedWriter { + + fn write_all(&mut self, mut data: &[u8]) -> Result { + while !data.is_empty() { + match self.write(data) { + Ok(n) => data = &data[n..], + Err(e) => return Err(e), + } + } + Ok(self.logical_end_of_media) + } + + fn bytes_written(&self) -> usize { + self.bytes_written + } + + /// flush last block, set END_OF_STREAM flag + /// + /// Note: This may write an empty block just including the + /// END_OF_STREAM flag. + fn finish(&mut self, incomplete: bool) -> Result { + vec::clear(&mut self.buffer.payload[self.buffer_pos..]); + self.buffer.flags = BlockHeaderFlags::END_OF_STREAM; + if incomplete { self.buffer.flags |= BlockHeaderFlags::INCOMPLETE; } + self.buffer.set_size(self.buffer_pos); + self.buffer.set_seq_nr(self.seq_nr); + self.seq_nr += 1; + self.bytes_written += BlockHeader::SIZE; + Self::write_block(&self.buffer, &mut self.writer) + } + + /// Returns if the writer already detected the logical end of media + fn logical_end_of_media(&self) -> bool { + self.logical_end_of_media + } + +} diff --git a/src/tape/helpers/mod.rs b/src/tape/helpers/mod.rs index 66c8fea9..9186c5db 100644 --- a/src/tape/helpers/mod.rs +++ b/src/tape/helpers/mod.rs @@ -3,3 +3,9 @@ pub use emulate_tape_writer::*; mod emulate_tape_reader; pub use emulate_tape_reader::*; + +mod blocked_reader; +pub use blocked_reader::*; + +mod blocked_writer; +pub use blocked_writer::*;