From d035ecb59a8e727ebaed7b90a195273bd32f6065 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fabian=20Gr=C3=BCnbichler?= Date: Thu, 7 Apr 2022 11:00:42 +0200 Subject: [PATCH] refactor interfaces MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - remove Pool from public interfaces - remove pool_dir from MirrorConfig, rename base_dir to dir - add mirror::{init,destroy,gc,list_snapshots, remove_snapshot} - rename mirror::mirror to mirror::create_snapshot - move ParsedMirrorConfig into mirror module - add new helpers in medium.rs - use Snapshot in more places - make Snapshot (de)serializable and sort/comparable - move Snapshot into types.rs - reduce visibility where possible Signed-off-by: Fabian Grünbichler --- Cargo.toml | 1 + src/bin/proxmox-apt-mirror.rs | 9 +- src/bin/proxmox-apt-repo.rs | 15 +- src/bin/proxmox_apt_mirror_cmds/config.rs | 17 +- src/bin/proxmox_apt_mirror_cmds/medium.rs | 51 +++- src/bin/proxmox_apt_mirror_cmds/mirror.rs | 78 ++----- src/config.rs | 69 ++---- src/lib.rs | 39 +++- src/medium.rs | 272 ++++++++++------------ src/mirror.rs | 106 ++++++++- src/pool.rs | 45 ++-- src/snapshot.rs | 19 -- src/types.rs | 43 +++- 13 files changed, 431 insertions(+), 333 deletions(-) delete mode 100644 src/snapshot.rs diff --git a/Cargo.toml b/Cargo.toml index 8fd4382..9a568b0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,5 +30,6 @@ proxmox-async = "0.4" proxmox-router = { version = "1.1", features = [ "cli" ] } proxmox-schema = { version = "1.1", features = [ "api-macro" ] } proxmox-section-config = "1" +proxmox-serde = "0.1" proxmox-sys = { version = "0.3" } proxmox-time = { version = "1.1.3" } diff --git a/src/bin/proxmox-apt-mirror.rs b/src/bin/proxmox-apt-mirror.rs index dc8c414..61bd5ab 100644 --- a/src/bin/proxmox-apt-mirror.rs +++ b/src/bin/proxmox-apt-mirror.rs @@ -13,6 +13,7 @@ use proxmox_apt_mirror::helpers::tty::{ }; use proxmox_apt_mirror::{ config::{save_config, MediaConfig, MirrorConfig}, + mirror, types::MIRROR_ID_SCHEMA, }; @@ -256,7 +257,7 @@ fn action_add_mirror(config: &SectionConfigData) -> Result break id; }; - let base_dir = loop { + let dir = loop { let path = read_string_from_tty("Enter path where mirrored repository will be stored", None)?; if Path::new(&path).exists() { @@ -266,8 +267,6 @@ fn action_add_mirror(config: &SectionConfigData) -> Result } }; - let pool_dir = format!("{base_dir}/.pool"); - let verify = read_bool_from_tty( "Should already mirrored files be re-verified when updating the mirror? (io-intensive!)", Some(true), @@ -281,8 +280,7 @@ fn action_add_mirror(config: &SectionConfigData) -> Result key_path, verify, sync, - base_dir, - pool_dir, + dir, }) } @@ -469,6 +467,7 @@ async fn setup(_param: Value) -> Result<(), Error> { Action::AddMirror => { let mirror_config = action_add_mirror(&config)?; let id = mirror_config.id.clone(); + mirror::init(&mirror_config)?; config.set_data(&id, "mirror", mirror_config)?; save_config(&config_file, &config)?; println!("Config entry '{id}' added"); diff --git a/src/bin/proxmox-apt-repo.rs b/src/bin/proxmox-apt-repo.rs index c051a44..68c7547 100644 --- a/src/bin/proxmox-apt-repo.rs +++ b/src/bin/proxmox-apt-repo.rs @@ -2,6 +2,7 @@ use std::{collections::HashMap, path::Path}; use anyhow::{bail, Error}; +use proxmox_apt_mirror::types::Snapshot; use proxmox_sys::{fs::file_get_contents, linux::tty}; use proxmox_time::epoch_to_rfc3339_utc; use serde_json::Value; @@ -109,14 +110,20 @@ async fn setup(_param: Value) -> Result<(), Error> { } let selected_mirror = read_selection_from_tty("Select mirror", &mirrors, None)?; - let snapshots = medium::list_snapshots(mountpoint, selected_mirror)?; + let snapshots: Vec<(Snapshot, String)> = + medium::list_snapshots(mountpoint, selected_mirror)? + .into_iter() + .map(|s| (s, s.to_string())) + .collect(); if snapshots.is_empty() { println!("Mirror doesn't have any synced snapshots."); continue; } - let snapshots: Vec<(&str, &str)> = - snapshots.iter().map(|s| (s.as_ref(), s.as_ref())).collect(); + let snapshots: Vec<(&Snapshot, &str)> = snapshots + .iter() + .map(|(snap, string)| (snap, string.as_ref())) + .collect(); let selected_snapshot = read_selection_from_tty( "Select snapshot", &snapshots, @@ -127,7 +134,7 @@ async fn setup(_param: Value) -> Result<(), Error> { selected_mirror.to_string(), ( state.mirrors.get(*selected_mirror).unwrap(), - selected_snapshot.to_string(), + **selected_snapshot, ), ); } diff --git a/src/bin/proxmox_apt_mirror_cmds/config.rs b/src/bin/proxmox_apt_mirror_cmds/config.rs index 0aeb806..085997b 100644 --- a/src/bin/proxmox_apt_mirror_cmds/config.rs +++ b/src/bin/proxmox_apt_mirror_cmds/config.rs @@ -1,5 +1,3 @@ -use std::path::Path; - use anyhow::Error; use serde_json::Value; @@ -11,7 +9,7 @@ use proxmox_schema::{api, param_bail, ApiType, ArraySchema, ReturnType}; use proxmox_apt_mirror::{ config::{MediaConfig, MediaConfigUpdater, MirrorConfig, MirrorConfigUpdater}, - pool::Pool, + mirror, types::MIRROR_ID_SCHEMA, }; @@ -148,7 +146,8 @@ async fn add_mirror( param_bail!("name", "mirror config entry '{}' already exists.", data.id); } - let _pool = Pool::create(Path::new(&data.base_dir), Path::new(&data.pool_dir))?; + mirror::init(&data)?; + section_config.set_data(&data.id, "mirror", &data)?; proxmox_apt_mirror::config::save_config(&config, §ion_config)?; @@ -193,8 +192,7 @@ async fn remove_mirror( match section_config.lookup::("mirror", &id) { Ok(config) => { if remove_data { - let pool: Pool = (&config).try_into()?; - pool.lock()?.destroy()?; + mirror::destroy(&config)?; } section_config.sections.remove(&id); @@ -247,11 +245,8 @@ pub fn update_mirror( if let Some(repository) = update.repository { data.repository = repository } - if let Some(base_dir) = update.base_dir { - data.base_dir = base_dir - } - if let Some(pool_dir) = update.pool_dir { - data.pool_dir = pool_dir + if let Some(dir) = update.dir { + data.dir = dir } if let Some(architectures) = update.architectures { data.architectures = architectures diff --git a/src/bin/proxmox_apt_mirror_cmds/medium.rs b/src/bin/proxmox_apt_mirror_cmds/medium.rs index 5162455..26ade9c 100644 --- a/src/bin/proxmox_apt_mirror_cmds/medium.rs +++ b/src/bin/proxmox_apt_mirror_cmds/medium.rs @@ -1,4 +1,7 @@ +use std::path::Path; + use anyhow::Error; +use proxmox_time::epoch_to_rfc3339_utc; use serde_json::Value; use proxmox_router::cli::{CliCommand, CliCommandMap, CommandLineInterface, OUTPUT_FORMAT}; @@ -6,6 +9,7 @@ use proxmox_schema::api; use proxmox_apt_mirror::{ config::{MediaConfig, MirrorConfig}, + generate_repo_file_line, medium::{self}, types::MIRROR_ID_SCHEMA, }; @@ -69,9 +73,52 @@ async fn status(config: Option, id: String, _param: Value) -> Result { + println!("no snapshots"); + None + } + Some(last) => { + if let Some(first) = snapshots.first() { + if first == last { + println!("1 snapshot: '{last}'"); + } else { + println!("{} snapshots: '{first}..{last}'", snapshots.len()); + } + Some(generate_repo_file_line(path, id, mirror, last)?) + } else { + None + } + } + }; + println!("Original repository config: '{}'", mirror.repository); + if let Some(repo_line) = repo_line { + println!("Medium repository line: '{repo_line}'"); + } + } Ok(Value::Null) } diff --git a/src/bin/proxmox_apt_mirror_cmds/mirror.rs b/src/bin/proxmox_apt_mirror_cmds/mirror.rs index 1ebcff3..fcd7e12 100644 --- a/src/bin/proxmox_apt_mirror_cmds/mirror.rs +++ b/src/bin/proxmox_apt_mirror_cmds/mirror.rs @@ -1,20 +1,17 @@ -use std::path::Path; - use anyhow::Error; -use nix::libc; use serde_json::Value; use proxmox_router::cli::{ - default_table_format_options, format_and_print_result_full, get_output_format, CliCommand, - CliCommandMap, CommandLineInterface, OUTPUT_FORMAT, + format_and_print_result, get_output_format, CliCommand, CliCommandMap, CommandLineInterface, + OUTPUT_FORMAT, }; -use proxmox_schema::{api, ApiStringFormat, ArraySchema, ReturnType, Schema, StringSchema}; +use proxmox_schema::api; use proxmox_apt_mirror::{ config::MirrorConfig, - pool::Pool, - types::{MIRROR_ID_SCHEMA, SNAPSHOT_REGEX}, + mirror, + types::{Snapshot, MIRROR_ID_SCHEMA}, }; use super::DEFAULT_CONFIG_PATH; @@ -35,26 +32,16 @@ use super::DEFAULT_CONFIG_PATH; )] /// Create a new repository snapshot, fetching required/missing files from original repository. async fn create_snapshot(config: Option, id: String, _param: Value) -> Result<(), Error> { - //let output_format = get_output_format(¶m); let config = config.unwrap_or_else(|| DEFAULT_CONFIG_PATH.to_string()); let (config, _digest) = proxmox_apt_mirror::config::config(&config)?; let config = config.lookup("mirror", &id)?; - proxmox_apt_mirror::mirror::mirror(config)?; + proxmox_apt_mirror::mirror::create_snapshot(config, &Snapshot::now())?; Ok(()) } -const SNAPSHOT_SCHEMA: Schema = StringSchema::new("Mirror snapshot") - .format(&ApiStringFormat::Pattern(&SNAPSHOT_REGEX)) - .schema(); - -const LIST_SNAPSHOTS_RETURN_TYPE: ReturnType = ReturnType { - schema: &ArraySchema::new("Returns the list of snapshots.", &SNAPSHOT_SCHEMA).schema(), - optional: true, -}; - #[api( input: { properties: { @@ -66,6 +53,10 @@ const LIST_SNAPSHOTS_RETURN_TYPE: ReturnType = ReturnType { id: { schema: MIRROR_ID_SCHEMA, }, + "output-format": { + schema: OUTPUT_FORMAT, + optional: true, + }, }, }, )] @@ -77,33 +68,17 @@ async fn list_snapshots(config: Option, id: String, param: Value) -> Res let (config, _digest) = proxmox_apt_mirror::config::config(&config)?; let config: MirrorConfig = config.lookup("mirror", &id)?; - let _pool: Pool = (&config).try_into()?; - let mut list = vec![]; + let list = mirror::list_snapshots(&config)?; - let path = Path::new(&config.base_dir); - - proxmox_sys::fs::scandir( - libc::AT_FDCWD, - path, - &SNAPSHOT_REGEX, - |_l2_fd, snapshot, file_type| { - if file_type != nix::dir::Type::Directory { - return Ok(()); - } - - list.push(snapshot.to_string()); - - Ok(()) - }, - )?; - let mut list = serde_json::json!(list); - - format_and_print_result_full( - &mut list, - &LIST_SNAPSHOTS_RETURN_TYPE, - &output_format, - &default_table_format_options(), - ); + if output_format == "text" { + println!("Found {} snapshots:", list.len()); + for snap in &list { + println!("- {snap}"); + } + } else { + let list = serde_json::json!(list); + format_and_print_result(&list, &output_format); + } Ok(()) } @@ -120,7 +95,7 @@ async fn list_snapshots(config: Option, id: String, param: Value) -> Res schema: MIRROR_ID_SCHEMA, }, snapshot: { - schema: SNAPSHOT_SCHEMA, + type: Snapshot, }, "output-format": { schema: OUTPUT_FORMAT, @@ -133,17 +108,14 @@ async fn list_snapshots(config: Option, id: String, param: Value) -> Res async fn remove_snapshot( config: Option, id: String, - snapshot: String, + snapshot: Snapshot, _param: Value, ) -> Result<(), Error> { let config = config.unwrap_or_else(|| DEFAULT_CONFIG_PATH.to_string()); let (config, _digest) = proxmox_apt_mirror::config::config(&config)?; let config: MirrorConfig = config.lookup("mirror", &id)?; - let pool: Pool = (&config).try_into()?; - let path = pool.get_path(Path::new(&snapshot))?; - - pool.lock()?.remove_dir(&path)?; + mirror::remove_snapshot(&config, &snapshot)?; Ok(()) } @@ -172,9 +144,9 @@ async fn garbage_collect(config: Option, id: String, _param: Value) -> R let (config, _digest) = proxmox_apt_mirror::config::config(&config)?; let config: MirrorConfig = config.lookup("mirror", &id)?; - let pool: Pool = (&config).try_into()?; - let (count, size) = pool.lock()?.gc()?; + let (count, size) = mirror::gc(&config)?; + println!("Removed {} files totalling {}b", count, size); Ok(()) diff --git a/src/config.rs b/src/config.rs index e668646..64e65b9 100644 --- a/src/config.rs +++ b/src/config.rs @@ -4,12 +4,11 @@ use anyhow::{bail, Error}; use lazy_static::lazy_static; use serde::{Deserialize, Serialize}; -use proxmox_apt::repositories::APTRepository; use proxmox_schema::{api, ApiType, Schema, Updater}; use proxmox_section_config::{SectionConfig, SectionConfigData, SectionConfigPlugin}; -use proxmox_sys::fs::{file_get_contents, replace_file, CreateOptions}; +use proxmox_sys::fs::{replace_file, CreateOptions}; -use crate::{convert_repo_line, pool::Pool, types::MIRROR_ID_SCHEMA}; +use crate::types::MIRROR_ID_SCHEMA; #[api( properties: { @@ -18,61 +17,49 @@ use crate::{convert_repo_line, pool::Pool, types::MIRROR_ID_SCHEMA}; }, repository: { type: String, - description: "Single repository definition in sources.list format.", }, architectures: { type: Array, - description: "List of architectures to mirror", items: { type: String, description: "Architecture specifier.", }, }, - "pool-dir": { + "dir": { type: String, - description: "Path to pool directory storing checksum files.", - }, - "base-dir": { - type: String, - description: "Path to directory storing repository snapshot files (must be on same FS as `pool-dir`).", }, "key-path": { type: String, - description: "Path to signing key of `repository`", }, verify: { type: bool, - description: "Whether to verify existing files stored in pool (IO-intensive).", }, sync: { type: bool, - description: "Whether to write pool updates with fsync flag.", }, } )] #[derive(Clone, Debug, Serialize, Deserialize, Updater)] #[serde(rename_all = "kebab-case")] -/// Configuration file for mirrored repositories. +/// Configuration entry for a mirrored repository. pub struct MirrorConfig { #[updater(skip)] + /// Identifier for this entry. pub id: String, + /// Single repository definition in sources.list format. pub repository: String, + /// List of architectures that should be mirrored. pub architectures: Vec, - pub pool_dir: String, - pub base_dir: String, + /// Path to directory containg mirrored repository. + pub dir: String, + /// Path to public key file for verifying repository integrity. pub key_path: String, + /// Whether to verify existing files or assume they are valid (IO-intensive). pub verify: bool, + /// Whether to write new files using FSYNC. pub sync: bool, } -impl TryInto for &MirrorConfig { - type Error = Error; - - fn try_into(self) -> Result { - Pool::open(Path::new(&self.base_dir), Path::new(&self.pool_dir)) - } -} - #[api( properties: { id: { @@ -112,7 +99,7 @@ pub struct MediaConfig { } lazy_static! { - pub static ref CONFIG: SectionConfig = init(); + static ref CONFIG: SectionConfig = init(); } fn init() -> SectionConfig { @@ -174,33 +161,3 @@ pub fn save_config(path: &str, data: &SectionConfigData) -> Result<(), Error> { let raw = CONFIG.write(path, data)?; replace_file(path, raw.as_bytes(), CreateOptions::default(), true) } - -pub struct ParsedMirrorConfig { - pub repository: APTRepository, - pub architectures: Vec, - pub pool: Pool, - pub key: Vec, - pub verify: bool, - pub sync: bool, -} - -impl TryInto for MirrorConfig { - type Error = anyhow::Error; - - fn try_into(self) -> Result { - let pool = (&self).try_into()?; - - let repository = convert_repo_line(self.repository.clone())?; - - let key = file_get_contents(Path::new(&self.key_path))?; - - Ok(ParsedMirrorConfig { - repository, - architectures: self.architectures, - pool, - key, - verify: self.verify, - sync: self.sync, - }) - } -} diff --git a/src/lib.rs b/src/lib.rs index cab4b03..294ce00 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,17 +1,19 @@ use std::{ fmt::Display, ops::{Add, AddAssign}, + path::Path, }; -use anyhow::Error; +use anyhow::{format_err, Error}; +use medium::MirrorInfo; use proxmox_apt::repositories::{APTRepository, APTRepositoryFile, APTRepositoryFileType}; +use types::Snapshot; pub mod config; pub mod helpers; pub mod medium; pub mod mirror; pub mod pool; -pub mod snapshot; pub mod types; struct FetchResult { @@ -90,8 +92,39 @@ impl Display for Progress { } } -pub fn convert_repo_line(line: String) -> Result { +pub(crate) fn convert_repo_line(line: String) -> Result { let mut repository = APTRepositoryFile::with_content(line, APTRepositoryFileType::List); repository.parse()?; Ok(repository.repositories[0].clone()) } + +pub fn generate_repo_file_line( + medium_base: &Path, + mirror_id: &str, + mirror: &MirrorInfo, + snapshot: &Snapshot, +) -> Result { + let mut snapshot_path = medium_base.to_path_buf(); + snapshot_path.push(mirror_id); + snapshot_path.push(snapshot.to_string()); + let snapshot_path = snapshot_path + .to_str() + .ok_or_else(|| format_err!("Failed to convert snapshot path to String"))?; + + let mut repo = convert_repo_line(mirror.repository.clone())?; + repo.uris = vec![format!("file://{}", snapshot_path)]; + + repo.options + .push(proxmox_apt::repositories::APTRepositoryOption { + key: "check-valid-until".to_string(), + values: vec!["false".to_string()], + }); + + let mut res = Vec::new(); + repo.write(&mut res)?; + + let res = String::from_utf8(res) + .map_err(|err| format_err!("Couldn't convert repo line to String - {err}"))?; + + Ok(res.trim_end().to_string()) +} diff --git a/src/medium.rs b/src/medium.rs index 96f4b6c..5fec1ea 100644 --- a/src/medium.rs +++ b/src/medium.rs @@ -1,6 +1,6 @@ use std::{ collections::{HashMap, HashSet}, - path::Path, + path::{Path, PathBuf}, }; use anyhow::{bail, format_err, Error}; @@ -10,16 +10,17 @@ use proxmox_time::{epoch_i64, epoch_to_rfc3339_utc}; use serde::{Deserialize, Serialize}; use crate::{ - config::{self, MirrorConfig}, - convert_repo_line, + config::{self, ConfigLockGuard, MediaConfig, MirrorConfig}, + generate_repo_file_line, + mirror::pool, pool::Pool, - types::SNAPSHOT_REGEX, + types::{Snapshot, SNAPSHOT_REGEX}, }; #[derive(Clone, Debug, Serialize, Deserialize)] #[serde(rename_all = "kebab-case")] pub struct MirrorInfo { - repository: String, - architectures: Vec, + pub repository: String, + pub architectures: Vec, } impl From<&MirrorConfig> for MirrorInfo { @@ -47,7 +48,74 @@ pub struct MediumState { pub last_sync: i64, } -pub fn list_snapshots(medium_base: &Path, mirror: &str) -> Result, Error> { +pub struct MediumMirrorState { + pub synced: HashSet, + pub source_only: HashSet, + pub target_only: HashSet, +} + +fn get_mirror_state(config: &MediaConfig, state: &MediumState) -> MediumMirrorState { + let synced_mirrors: HashSet = state + .mirrors + .iter() + .map(|(id, _mirror)| id.clone()) + .collect(); + let config_mirrors: HashSet = config.mirrors.iter().cloned().collect(); + let new_mirrors: HashSet = config_mirrors + .difference(&synced_mirrors) + .cloned() + .collect(); + let dropped_mirrors: HashSet = synced_mirrors + .difference(&config_mirrors) + .cloned() + .collect(); + + MediumMirrorState { + synced: synced_mirrors, + source_only: new_mirrors, + target_only: dropped_mirrors, + } +} + +fn lock(base: &Path) -> Result { + let mut lockfile = base.to_path_buf(); + lockfile.push("mirror-state"); + let lockfile = lockfile + .to_str() + .ok_or_else(|| format_err!("Couldn't convert lockfile path {lockfile:?})"))?; + config::lock_config(lockfile) +} + +fn statefile(base: &Path) -> PathBuf { + let mut statefile = base.to_path_buf(); + statefile.push(".mirror-state"); + statefile +} + +fn load_state(base: &Path) -> Result, Error> { + let statefile = statefile(base); + + if statefile.exists() { + let raw = file_get_contents(&statefile)?; + let state: MediumState = serde_json::from_slice(&raw)?; + Ok(Some(state)) + } else { + Ok(None) + } +} + +fn write_state(_lock: &ConfigLockGuard, base: &Path, state: &MediumState) -> Result<(), Error> { + replace_file( + &statefile(base), + &serde_json::to_vec(&state)?, + CreateOptions::default(), + true, + )?; + + Ok(()) +} + +pub fn list_snapshots(medium_base: &Path, mirror: &str) -> Result, Error> { if !medium_base.exists() { bail!("Medium mountpoint doesn't exist."); } @@ -65,7 +133,7 @@ pub fn list_snapshots(medium_base: &Path, mirror: &str) -> Result, E return Ok(()); } - list.push(snapshot.to_string()); + list.push(snapshot.parse()?); Ok(()) }, @@ -78,7 +146,7 @@ pub fn list_snapshots(medium_base: &Path, mirror: &str) -> Result, E pub fn generate_repo_snippet( medium_base: &Path, - repositories: &HashMap, + repositories: &HashMap, ) -> Result, Error> { let mut res = Vec::new(); for (mirror_id, (mirror_info, snapshot)) in repositories { @@ -92,51 +160,18 @@ pub fn generate_repo_snippet( Ok(res) } -fn generate_repo_file_line( - medium_base: &Path, - mirror_id: &str, - mirror: &MirrorInfo, - snapshot: &str, -) -> Result { - let mut snapshot_path = medium_base.to_path_buf(); - snapshot_path.push(mirror_id); - snapshot_path.push(snapshot); - let snapshot_path = snapshot_path - .to_str() - .ok_or_else(|| format_err!("Failed to convert snapshot path to String"))?; - - let mut repo = convert_repo_line(mirror.repository.clone())?; - repo.uris = vec![format!("file://{}", snapshot_path)]; - - repo.options - .push(proxmox_apt::repositories::APTRepositoryOption { - key: "check-valid-until".to_string(), - values: vec!["false".to_string()], - }); - - let mut res = Vec::new(); - repo.write(&mut res)?; - - let res = String::from_utf8(res) - .map_err(|err| format_err!("Couldn't convert repo line to String - {err}"))?; - - Ok(res.trim_end().to_string()) -} - pub fn gc(medium: &crate::config::MediaConfig) -> Result<(), Error> { let medium_base = Path::new(&medium.mountpoint); if !medium_base.exists() { bail!("Medium mountpoint doesn't exist."); } - let mut statefile = medium_base.to_path_buf(); - statefile.push(".mirror-state"); + let _lock = lock(medium_base)?; - let _lock = config::lock_config(&format!("{}/{}", medium.mountpoint, "mirror-state"))?; + println!("Loading state.."); + let state = load_state(medium_base)? + .ok_or_else(|| format_err!("Cannot GC empty medium - no statefile found."))?; - println!("Loading state from {statefile:?}.."); - let raw = file_get_contents(&statefile)?; - let state: MediumState = serde_json::from_slice(&raw)?; println!( "Last sync timestamp: {}", epoch_to_rfc3339_utc(state.last_sync)? @@ -170,77 +205,19 @@ pub fn gc(medium: &crate::config::MediaConfig) -> Result<(), Error> { Ok(()) } -pub fn status(medium: &crate::config::MediaConfig) -> Result<(), Error> { +pub fn status( + medium: &crate::config::MediaConfig, +) -> Result<(MediumState, MediumMirrorState), Error> { let medium_base = Path::new(&medium.mountpoint); if !medium_base.exists() { bail!("Medium mountpoint doesn't exist."); } - let mut statefile = medium_base.to_path_buf(); - statefile.push(".mirror-state"); + let state = load_state(medium_base)? + .ok_or_else(|| format_err!("No status available - statefile doesn't exist."))?; + let mirror_state = get_mirror_state(medium, &state); - println!("Loading state from {statefile:?}.."); - let raw = file_get_contents(&statefile)?; - let state: MediumState = serde_json::from_slice(&raw)?; - println!( - "Last sync timestamp: {}", - epoch_to_rfc3339_utc(state.last_sync)? - ); - - let synced_mirrors: HashSet = state - .mirrors - .iter() - .map(|(id, _mirror)| id.clone()) - .collect(); - let config_mirrors: HashSet = medium.mirrors.iter().cloned().collect(); - let new_mirrors: HashSet = config_mirrors - .difference(&synced_mirrors) - .cloned() - .collect(); - let dropped_mirrors: HashSet = synced_mirrors - .difference(&config_mirrors) - .cloned() - .collect(); - - println!("Already synced mirrors: {synced_mirrors:?}"); - println!("Configured mirrors: {config_mirrors:?}"); - - if !new_mirrors.is_empty() { - println!("Missing mirrors: {new_mirrors:?}"); - } - - if !dropped_mirrors.is_empty() { - println!("To-be-removed mirrors: {dropped_mirrors:?}"); - } - - for (ref id, ref mirror) in state.mirrors { - println!("\nMirror '{}'", id); - let snapshots = list_snapshots(Path::new(&medium.mountpoint), id)?; - let repo_line = match snapshots.last() { - None => { - println!("no snapshots"); - None - } - Some(last) => { - if let Some(first) = snapshots.first() { - if first == last { - println!("1 snapshot: '{last}'"); - } else { - println!("{} snapshots: '{first}..{last}'", snapshots.len()); - } - Some(generate_repo_file_line(medium_base, id, mirror, last)?) - } else { - None - } - } - }; - println!("Original repository config: '{}'", mirror.repository); - if let Some(repo_line) = repo_line { - println!("Medium repository line: '{repo_line}'"); - } - } - - Ok(()) + Ok((state, mirror_state)) } pub fn sync(medium: &crate::config::MediaConfig, mirrors: Vec) -> Result<(), Error> { @@ -257,52 +234,44 @@ pub fn sync(medium: &crate::config::MediaConfig, mirrors: Vec) -> bail!("Medium mountpoint doesn't exist."); } - let mut statefile = medium_base.to_path_buf(); - statefile.push(".mirror-state"); + let lock = lock(medium_base)?; - let _lock = config::lock_config(&format!("{}/{}", medium.mountpoint, "mirror-state"))?; - - let mut state = if statefile.exists() { - println!("Loading state from {statefile:?}.."); - let raw = file_get_contents(&statefile)?; - let state: MediumState = serde_json::from_slice(&raw)?; - println!( - "Last sync timestamp: {}", - epoch_to_rfc3339_utc(state.last_sync)? - ); - state - } else { - println!("Creating new statefile {statefile:?}.."); - MediumState { - mirrors: HashMap::new(), - last_sync: 0, + let mut state = match load_state(medium_base)? { + Some(state) => { + println!("Loaded existing statefile."); + println!( + "Last sync timestamp: {}", + epoch_to_rfc3339_utc(state.last_sync)? + ); + state + } + None => { + println!("Creating new statefile.."); + MediumState { + mirrors: HashMap::new(), + last_sync: 0, + } } }; state.last_sync = epoch_i64(); println!("Sync timestamp: {}", epoch_to_rfc3339_utc(state.last_sync)?); - let old_mirrors: HashSet = state - .mirrors - .iter() - .map(|(id, _mirror)| id.clone()) - .collect(); - let sync_mirrors: HashSet = mirrors.iter().map(|mirror| mirror.id.clone()).collect(); - let new_mirrors: HashSet = sync_mirrors.difference(&old_mirrors).cloned().collect(); - let dropped_mirrors: HashSet = old_mirrors.difference(&sync_mirrors).cloned().collect(); + let mirror_state = get_mirror_state(medium, &state); + println!("Previously synced mirrors: {:?}", &mirror_state.synced); - println!("Previously synced mirrors: {:?}", &old_mirrors); - - if !new_mirrors.is_empty() { + if !mirror_state.source_only.is_empty() { println!( - "Adding {} new mirror(s) to target medium: {new_mirrors:?}", - new_mirrors.len() + "Adding {} new mirror(s) to target medium: {:?}", + mirror_state.source_only.len(), + mirror_state.source_only, ); } - if !dropped_mirrors.is_empty() { + if !mirror_state.target_only.is_empty() { println!( - "Dropping {} removed mirror(s) from target medium (after syncing): {dropped_mirrors:?}", - dropped_mirrors.len() + "Dropping {} removed mirror(s) from target medium (after syncing): {:?}", + mirror_state.target_only.len(), + mirror_state.target_only, ); } @@ -324,16 +293,16 @@ pub fn sync(medium: &crate::config::MediaConfig, mirrors: Vec) -> Pool::create(&mirror_base, &mirror_pool)? }; - let source_pool: Pool = (&mirror).try_into()?; + let source_pool: Pool = pool(&mirror)?; source_pool.lock()?.sync_pool(&target_pool, medium.verify)?; state.mirrors.insert(mirror.id.clone(), mirror.into()); } - if !dropped_mirrors.is_empty() { + if !mirror_state.target_only.is_empty() { println!(); } - for dropped in dropped_mirrors { + for dropped in mirror_state.target_only { let mut mirror_base = medium_base.to_path_buf(); mirror_base.push(Path::new(&dropped)); @@ -345,12 +314,7 @@ pub fn sync(medium: &crate::config::MediaConfig, mirrors: Vec) -> println!("Updating statefile.."); // TODO update state file for exporting/subscription key handling/..? - replace_file( - &statefile, - &serde_json::to_vec(&state)?, - CreateOptions::default(), - true, - )?; + write_state(&lock, medium_base, &state)?; Ok(()) } diff --git a/src/mirror.rs b/src/mirror.rs index 1ff042e..4a697da 100644 --- a/src/mirror.rs +++ b/src/mirror.rs @@ -7,9 +7,16 @@ use std::{ use anyhow::{format_err, Error}; use flate2::bufread::GzDecoder; +use nix::libc; +use proxmox_sys::fs::file_get_contents; -use crate::{config::MirrorConfig, FetchResult, Progress}; -use crate::{config::ParsedMirrorConfig, snapshot::Snapshot}; +use crate::{ + config::MirrorConfig, + convert_repo_line, + pool::Pool, + types::{Snapshot, SNAPSHOT_REGEX}, + FetchResult, Progress, +}; use proxmox_apt::{ deb822::{ CheckSums, CompressionType, FileReference, FileReferenceType, PackagesFile, ReleaseFile, @@ -19,6 +26,41 @@ use proxmox_apt::{ use crate::helpers; +pub(crate) fn pool(config: &MirrorConfig) -> Result { + let pool_dir = format!("{}/.pool", config.dir); + Pool::open(Path::new(&config.dir), Path::new(&pool_dir)) +} + +struct ParsedMirrorConfig { + pub repository: APTRepository, + pub architectures: Vec, + pub pool: Pool, + pub key: Vec, + pub verify: bool, + pub sync: bool, +} + +impl TryInto for MirrorConfig { + type Error = anyhow::Error; + + fn try_into(self) -> Result { + let pool = pool(&self)?; + + let repository = convert_repo_line(self.repository.clone())?; + + let key = file_get_contents(Path::new(&self.key_path))?; + + Ok(ParsedMirrorConfig { + repository, + architectures: self.architectures, + pool, + key, + verify: self.verify, + sync: self.sync, + }) + } +} + fn get_dist_url(repo: &APTRepository, path: &str) -> String { let dist_root = format!("{}/dists/{}", repo.uris[0], repo.suites[0]); @@ -100,7 +142,7 @@ fn fetch_release( let locked = &config.pool.lock()?; if !locked.contains(&csums) { - locked.add_file(content, &csums, true)?; + locked.add_file(content, &csums, config.sync)?; } if detached { @@ -115,7 +157,7 @@ fn fetch_release( ..Default::default() }; if !locked.contains(&csums) { - locked.add_file(&sig, &csums, true)?; + locked.add_file(&sig, &csums, config.sync)?; } locked.link_file( &csums, @@ -184,7 +226,7 @@ fn fetch_index_file( let locked = &config.pool.lock()?; if !locked.contains(&uncompressed.checksums) { - locked.add_file(decompressed, &uncompressed.checksums, true)?; + locked.add_file(decompressed, &uncompressed.checksums, config.sync)?; } // Ensure it's linked at current path @@ -229,9 +271,46 @@ fn fetch_plain_file( Ok(res) } -pub fn mirror(config: MirrorConfig) -> Result<(), Error> { +pub fn init(config: &MirrorConfig) -> Result<(), Error> { + let pool_dir = format!("{}/.pool", config.dir); + Pool::create(Path::new(&config.dir), Path::new(&pool_dir))?; + Ok(()) +} + +pub fn destroy(config: &MirrorConfig) -> Result<(), Error> { + let pool: Pool = pool(config)?; + pool.lock()?.destroy()?; + + Ok(()) +} + +pub fn list_snapshots(config: &MirrorConfig) -> Result, Error> { + let _pool: Pool = pool(config)?; + + let mut list: Vec = vec![]; + + let path = Path::new(&config.dir); + + proxmox_sys::fs::scandir( + libc::AT_FDCWD, + path, + &SNAPSHOT_REGEX, + |_l2_fd, snapshot, file_type| { + if file_type != nix::dir::Type::Directory { + return Ok(()); + } + + list.push(snapshot.parse()?); + + Ok(()) + }, + )?; + + Ok(list) +} + +pub fn create_snapshot(config: MirrorConfig, snapshot: &Snapshot) -> Result<(), Error> { let config: ParsedMirrorConfig = config.try_into()?; - let snapshot = Snapshot::now(); let prefix = format!("{snapshot}.tmp"); let prefix = Path::new(&prefix); @@ -419,3 +498,16 @@ pub fn mirror(config: MirrorConfig) -> Result<(), Error> { Ok(()) } + +pub fn remove_snapshot(config: &MirrorConfig, snapshot: &Snapshot) -> Result<(), Error> { + let pool: Pool = pool(config)?; + let path = pool.get_path(Path::new(&snapshot.to_string()))?; + + pool.lock()?.remove_dir(&path) +} + +pub fn gc(config: &MirrorConfig) -> Result<(usize, u64), Error> { + let pool: Pool = pool(config)?; + + pool.lock()?.gc() +} diff --git a/src/pool.rs b/src/pool.rs index e464957..5ce76a4 100644 --- a/src/pool.rs +++ b/src/pool.rs @@ -15,18 +15,18 @@ use proxmox_sys::fs::{create_path, file_get_contents, replace_file, CreateOption use walkdir::WalkDir; #[derive(Debug)] -pub struct Pool { +pub(crate) struct Pool { pool_dir: PathBuf, base_dir: PathBuf, } -pub struct PoolLockGuard<'lock> { +pub(crate) struct PoolLockGuard<'lock> { pool: &'lock Pool, _lock: Option, } impl Pool { - pub fn create(base: &Path, pool: &Path) -> Result { + pub(crate) fn create(base: &Path, pool: &Path) -> Result { if base.exists() { bail!("Pool base dir already exists."); } @@ -43,7 +43,7 @@ impl Pool { base_dir: base.to_path_buf(), }) } - pub fn open(base: &Path, pool: &Path) -> Result { + pub(crate) fn open(base: &Path, pool: &Path) -> Result { if !base.exists() { bail!("Pool base dir doesn't exist.") } @@ -58,7 +58,7 @@ impl Pool { }) } - pub fn lock(&self) -> Result { + pub(crate) fn lock(&self) -> Result { let timeout = std::time::Duration::new(10, 0); let lock = Some(proxmox_sys::fs::open_file_locked( &self.lock_path(), @@ -72,14 +72,18 @@ impl Pool { _lock: lock, }) } - pub fn contains(&self, checksums: &CheckSums) -> bool { + pub(crate) fn contains(&self, checksums: &CheckSums) -> bool { match self.get_checksum_paths(checksums) { Ok(paths) => paths.iter().any(|path| path.exists()), Err(_err) => false, } } - pub fn get_contents(&self, checksums: &CheckSums, verify: bool) -> Result, Error> { + pub(crate) fn get_contents( + &self, + checksums: &CheckSums, + verify: bool, + ) -> Result, Error> { let source = self .get_checksum_paths(checksums)? .into_iter() @@ -135,7 +139,7 @@ impl Pool { lock_path } - pub fn get_path(&self, rel_path: &Path) -> Result { + pub(crate) fn get_path(&self, rel_path: &Path) -> Result { let mut path = self.base_dir.clone(); path.push(rel_path); @@ -210,7 +214,7 @@ impl PoolLockGuard<'_> { Ok((inode_map, link_count)) } - pub fn sync_pool(&self, target: &Pool, verify: bool) -> Result<(), Error> { + pub(crate) fn sync_pool(&self, target: &Pool, verify: bool) -> Result<(), Error> { let target = target.lock()?; let (inode_map, total_link_count) = self.get_inode_csum_map()?; @@ -321,7 +325,12 @@ impl PoolLockGuard<'_> { Ok(()) } - pub fn add_file(&self, data: &[u8], checksums: &CheckSums, sync: bool) -> Result<(), Error> { + pub(crate) fn add_file( + &self, + data: &[u8], + checksums: &CheckSums, + sync: bool, + ) -> Result<(), Error> { if self.pool.contains(checksums) { bail!("Pool already contains file with this checksum."); } @@ -340,7 +349,7 @@ impl PoolLockGuard<'_> { Ok(()) } - pub fn link_file(&self, checksums: &CheckSums, path: &Path) -> Result { + pub(crate) fn link_file(&self, checksums: &CheckSums, path: &Path) -> Result { let path = self.pool.get_path(path)?; if !self.pool.path_in_base(&path) { bail!( @@ -364,7 +373,11 @@ impl PoolLockGuard<'_> { link_file_do(source, &path) } - pub fn unlink_file(&self, mut path: &Path, remove_empty_parents: bool) -> Result<(), Error> { + pub(crate) fn unlink_file( + &self, + mut path: &Path, + remove_empty_parents: bool, + ) -> Result<(), Error> { if !self.pool.path_in_base(path) { bail!("Cannot unlink file outside of pool."); } @@ -388,7 +401,7 @@ impl PoolLockGuard<'_> { Ok(()) } - pub fn remove_dir(&self, path: &Path) -> Result<(), Error> { + pub(crate) fn remove_dir(&self, path: &Path) -> Result<(), Error> { if !self.pool.path_in_base(path) { bail!("Cannot unlink file outside of pool."); } @@ -397,7 +410,7 @@ impl PoolLockGuard<'_> { .map_err(|err| format_err!("Failed to remove {path:?} - {err}")) } - pub fn gc(&self) -> Result<(usize, u64), Error> { + pub(crate) fn gc(&self) -> Result<(usize, u64), Error> { let (inode_map, _link_count) = self.get_inode_csum_map()?; let mut count = 0; @@ -462,14 +475,14 @@ impl PoolLockGuard<'_> { Ok((count, size)) } - pub fn destroy(self) -> Result<(), Error> { + pub(crate) fn destroy(self) -> Result<(), Error> { // TODO - this removes the lock file.. std::fs::remove_dir_all(self.pool_dir.clone())?; std::fs::remove_dir_all(self.base_dir.clone())?; Ok(()) } - pub fn rename(&self, from: &Path, to: &Path) -> Result<(), Error> { + pub(crate) fn rename(&self, from: &Path, to: &Path) -> Result<(), Error> { let mut abs_from = self.base_dir.clone(); abs_from.push(from); diff --git a/src/snapshot.rs b/src/snapshot.rs deleted file mode 100644 index bba4fa4..0000000 --- a/src/snapshot.rs +++ /dev/null @@ -1,19 +0,0 @@ -use std::fmt::Display; - -use proxmox_time::{epoch_i64, epoch_to_rfc3339_utc}; - -#[derive(Debug, Clone, Copy)] -pub struct Snapshot(i64); - -impl Snapshot { - pub fn now() -> Self { - Self(epoch_i64()) - } -} - -impl Display for Snapshot { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let formatted = epoch_to_rfc3339_utc(self.0).map_err(|_| std::fmt::Error)?; - f.write_str(&formatted) - } -} diff --git a/src/types.rs b/src/types.rs index 1ed6feb..30da582 100644 --- a/src/types.rs +++ b/src/types.rs @@ -1,4 +1,9 @@ -use proxmox_schema::{const_regex, ApiStringFormat, Schema, StringSchema}; +use std::{fmt::Display, str::FromStr}; + +use anyhow::Error; +use proxmox_schema::{api, const_regex, ApiStringFormat, Schema, StringSchema}; +use proxmox_serde::{forward_deserialize_to_from_str, forward_serialize_to_display}; +use proxmox_time::{epoch_i64, epoch_to_rfc3339_utc, parse_rfc3339}; #[rustfmt::skip] #[macro_export] @@ -7,7 +12,7 @@ macro_rules! PROXMOX_SAFE_ID_REGEX_STR { () => { r"(?:[A-Za-z0-9_][A-Za-z0-9._\- const_regex! { // copied from PBS - pub PROXMOX_SAFE_ID_REGEX = concat!(r"^", PROXMOX_SAFE_ID_REGEX_STR!(), r"$"); + PROXMOX_SAFE_ID_REGEX = concat!(r"^", PROXMOX_SAFE_ID_REGEX_STR!(), r"$"); } pub const PROXMOX_SAFE_ID_FORMAT: ApiStringFormat = @@ -22,5 +27,37 @@ pub const MIRROR_ID_SCHEMA: Schema = StringSchema::new("Mirror name.") #[macro_export] macro_rules! SNAPSHOT_RE { () => (r"[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z") } const_regex! { - pub SNAPSHOT_REGEX = concat!(r"^", SNAPSHOT_RE!() ,r"$"); + pub(crate) SNAPSHOT_REGEX = concat!(r"^", SNAPSHOT_RE!() ,r"$"); +} + +#[api( + type: String, + format: &ApiStringFormat::Pattern(&SNAPSHOT_REGEX), +)] +#[derive(Debug, Clone, Copy, PartialEq, PartialOrd, Eq, Ord)] +/// Mirror snapshot +pub struct Snapshot(i64); + +forward_serialize_to_display!(Snapshot); +forward_deserialize_to_from_str!(Snapshot); + +impl Snapshot { + pub fn now() -> Self { + Self(epoch_i64()) + } +} + +impl Display for Snapshot { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let formatted = epoch_to_rfc3339_utc(self.0).map_err(|_| std::fmt::Error)?; + f.write_str(&formatted) + } +} + +impl FromStr for Snapshot { + type Err = Error; + + fn from_str(s: &str) -> Result { + Ok(Self(parse_rfc3339(s)?)) + } }