diff --git a/src/bin/pxar.rs b/src/bin/pxar.rs index 814b3346..d830c570 100644 --- a/src/bin/pxar.rs +++ b/src/bin/pxar.rs @@ -295,7 +295,7 @@ fn extract_archive( )] /// Create a new .pxar archive. #[allow(clippy::too_many_arguments)] -fn create_archive( +async fn create_archive( archive: String, source: String, verbose: bool, @@ -376,7 +376,7 @@ fn create_archive( dir, writer, feature_flags, - |path| { + move |path| { if verbose { println!("{:?}", path); } @@ -384,7 +384,7 @@ fn create_archive( }, None, options, - )?; + ).await?; Ok(()) } diff --git a/src/client/pxar_backup_stream.rs b/src/client/pxar_backup_stream.rs index 5fb28fd5..b57061a3 100644 --- a/src/client/pxar_backup_stream.rs +++ b/src/client/pxar_backup_stream.rs @@ -4,10 +4,10 @@ use std::path::Path; use std::pin::Pin; use std::sync::{Arc, Mutex}; use std::task::{Context, Poll}; -use std::thread; use anyhow::{format_err, Error}; use futures::stream::Stream; +use futures::future::{Abortable, AbortHandle}; use nix::dir::Dir; use nix::fcntl::OFlag; use nix::sys::stat::Mode; @@ -21,14 +21,14 @@ use crate::backup::CatalogWriter; /// consumer. pub struct PxarBackupStream { rx: Option, Error>>>, - child: Option>, + handle: Option, error: Arc>>, } impl Drop for PxarBackupStream { fn drop(&mut self) { self.rx = None; - self.child.take().unwrap().join().unwrap(); + self.handle.take().unwrap().abort(); } } @@ -43,42 +43,41 @@ impl PxarBackupStream { let buffer_size = 256 * 1024; let error = Arc::new(Mutex::new(None)); - let child = std::thread::Builder::new() - .name("PxarBackupStream".to_string()) - .spawn({ - let error = Arc::clone(&error); - move || { - let mut catalog_guard = catalog.lock().unwrap(); - let writer = std::io::BufWriter::with_capacity( - buffer_size, - crate::tools::StdChannelWriter::new(tx), - ); + let error2 = Arc::clone(&error); + let handler = async move { + let writer = std::io::BufWriter::with_capacity( + buffer_size, + crate::tools::StdChannelWriter::new(tx), + ); - let verbose = options.verbose; + let verbose = options.verbose; - let writer = pxar::encoder::sync::StandardWriter::new(writer); - if let Err(err) = crate::pxar::create_archive( - dir, - writer, - crate::pxar::Flags::DEFAULT, - |path| { - if verbose { - println!("{:?}", path); - } - Ok(()) - }, - Some(&mut *catalog_guard), - options, - ) { - let mut error = error.lock().unwrap(); - *error = Some(err.to_string()); + let writer = pxar::encoder::sync::StandardWriter::new(writer); + if let Err(err) = crate::pxar::create_archive( + dir, + writer, + crate::pxar::Flags::DEFAULT, + move |path| { + if verbose { + println!("{:?}", path); } - } - })?; + Ok(()) + }, + Some(catalog), + options, + ).await { + let mut error = error2.lock().unwrap(); + *error = Some(err.to_string()); + } + }; + + let (handle, registration) = AbortHandle::new_pair(); + let future = Abortable::new(handler, registration); + tokio::spawn(future); Ok(Self { rx: Some(rx), - child: Some(child), + handle: Some(handle), error, }) } diff --git a/src/pxar/create.rs b/src/pxar/create.rs index 36de87da..6950b396 100644 --- a/src/pxar/create.rs +++ b/src/pxar/create.rs @@ -5,16 +5,19 @@ use std::io::{self, Read, Write}; use std::os::unix::ffi::OsStrExt; use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; use std::path::{Path, PathBuf}; +use std::sync::{Arc, Mutex}; use anyhow::{bail, format_err, Error}; use nix::dir::Dir; use nix::errno::Errno; use nix::fcntl::OFlag; use nix::sys::stat::{FileStat, Mode}; +use futures::future::BoxFuture; +use futures::FutureExt; use pathpatterns::{MatchEntry, MatchFlag, MatchList, MatchType, PatternFlag}; use pxar::Metadata; -use pxar::encoder::LinkOffset; +use pxar::encoder::{SeqWrite, LinkOffset}; use proxmox::c_str; use proxmox::sys::error::SysError; @@ -129,13 +132,13 @@ impl std::io::Write for ErrorReporter { } } -struct Archiver<'a, 'b> { +struct Archiver { feature_flags: Flags, fs_feature_flags: Flags, fs_magic: i64, patterns: Vec, - callback: &'a mut dyn FnMut(&Path) -> Result<(), Error>, - catalog: Option<&'b mut dyn BackupCatalogWriter>, + callback: Box Result<(), Error> + Send>, + catalog: Option>>, path: PathBuf, entry_counter: usize, entry_limit: usize, @@ -147,19 +150,19 @@ struct Archiver<'a, 'b> { file_copy_buffer: Vec, } -type Encoder<'a, 'b> = pxar::encoder::Encoder<'a, &'b mut dyn pxar::encoder::SeqWrite>; +type Encoder<'a, T> = pxar::encoder::aio::Encoder<'a, T>; -pub fn create_archive( +pub async fn create_archive( source_dir: Dir, mut writer: T, feature_flags: Flags, - mut callback: F, - catalog: Option<&mut dyn BackupCatalogWriter>, + callback: F, + catalog: Option>>, options: PxarCreateOptions, ) -> Result<(), Error> where - T: pxar::encoder::SeqWrite, - F: FnMut(&Path) -> Result<(), Error>, + T: SeqWrite + Send, + F: FnMut(&Path) -> Result<(), Error> + Send + 'static, { let fs_magic = detect_fs_type(source_dir.as_raw_fd())?; if is_virtual_file_system(fs_magic) { @@ -182,8 +185,7 @@ where set.insert(stat.st_dev); } - let writer = &mut writer as &mut dyn pxar::encoder::SeqWrite; - let mut encoder = Encoder::new(writer, &metadata)?; + let mut encoder = Encoder::new(&mut writer, &metadata).await?; let mut patterns = options.patterns; @@ -199,7 +201,7 @@ where feature_flags, fs_feature_flags, fs_magic, - callback: &mut callback, + callback: Box::new(callback), patterns, catalog, path: PathBuf::new(), @@ -213,8 +215,8 @@ where file_copy_buffer: vec::undefined(4 * 1024 * 1024), }; - archiver.archive_dir_contents(&mut encoder, source_dir, true)?; - encoder.finish()?; + archiver.archive_dir_contents(&mut encoder, source_dir, true).await?; + encoder.finish().await?; Ok(()) } @@ -224,7 +226,7 @@ struct FileListEntry { stat: FileStat, } -impl<'a, 'b> Archiver<'a, 'b> { +impl Archiver { /// Get the currently effective feature flags. (Requested flags masked by the file system /// feature flags). fn flags(&self) -> Flags { @@ -239,49 +241,51 @@ impl<'a, 'b> Archiver<'a, 'b> { } } - fn archive_dir_contents( - &mut self, - encoder: &mut Encoder, + fn archive_dir_contents<'a, 'b, T: SeqWrite + Send>( + &'a mut self, + encoder: &'a mut Encoder<'b, T>, mut dir: Dir, is_root: bool, - ) -> Result<(), Error> { - let entry_counter = self.entry_counter; + ) -> BoxFuture<'a, Result<(), Error>> { + async move { + let entry_counter = self.entry_counter; - let old_patterns_count = self.patterns.len(); - self.read_pxar_excludes(dir.as_raw_fd())?; + let old_patterns_count = self.patterns.len(); + self.read_pxar_excludes(dir.as_raw_fd())?; - let mut file_list = self.generate_directory_file_list(&mut dir, is_root)?; + let mut file_list = self.generate_directory_file_list(&mut dir, is_root)?; - if is_root && old_patterns_count > 0 { - file_list.push(FileListEntry { - name: CString::new(".pxarexclude-cli").unwrap(), - path: PathBuf::new(), - stat: unsafe { std::mem::zeroed() }, - }); - } - - let dir_fd = dir.as_raw_fd(); - - let old_path = std::mem::take(&mut self.path); - - for file_entry in file_list { - let file_name = file_entry.name.to_bytes(); - - if is_root && file_name == b".pxarexclude-cli" { - self.encode_pxarexclude_cli(encoder, &file_entry.name, old_patterns_count)?; - continue; + if is_root && old_patterns_count > 0 { + file_list.push(FileListEntry { + name: CString::new(".pxarexclude-cli").unwrap(), + path: PathBuf::new(), + stat: unsafe { std::mem::zeroed() }, + }); } - (self.callback)(&file_entry.path)?; - self.path = file_entry.path; - self.add_entry(encoder, dir_fd, &file_entry.name, &file_entry.stat) - .map_err(|err| self.wrap_err(err))?; - } - self.path = old_path; - self.entry_counter = entry_counter; - self.patterns.truncate(old_patterns_count); + let dir_fd = dir.as_raw_fd(); - Ok(()) + let old_path = std::mem::take(&mut self.path); + + for file_entry in file_list { + let file_name = file_entry.name.to_bytes(); + + if is_root && file_name == b".pxarexclude-cli" { + self.encode_pxarexclude_cli(encoder, &file_entry.name, old_patterns_count).await?; + continue; + } + + (self.callback)(&file_entry.path)?; + self.path = file_entry.path; + self.add_entry(encoder, dir_fd, &file_entry.name, &file_entry.stat).await + .map_err(|err| self.wrap_err(err))?; + } + self.path = old_path; + self.entry_counter = entry_counter; + self.patterns.truncate(old_patterns_count); + + Ok(()) + }.boxed() } /// openat() wrapper which allows but logs `EACCES` and turns `ENOENT` into `None`. @@ -396,23 +400,22 @@ impl<'a, 'b> Archiver<'a, 'b> { Ok(()) } - fn encode_pxarexclude_cli( + async fn encode_pxarexclude_cli( &mut self, - encoder: &mut Encoder, + encoder: &mut Encoder<'_, T>, file_name: &CStr, patterns_count: usize, ) -> Result<(), Error> { let content = generate_pxar_excludes_cli(&self.patterns[..patterns_count]); - - if let Some(ref mut catalog) = self.catalog { - catalog.add_file(file_name, content.len() as u64, 0)?; + if let Some(ref catalog) = self.catalog { + catalog.lock().unwrap().add_file(file_name, content.len() as u64, 0)?; } let mut metadata = Metadata::default(); metadata.stat.mode = pxar::format::mode::IFREG | 0o600; - let mut file = encoder.create_file(&metadata, ".pxarexclude-cli", content.len() as u64)?; - file.write_all(&content)?; + let mut file = encoder.create_file(&metadata, ".pxarexclude-cli", content.len() as u64).await?; + file.write_all(&content).await?; Ok(()) } @@ -502,9 +505,9 @@ impl<'a, 'b> Archiver<'a, 'b> { Ok(()) } - fn add_entry( + async fn add_entry( &mut self, - encoder: &mut Encoder, + encoder: &mut Encoder<'_, T>, parent: RawFd, c_file_name: &CStr, stat: &FileStat, @@ -550,23 +553,23 @@ impl<'a, 'b> Archiver<'a, 'b> { if stat.st_nlink > 1 { if let Some((path, offset)) = self.hardlinks.get(&link_info) { - if let Some(ref mut catalog) = self.catalog { - catalog.add_hardlink(c_file_name)?; + if let Some(ref catalog) = self.catalog { + catalog.lock().unwrap().add_hardlink(c_file_name)?; } - encoder.add_hardlink(file_name, path, *offset)?; + encoder.add_hardlink(file_name, path, *offset).await?; return Ok(()); } } let file_size = stat.st_size as u64; - if let Some(ref mut catalog) = self.catalog { - catalog.add_file(c_file_name, file_size, stat.st_mtime)?; + if let Some(ref catalog) = self.catalog { + catalog.lock().unwrap().add_file(c_file_name, file_size, stat.st_mtime)?; } let offset: LinkOffset = - self.add_regular_file(encoder, fd, file_name, &metadata, file_size)?; + self.add_regular_file(encoder, fd, file_name, &metadata, file_size).await?; if stat.st_nlink > 1 { self.hardlinks.insert(link_info, (self.path.clone(), offset)); @@ -577,49 +580,49 @@ impl<'a, 'b> Archiver<'a, 'b> { mode::IFDIR => { let dir = Dir::from_fd(fd.into_raw_fd())?; - if let Some(ref mut catalog) = self.catalog { - catalog.start_directory(c_file_name)?; + if let Some(ref catalog) = self.catalog { + catalog.lock().unwrap().start_directory(c_file_name)?; } - let result = self.add_directory(encoder, dir, c_file_name, &metadata, stat); - if let Some(ref mut catalog) = self.catalog { - catalog.end_directory()?; + let result = self.add_directory(encoder, dir, c_file_name, &metadata, stat).await; + if let Some(ref catalog) = self.catalog { + catalog.lock().unwrap().end_directory()?; } result } mode::IFSOCK => { - if let Some(ref mut catalog) = self.catalog { - catalog.add_socket(c_file_name)?; + if let Some(ref catalog) = self.catalog { + catalog.lock().unwrap().add_socket(c_file_name)?; } - Ok(encoder.add_socket(&metadata, file_name)?) + Ok(encoder.add_socket(&metadata, file_name).await?) } mode::IFIFO => { - if let Some(ref mut catalog) = self.catalog { - catalog.add_fifo(c_file_name)?; + if let Some(ref catalog) = self.catalog { + catalog.lock().unwrap().add_fifo(c_file_name)?; } - Ok(encoder.add_fifo(&metadata, file_name)?) + Ok(encoder.add_fifo(&metadata, file_name).await?) } mode::IFLNK => { - if let Some(ref mut catalog) = self.catalog { - catalog.add_symlink(c_file_name)?; + if let Some(ref catalog) = self.catalog { + catalog.lock().unwrap().add_symlink(c_file_name)?; } - self.add_symlink(encoder, fd, file_name, &metadata) + self.add_symlink(encoder, fd, file_name, &metadata).await } mode::IFBLK => { - if let Some(ref mut catalog) = self.catalog { - catalog.add_block_device(c_file_name)?; + if let Some(ref catalog) = self.catalog { + catalog.lock().unwrap().add_block_device(c_file_name)?; } - self.add_device(encoder, file_name, &metadata, &stat) + self.add_device(encoder, file_name, &metadata, &stat).await } mode::IFCHR => { - if let Some(ref mut catalog) = self.catalog { - catalog.add_char_device(c_file_name)?; + if let Some(ref catalog) = self.catalog { + catalog.lock().unwrap().add_char_device(c_file_name)?; } - self.add_device(encoder, file_name, &metadata, &stat) + self.add_device(encoder, file_name, &metadata, &stat).await } other => bail!( "encountered unknown file type: 0x{:x} (0o{:o})", @@ -629,9 +632,9 @@ impl<'a, 'b> Archiver<'a, 'b> { } } - fn add_directory( + async fn add_directory( &mut self, - encoder: &mut Encoder, + encoder: &mut Encoder<'_, T>, dir: Dir, dir_name: &CStr, metadata: &Metadata, @@ -639,7 +642,7 @@ impl<'a, 'b> Archiver<'a, 'b> { ) -> Result<(), Error> { let dir_name = OsStr::from_bytes(dir_name.to_bytes()); - let mut encoder = encoder.create_directory(dir_name, &metadata)?; + let mut encoder = encoder.create_directory(dir_name, &metadata).await?; let old_fs_magic = self.fs_magic; let old_fs_feature_flags = self.fs_feature_flags; @@ -662,20 +665,20 @@ impl<'a, 'b> Archiver<'a, 'b> { writeln!(self.logger, "skipping mount point: {:?}", self.path)?; Ok(()) } else { - self.archive_dir_contents(&mut encoder, dir, false) + self.archive_dir_contents(&mut encoder, dir, false).await }; self.fs_magic = old_fs_magic; self.fs_feature_flags = old_fs_feature_flags; self.current_st_dev = old_st_dev; - encoder.finish()?; + encoder.finish().await?; result } - fn add_regular_file( + async fn add_regular_file( &mut self, - encoder: &mut Encoder, + encoder: &mut Encoder<'_, T>, fd: Fd, file_name: &Path, metadata: &Metadata, @@ -683,7 +686,7 @@ impl<'a, 'b> Archiver<'a, 'b> { ) -> Result { let mut file = unsafe { std::fs::File::from_raw_fd(fd.into_raw_fd()) }; let mut remaining = file_size; - let mut out = encoder.create_file(metadata, file_name, file_size)?; + let mut out = encoder.create_file(metadata, file_name, file_size).await?; while remaining != 0 { let mut got = match file.read(&mut self.file_copy_buffer[..]) { Ok(0) => break, @@ -695,7 +698,7 @@ impl<'a, 'b> Archiver<'a, 'b> { self.report_file_grew_while_reading()?; got = remaining as usize; } - out.write_all(&self.file_copy_buffer[..got])?; + out.write_all(&self.file_copy_buffer[..got]).await?; remaining -= got as u64; } if remaining > 0 { @@ -704,7 +707,7 @@ impl<'a, 'b> Archiver<'a, 'b> { vec::clear(&mut self.file_copy_buffer[..to_zero]); while remaining != 0 { let fill = remaining.min(self.file_copy_buffer.len() as u64) as usize; - out.write_all(&self.file_copy_buffer[..fill])?; + out.write_all(&self.file_copy_buffer[..fill]).await?; remaining -= fill as u64; } } @@ -712,21 +715,21 @@ impl<'a, 'b> Archiver<'a, 'b> { Ok(out.file_offset()) } - fn add_symlink( + async fn add_symlink( &mut self, - encoder: &mut Encoder, + encoder: &mut Encoder<'_, T>, fd: Fd, file_name: &Path, metadata: &Metadata, ) -> Result<(), Error> { let dest = nix::fcntl::readlinkat(fd.as_raw_fd(), &b""[..])?; - encoder.add_symlink(metadata, file_name, dest)?; + encoder.add_symlink(metadata, file_name, dest).await?; Ok(()) } - fn add_device( + async fn add_device( &mut self, - encoder: &mut Encoder, + encoder: &mut Encoder<'_, T>, file_name: &Path, metadata: &Metadata, stat: &FileStat, @@ -735,7 +738,7 @@ impl<'a, 'b> Archiver<'a, 'b> { metadata, file_name, pxar::format::Device::from_dev_t(stat.st_rdev), - )?) + ).await?) } } diff --git a/tests/catar.rs b/tests/catar.rs index 2d9dea71..550600c6 100644 --- a/tests/catar.rs +++ b/tests/catar.rs @@ -30,14 +30,15 @@ fn run_test(dir_name: &str) -> Result<(), Error> { ..PxarCreateOptions::default() }; - create_archive( + let rt = tokio::runtime::Runtime::new().unwrap(); + rt.block_on(create_archive( dir, writer, Flags::DEFAULT, |_| Ok(()), None, options, - )?; + ))?; Command::new("cmp") .arg("--verbose")