refactor interfaces

- remove Pool from public interfaces
- remove pool_dir from MirrorConfig, rename base_dir to dir
- add mirror::{init,destroy,gc,list_snapshots, remove_snapshot}
- rename mirror::mirror to mirror::create_snapshot
- move ParsedMirrorConfig into mirror module
- add new helpers in medium.rs
- use Snapshot in more places
- make Snapshot (de)serializable and sort/comparable
- move Snapshot into types.rs
- reduce visibility where possible

Signed-off-by: Fabian Grünbichler <f.gruenbichler@proxmox.com>
This commit is contained in:
Fabian Grünbichler 2022-04-07 11:00:42 +02:00
parent 7094b70a76
commit d035ecb59a
13 changed files with 431 additions and 333 deletions

View File

@ -30,5 +30,6 @@ proxmox-async = "0.4"
proxmox-router = { version = "1.1", features = [ "cli" ] }
proxmox-schema = { version = "1.1", features = [ "api-macro" ] }
proxmox-section-config = "1"
proxmox-serde = "0.1"
proxmox-sys = { version = "0.3" }
proxmox-time = { version = "1.1.3" }

View File

@ -13,6 +13,7 @@ use proxmox_apt_mirror::helpers::tty::{
};
use proxmox_apt_mirror::{
config::{save_config, MediaConfig, MirrorConfig},
mirror,
types::MIRROR_ID_SCHEMA,
};
@ -256,7 +257,7 @@ fn action_add_mirror(config: &SectionConfigData) -> Result<MirrorConfig, Error>
break id;
};
let base_dir = loop {
let dir = loop {
let path =
read_string_from_tty("Enter path where mirrored repository will be stored", None)?;
if Path::new(&path).exists() {
@ -266,8 +267,6 @@ fn action_add_mirror(config: &SectionConfigData) -> Result<MirrorConfig, Error>
}
};
let pool_dir = format!("{base_dir}/.pool");
let verify = read_bool_from_tty(
"Should already mirrored files be re-verified when updating the mirror? (io-intensive!)",
Some(true),
@ -281,8 +280,7 @@ fn action_add_mirror(config: &SectionConfigData) -> Result<MirrorConfig, Error>
key_path,
verify,
sync,
base_dir,
pool_dir,
dir,
})
}
@ -469,6 +467,7 @@ async fn setup(_param: Value) -> Result<(), Error> {
Action::AddMirror => {
let mirror_config = action_add_mirror(&config)?;
let id = mirror_config.id.clone();
mirror::init(&mirror_config)?;
config.set_data(&id, "mirror", mirror_config)?;
save_config(&config_file, &config)?;
println!("Config entry '{id}' added");

View File

@ -2,6 +2,7 @@ use std::{collections::HashMap, path::Path};
use anyhow::{bail, Error};
use proxmox_apt_mirror::types::Snapshot;
use proxmox_sys::{fs::file_get_contents, linux::tty};
use proxmox_time::epoch_to_rfc3339_utc;
use serde_json::Value;
@ -109,14 +110,20 @@ async fn setup(_param: Value) -> Result<(), Error> {
}
let selected_mirror = read_selection_from_tty("Select mirror", &mirrors, None)?;
let snapshots = medium::list_snapshots(mountpoint, selected_mirror)?;
let snapshots: Vec<(Snapshot, String)> =
medium::list_snapshots(mountpoint, selected_mirror)?
.into_iter()
.map(|s| (s, s.to_string()))
.collect();
if snapshots.is_empty() {
println!("Mirror doesn't have any synced snapshots.");
continue;
}
let snapshots: Vec<(&str, &str)> =
snapshots.iter().map(|s| (s.as_ref(), s.as_ref())).collect();
let snapshots: Vec<(&Snapshot, &str)> = snapshots
.iter()
.map(|(snap, string)| (snap, string.as_ref()))
.collect();
let selected_snapshot = read_selection_from_tty(
"Select snapshot",
&snapshots,
@ -127,7 +134,7 @@ async fn setup(_param: Value) -> Result<(), Error> {
selected_mirror.to_string(),
(
state.mirrors.get(*selected_mirror).unwrap(),
selected_snapshot.to_string(),
**selected_snapshot,
),
);
}

View File

@ -1,5 +1,3 @@
use std::path::Path;
use anyhow::Error;
use serde_json::Value;
@ -11,7 +9,7 @@ use proxmox_schema::{api, param_bail, ApiType, ArraySchema, ReturnType};
use proxmox_apt_mirror::{
config::{MediaConfig, MediaConfigUpdater, MirrorConfig, MirrorConfigUpdater},
pool::Pool,
mirror,
types::MIRROR_ID_SCHEMA,
};
@ -148,7 +146,8 @@ async fn add_mirror(
param_bail!("name", "mirror config entry '{}' already exists.", data.id);
}
let _pool = Pool::create(Path::new(&data.base_dir), Path::new(&data.pool_dir))?;
mirror::init(&data)?;
section_config.set_data(&data.id, "mirror", &data)?;
proxmox_apt_mirror::config::save_config(&config, &section_config)?;
@ -193,8 +192,7 @@ async fn remove_mirror(
match section_config.lookup::<MirrorConfig>("mirror", &id) {
Ok(config) => {
if remove_data {
let pool: Pool = (&config).try_into()?;
pool.lock()?.destroy()?;
mirror::destroy(&config)?;
}
section_config.sections.remove(&id);
@ -247,11 +245,8 @@ pub fn update_mirror(
if let Some(repository) = update.repository {
data.repository = repository
}
if let Some(base_dir) = update.base_dir {
data.base_dir = base_dir
}
if let Some(pool_dir) = update.pool_dir {
data.pool_dir = pool_dir
if let Some(dir) = update.dir {
data.dir = dir
}
if let Some(architectures) = update.architectures {
data.architectures = architectures

View File

@ -1,4 +1,7 @@
use std::path::Path;
use anyhow::Error;
use proxmox_time::epoch_to_rfc3339_utc;
use serde_json::Value;
use proxmox_router::cli::{CliCommand, CliCommandMap, CommandLineInterface, OUTPUT_FORMAT};
@ -6,6 +9,7 @@ use proxmox_schema::api;
use proxmox_apt_mirror::{
config::{MediaConfig, MirrorConfig},
generate_repo_file_line,
medium::{self},
types::MIRROR_ID_SCHEMA,
};
@ -69,9 +73,52 @@ async fn status(config: Option<String>, id: String, _param: Value) -> Result<Val
let config = config.unwrap_or_else(|| DEFAULT_CONFIG_PATH.to_string());
let (section_config, _digest) = proxmox_apt_mirror::config::config(&config)?;
let config: MediaConfig = section_config.lookup("medium", &id)?;
let medium_config: MediaConfig = section_config.lookup("medium", &id)?;
medium::status(&config)?;
let (state, mirror_state) = medium::status(&medium_config)?;
println!(
"Last sync timestamp: {}",
epoch_to_rfc3339_utc(state.last_sync)?
);
println!("Already synced mirrors: {:?}", mirror_state.synced);
if !mirror_state.source_only.is_empty() {
println!("Missing mirrors: {:?}", mirror_state.source_only);
}
if !mirror_state.target_only.is_empty() {
println!("To-be-removed mirrors: {:?}", mirror_state.target_only);
}
for (ref id, ref mirror) in state.mirrors {
println!("\nMirror '{}'", id);
let path = Path::new(&medium_config.mountpoint);
let snapshots = medium::list_snapshots(path, id)?;
let repo_line = match snapshots.last() {
None => {
println!("no snapshots");
None
}
Some(last) => {
if let Some(first) = snapshots.first() {
if first == last {
println!("1 snapshot: '{last}'");
} else {
println!("{} snapshots: '{first}..{last}'", snapshots.len());
}
Some(generate_repo_file_line(path, id, mirror, last)?)
} else {
None
}
}
};
println!("Original repository config: '{}'", mirror.repository);
if let Some(repo_line) = repo_line {
println!("Medium repository line: '{repo_line}'");
}
}
Ok(Value::Null)
}

View File

@ -1,20 +1,17 @@
use std::path::Path;
use anyhow::Error;
use nix::libc;
use serde_json::Value;
use proxmox_router::cli::{
default_table_format_options, format_and_print_result_full, get_output_format, CliCommand,
CliCommandMap, CommandLineInterface, OUTPUT_FORMAT,
format_and_print_result, get_output_format, CliCommand, CliCommandMap, CommandLineInterface,
OUTPUT_FORMAT,
};
use proxmox_schema::{api, ApiStringFormat, ArraySchema, ReturnType, Schema, StringSchema};
use proxmox_schema::api;
use proxmox_apt_mirror::{
config::MirrorConfig,
pool::Pool,
types::{MIRROR_ID_SCHEMA, SNAPSHOT_REGEX},
mirror,
types::{Snapshot, MIRROR_ID_SCHEMA},
};
use super::DEFAULT_CONFIG_PATH;
@ -35,26 +32,16 @@ use super::DEFAULT_CONFIG_PATH;
)]
/// Create a new repository snapshot, fetching required/missing files from original repository.
async fn create_snapshot(config: Option<String>, id: String, _param: Value) -> Result<(), Error> {
//let output_format = get_output_format(&param);
let config = config.unwrap_or_else(|| DEFAULT_CONFIG_PATH.to_string());
let (config, _digest) = proxmox_apt_mirror::config::config(&config)?;
let config = config.lookup("mirror", &id)?;
proxmox_apt_mirror::mirror::mirror(config)?;
proxmox_apt_mirror::mirror::create_snapshot(config, &Snapshot::now())?;
Ok(())
}
const SNAPSHOT_SCHEMA: Schema = StringSchema::new("Mirror snapshot")
.format(&ApiStringFormat::Pattern(&SNAPSHOT_REGEX))
.schema();
const LIST_SNAPSHOTS_RETURN_TYPE: ReturnType = ReturnType {
schema: &ArraySchema::new("Returns the list of snapshots.", &SNAPSHOT_SCHEMA).schema(),
optional: true,
};
#[api(
input: {
properties: {
@ -66,6 +53,10 @@ const LIST_SNAPSHOTS_RETURN_TYPE: ReturnType = ReturnType {
id: {
schema: MIRROR_ID_SCHEMA,
},
"output-format": {
schema: OUTPUT_FORMAT,
optional: true,
},
},
},
)]
@ -77,33 +68,17 @@ async fn list_snapshots(config: Option<String>, id: String, param: Value) -> Res
let (config, _digest) = proxmox_apt_mirror::config::config(&config)?;
let config: MirrorConfig = config.lookup("mirror", &id)?;
let _pool: Pool = (&config).try_into()?;
let mut list = vec![];
let list = mirror::list_snapshots(&config)?;
let path = Path::new(&config.base_dir);
proxmox_sys::fs::scandir(
libc::AT_FDCWD,
path,
&SNAPSHOT_REGEX,
|_l2_fd, snapshot, file_type| {
if file_type != nix::dir::Type::Directory {
return Ok(());
}
list.push(snapshot.to_string());
Ok(())
},
)?;
let mut list = serde_json::json!(list);
format_and_print_result_full(
&mut list,
&LIST_SNAPSHOTS_RETURN_TYPE,
&output_format,
&default_table_format_options(),
);
if output_format == "text" {
println!("Found {} snapshots:", list.len());
for snap in &list {
println!("- {snap}");
}
} else {
let list = serde_json::json!(list);
format_and_print_result(&list, &output_format);
}
Ok(())
}
@ -120,7 +95,7 @@ async fn list_snapshots(config: Option<String>, id: String, param: Value) -> Res
schema: MIRROR_ID_SCHEMA,
},
snapshot: {
schema: SNAPSHOT_SCHEMA,
type: Snapshot,
},
"output-format": {
schema: OUTPUT_FORMAT,
@ -133,17 +108,14 @@ async fn list_snapshots(config: Option<String>, id: String, param: Value) -> Res
async fn remove_snapshot(
config: Option<String>,
id: String,
snapshot: String,
snapshot: Snapshot,
_param: Value,
) -> Result<(), Error> {
let config = config.unwrap_or_else(|| DEFAULT_CONFIG_PATH.to_string());
let (config, _digest) = proxmox_apt_mirror::config::config(&config)?;
let config: MirrorConfig = config.lookup("mirror", &id)?;
let pool: Pool = (&config).try_into()?;
let path = pool.get_path(Path::new(&snapshot))?;
pool.lock()?.remove_dir(&path)?;
mirror::remove_snapshot(&config, &snapshot)?;
Ok(())
}
@ -172,9 +144,9 @@ async fn garbage_collect(config: Option<String>, id: String, _param: Value) -> R
let (config, _digest) = proxmox_apt_mirror::config::config(&config)?;
let config: MirrorConfig = config.lookup("mirror", &id)?;
let pool: Pool = (&config).try_into()?;
let (count, size) = pool.lock()?.gc()?;
let (count, size) = mirror::gc(&config)?;
println!("Removed {} files totalling {}b", count, size);
Ok(())

View File

@ -4,12 +4,11 @@ use anyhow::{bail, Error};
use lazy_static::lazy_static;
use serde::{Deserialize, Serialize};
use proxmox_apt::repositories::APTRepository;
use proxmox_schema::{api, ApiType, Schema, Updater};
use proxmox_section_config::{SectionConfig, SectionConfigData, SectionConfigPlugin};
use proxmox_sys::fs::{file_get_contents, replace_file, CreateOptions};
use proxmox_sys::fs::{replace_file, CreateOptions};
use crate::{convert_repo_line, pool::Pool, types::MIRROR_ID_SCHEMA};
use crate::types::MIRROR_ID_SCHEMA;
#[api(
properties: {
@ -18,61 +17,49 @@ use crate::{convert_repo_line, pool::Pool, types::MIRROR_ID_SCHEMA};
},
repository: {
type: String,
description: "Single repository definition in sources.list format.",
},
architectures: {
type: Array,
description: "List of architectures to mirror",
items: {
type: String,
description: "Architecture specifier.",
},
},
"pool-dir": {
"dir": {
type: String,
description: "Path to pool directory storing checksum files.",
},
"base-dir": {
type: String,
description: "Path to directory storing repository snapshot files (must be on same FS as `pool-dir`).",
},
"key-path": {
type: String,
description: "Path to signing key of `repository`",
},
verify: {
type: bool,
description: "Whether to verify existing files stored in pool (IO-intensive).",
},
sync: {
type: bool,
description: "Whether to write pool updates with fsync flag.",
},
}
)]
#[derive(Clone, Debug, Serialize, Deserialize, Updater)]
#[serde(rename_all = "kebab-case")]
/// Configuration file for mirrored repositories.
/// Configuration entry for a mirrored repository.
pub struct MirrorConfig {
#[updater(skip)]
/// Identifier for this entry.
pub id: String,
/// Single repository definition in sources.list format.
pub repository: String,
/// List of architectures that should be mirrored.
pub architectures: Vec<String>,
pub pool_dir: String,
pub base_dir: String,
/// Path to directory containg mirrored repository.
pub dir: String,
/// Path to public key file for verifying repository integrity.
pub key_path: String,
/// Whether to verify existing files or assume they are valid (IO-intensive).
pub verify: bool,
/// Whether to write new files using FSYNC.
pub sync: bool,
}
impl TryInto<Pool> for &MirrorConfig {
type Error = Error;
fn try_into(self) -> Result<Pool, Self::Error> {
Pool::open(Path::new(&self.base_dir), Path::new(&self.pool_dir))
}
}
#[api(
properties: {
id: {
@ -112,7 +99,7 @@ pub struct MediaConfig {
}
lazy_static! {
pub static ref CONFIG: SectionConfig = init();
static ref CONFIG: SectionConfig = init();
}
fn init() -> SectionConfig {
@ -174,33 +161,3 @@ pub fn save_config(path: &str, data: &SectionConfigData) -> Result<(), Error> {
let raw = CONFIG.write(path, data)?;
replace_file(path, raw.as_bytes(), CreateOptions::default(), true)
}
pub struct ParsedMirrorConfig {
pub repository: APTRepository,
pub architectures: Vec<String>,
pub pool: Pool,
pub key: Vec<u8>,
pub verify: bool,
pub sync: bool,
}
impl TryInto<ParsedMirrorConfig> for MirrorConfig {
type Error = anyhow::Error;
fn try_into(self) -> Result<ParsedMirrorConfig, Self::Error> {
let pool = (&self).try_into()?;
let repository = convert_repo_line(self.repository.clone())?;
let key = file_get_contents(Path::new(&self.key_path))?;
Ok(ParsedMirrorConfig {
repository,
architectures: self.architectures,
pool,
key,
verify: self.verify,
sync: self.sync,
})
}
}

View File

@ -1,17 +1,19 @@
use std::{
fmt::Display,
ops::{Add, AddAssign},
path::Path,
};
use anyhow::Error;
use anyhow::{format_err, Error};
use medium::MirrorInfo;
use proxmox_apt::repositories::{APTRepository, APTRepositoryFile, APTRepositoryFileType};
use types::Snapshot;
pub mod config;
pub mod helpers;
pub mod medium;
pub mod mirror;
pub mod pool;
pub mod snapshot;
pub mod types;
struct FetchResult {
@ -90,8 +92,39 @@ impl Display for Progress {
}
}
pub fn convert_repo_line(line: String) -> Result<APTRepository, Error> {
pub(crate) fn convert_repo_line(line: String) -> Result<APTRepository, Error> {
let mut repository = APTRepositoryFile::with_content(line, APTRepositoryFileType::List);
repository.parse()?;
Ok(repository.repositories[0].clone())
}
pub fn generate_repo_file_line(
medium_base: &Path,
mirror_id: &str,
mirror: &MirrorInfo,
snapshot: &Snapshot,
) -> Result<String, Error> {
let mut snapshot_path = medium_base.to_path_buf();
snapshot_path.push(mirror_id);
snapshot_path.push(snapshot.to_string());
let snapshot_path = snapshot_path
.to_str()
.ok_or_else(|| format_err!("Failed to convert snapshot path to String"))?;
let mut repo = convert_repo_line(mirror.repository.clone())?;
repo.uris = vec![format!("file://{}", snapshot_path)];
repo.options
.push(proxmox_apt::repositories::APTRepositoryOption {
key: "check-valid-until".to_string(),
values: vec!["false".to_string()],
});
let mut res = Vec::new();
repo.write(&mut res)?;
let res = String::from_utf8(res)
.map_err(|err| format_err!("Couldn't convert repo line to String - {err}"))?;
Ok(res.trim_end().to_string())
}

View File

@ -1,6 +1,6 @@
use std::{
collections::{HashMap, HashSet},
path::Path,
path::{Path, PathBuf},
};
use anyhow::{bail, format_err, Error};
@ -10,16 +10,17 @@ use proxmox_time::{epoch_i64, epoch_to_rfc3339_utc};
use serde::{Deserialize, Serialize};
use crate::{
config::{self, MirrorConfig},
convert_repo_line,
config::{self, ConfigLockGuard, MediaConfig, MirrorConfig},
generate_repo_file_line,
mirror::pool,
pool::Pool,
types::SNAPSHOT_REGEX,
types::{Snapshot, SNAPSHOT_REGEX},
};
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub struct MirrorInfo {
repository: String,
architectures: Vec<String>,
pub repository: String,
pub architectures: Vec<String>,
}
impl From<&MirrorConfig> for MirrorInfo {
@ -47,7 +48,74 @@ pub struct MediumState {
pub last_sync: i64,
}
pub fn list_snapshots(medium_base: &Path, mirror: &str) -> Result<Vec<String>, Error> {
pub struct MediumMirrorState {
pub synced: HashSet<String>,
pub source_only: HashSet<String>,
pub target_only: HashSet<String>,
}
fn get_mirror_state(config: &MediaConfig, state: &MediumState) -> MediumMirrorState {
let synced_mirrors: HashSet<String> = state
.mirrors
.iter()
.map(|(id, _mirror)| id.clone())
.collect();
let config_mirrors: HashSet<String> = config.mirrors.iter().cloned().collect();
let new_mirrors: HashSet<String> = config_mirrors
.difference(&synced_mirrors)
.cloned()
.collect();
let dropped_mirrors: HashSet<String> = synced_mirrors
.difference(&config_mirrors)
.cloned()
.collect();
MediumMirrorState {
synced: synced_mirrors,
source_only: new_mirrors,
target_only: dropped_mirrors,
}
}
fn lock(base: &Path) -> Result<ConfigLockGuard, Error> {
let mut lockfile = base.to_path_buf();
lockfile.push("mirror-state");
let lockfile = lockfile
.to_str()
.ok_or_else(|| format_err!("Couldn't convert lockfile path {lockfile:?})"))?;
config::lock_config(lockfile)
}
fn statefile(base: &Path) -> PathBuf {
let mut statefile = base.to_path_buf();
statefile.push(".mirror-state");
statefile
}
fn load_state(base: &Path) -> Result<Option<MediumState>, Error> {
let statefile = statefile(base);
if statefile.exists() {
let raw = file_get_contents(&statefile)?;
let state: MediumState = serde_json::from_slice(&raw)?;
Ok(Some(state))
} else {
Ok(None)
}
}
fn write_state(_lock: &ConfigLockGuard, base: &Path, state: &MediumState) -> Result<(), Error> {
replace_file(
&statefile(base),
&serde_json::to_vec(&state)?,
CreateOptions::default(),
true,
)?;
Ok(())
}
pub fn list_snapshots(medium_base: &Path, mirror: &str) -> Result<Vec<Snapshot>, Error> {
if !medium_base.exists() {
bail!("Medium mountpoint doesn't exist.");
}
@ -65,7 +133,7 @@ pub fn list_snapshots(medium_base: &Path, mirror: &str) -> Result<Vec<String>, E
return Ok(());
}
list.push(snapshot.to_string());
list.push(snapshot.parse()?);
Ok(())
},
@ -78,7 +146,7 @@ pub fn list_snapshots(medium_base: &Path, mirror: &str) -> Result<Vec<String>, E
pub fn generate_repo_snippet(
medium_base: &Path,
repositories: &HashMap<String, (&MirrorInfo, String)>,
repositories: &HashMap<String, (&MirrorInfo, Snapshot)>,
) -> Result<Vec<String>, Error> {
let mut res = Vec::new();
for (mirror_id, (mirror_info, snapshot)) in repositories {
@ -92,51 +160,18 @@ pub fn generate_repo_snippet(
Ok(res)
}
fn generate_repo_file_line(
medium_base: &Path,
mirror_id: &str,
mirror: &MirrorInfo,
snapshot: &str,
) -> Result<String, Error> {
let mut snapshot_path = medium_base.to_path_buf();
snapshot_path.push(mirror_id);
snapshot_path.push(snapshot);
let snapshot_path = snapshot_path
.to_str()
.ok_or_else(|| format_err!("Failed to convert snapshot path to String"))?;
let mut repo = convert_repo_line(mirror.repository.clone())?;
repo.uris = vec![format!("file://{}", snapshot_path)];
repo.options
.push(proxmox_apt::repositories::APTRepositoryOption {
key: "check-valid-until".to_string(),
values: vec!["false".to_string()],
});
let mut res = Vec::new();
repo.write(&mut res)?;
let res = String::from_utf8(res)
.map_err(|err| format_err!("Couldn't convert repo line to String - {err}"))?;
Ok(res.trim_end().to_string())
}
pub fn gc(medium: &crate::config::MediaConfig) -> Result<(), Error> {
let medium_base = Path::new(&medium.mountpoint);
if !medium_base.exists() {
bail!("Medium mountpoint doesn't exist.");
}
let mut statefile = medium_base.to_path_buf();
statefile.push(".mirror-state");
let _lock = lock(medium_base)?;
let _lock = config::lock_config(&format!("{}/{}", medium.mountpoint, "mirror-state"))?;
println!("Loading state..");
let state = load_state(medium_base)?
.ok_or_else(|| format_err!("Cannot GC empty medium - no statefile found."))?;
println!("Loading state from {statefile:?}..");
let raw = file_get_contents(&statefile)?;
let state: MediumState = serde_json::from_slice(&raw)?;
println!(
"Last sync timestamp: {}",
epoch_to_rfc3339_utc(state.last_sync)?
@ -170,77 +205,19 @@ pub fn gc(medium: &crate::config::MediaConfig) -> Result<(), Error> {
Ok(())
}
pub fn status(medium: &crate::config::MediaConfig) -> Result<(), Error> {
pub fn status(
medium: &crate::config::MediaConfig,
) -> Result<(MediumState, MediumMirrorState), Error> {
let medium_base = Path::new(&medium.mountpoint);
if !medium_base.exists() {
bail!("Medium mountpoint doesn't exist.");
}
let mut statefile = medium_base.to_path_buf();
statefile.push(".mirror-state");
let state = load_state(medium_base)?
.ok_or_else(|| format_err!("No status available - statefile doesn't exist."))?;
let mirror_state = get_mirror_state(medium, &state);
println!("Loading state from {statefile:?}..");
let raw = file_get_contents(&statefile)?;
let state: MediumState = serde_json::from_slice(&raw)?;
println!(
"Last sync timestamp: {}",
epoch_to_rfc3339_utc(state.last_sync)?
);
let synced_mirrors: HashSet<String> = state
.mirrors
.iter()
.map(|(id, _mirror)| id.clone())
.collect();
let config_mirrors: HashSet<String> = medium.mirrors.iter().cloned().collect();
let new_mirrors: HashSet<String> = config_mirrors
.difference(&synced_mirrors)
.cloned()
.collect();
let dropped_mirrors: HashSet<String> = synced_mirrors
.difference(&config_mirrors)
.cloned()
.collect();
println!("Already synced mirrors: {synced_mirrors:?}");
println!("Configured mirrors: {config_mirrors:?}");
if !new_mirrors.is_empty() {
println!("Missing mirrors: {new_mirrors:?}");
}
if !dropped_mirrors.is_empty() {
println!("To-be-removed mirrors: {dropped_mirrors:?}");
}
for (ref id, ref mirror) in state.mirrors {
println!("\nMirror '{}'", id);
let snapshots = list_snapshots(Path::new(&medium.mountpoint), id)?;
let repo_line = match snapshots.last() {
None => {
println!("no snapshots");
None
}
Some(last) => {
if let Some(first) = snapshots.first() {
if first == last {
println!("1 snapshot: '{last}'");
} else {
println!("{} snapshots: '{first}..{last}'", snapshots.len());
}
Some(generate_repo_file_line(medium_base, id, mirror, last)?)
} else {
None
}
}
};
println!("Original repository config: '{}'", mirror.repository);
if let Some(repo_line) = repo_line {
println!("Medium repository line: '{repo_line}'");
}
}
Ok(())
Ok((state, mirror_state))
}
pub fn sync(medium: &crate::config::MediaConfig, mirrors: Vec<MirrorConfig>) -> Result<(), Error> {
@ -257,52 +234,44 @@ pub fn sync(medium: &crate::config::MediaConfig, mirrors: Vec<MirrorConfig>) ->
bail!("Medium mountpoint doesn't exist.");
}
let mut statefile = medium_base.to_path_buf();
statefile.push(".mirror-state");
let lock = lock(medium_base)?;
let _lock = config::lock_config(&format!("{}/{}", medium.mountpoint, "mirror-state"))?;
let mut state = if statefile.exists() {
println!("Loading state from {statefile:?}..");
let raw = file_get_contents(&statefile)?;
let state: MediumState = serde_json::from_slice(&raw)?;
println!(
"Last sync timestamp: {}",
epoch_to_rfc3339_utc(state.last_sync)?
);
state
} else {
println!("Creating new statefile {statefile:?}..");
MediumState {
mirrors: HashMap::new(),
last_sync: 0,
let mut state = match load_state(medium_base)? {
Some(state) => {
println!("Loaded existing statefile.");
println!(
"Last sync timestamp: {}",
epoch_to_rfc3339_utc(state.last_sync)?
);
state
}
None => {
println!("Creating new statefile..");
MediumState {
mirrors: HashMap::new(),
last_sync: 0,
}
}
};
state.last_sync = epoch_i64();
println!("Sync timestamp: {}", epoch_to_rfc3339_utc(state.last_sync)?);
let old_mirrors: HashSet<String> = state
.mirrors
.iter()
.map(|(id, _mirror)| id.clone())
.collect();
let sync_mirrors: HashSet<String> = mirrors.iter().map(|mirror| mirror.id.clone()).collect();
let new_mirrors: HashSet<String> = sync_mirrors.difference(&old_mirrors).cloned().collect();
let dropped_mirrors: HashSet<String> = old_mirrors.difference(&sync_mirrors).cloned().collect();
let mirror_state = get_mirror_state(medium, &state);
println!("Previously synced mirrors: {:?}", &mirror_state.synced);
println!("Previously synced mirrors: {:?}", &old_mirrors);
if !new_mirrors.is_empty() {
if !mirror_state.source_only.is_empty() {
println!(
"Adding {} new mirror(s) to target medium: {new_mirrors:?}",
new_mirrors.len()
"Adding {} new mirror(s) to target medium: {:?}",
mirror_state.source_only.len(),
mirror_state.source_only,
);
}
if !dropped_mirrors.is_empty() {
if !mirror_state.target_only.is_empty() {
println!(
"Dropping {} removed mirror(s) from target medium (after syncing): {dropped_mirrors:?}",
dropped_mirrors.len()
"Dropping {} removed mirror(s) from target medium (after syncing): {:?}",
mirror_state.target_only.len(),
mirror_state.target_only,
);
}
@ -324,16 +293,16 @@ pub fn sync(medium: &crate::config::MediaConfig, mirrors: Vec<MirrorConfig>) ->
Pool::create(&mirror_base, &mirror_pool)?
};
let source_pool: Pool = (&mirror).try_into()?;
let source_pool: Pool = pool(&mirror)?;
source_pool.lock()?.sync_pool(&target_pool, medium.verify)?;
state.mirrors.insert(mirror.id.clone(), mirror.into());
}
if !dropped_mirrors.is_empty() {
if !mirror_state.target_only.is_empty() {
println!();
}
for dropped in dropped_mirrors {
for dropped in mirror_state.target_only {
let mut mirror_base = medium_base.to_path_buf();
mirror_base.push(Path::new(&dropped));
@ -345,12 +314,7 @@ pub fn sync(medium: &crate::config::MediaConfig, mirrors: Vec<MirrorConfig>) ->
println!("Updating statefile..");
// TODO update state file for exporting/subscription key handling/..?
replace_file(
&statefile,
&serde_json::to_vec(&state)?,
CreateOptions::default(),
true,
)?;
write_state(&lock, medium_base, &state)?;
Ok(())
}

View File

@ -7,9 +7,16 @@ use std::{
use anyhow::{format_err, Error};
use flate2::bufread::GzDecoder;
use nix::libc;
use proxmox_sys::fs::file_get_contents;
use crate::{config::MirrorConfig, FetchResult, Progress};
use crate::{config::ParsedMirrorConfig, snapshot::Snapshot};
use crate::{
config::MirrorConfig,
convert_repo_line,
pool::Pool,
types::{Snapshot, SNAPSHOT_REGEX},
FetchResult, Progress,
};
use proxmox_apt::{
deb822::{
CheckSums, CompressionType, FileReference, FileReferenceType, PackagesFile, ReleaseFile,
@ -19,6 +26,41 @@ use proxmox_apt::{
use crate::helpers;
pub(crate) fn pool(config: &MirrorConfig) -> Result<Pool, Error> {
let pool_dir = format!("{}/.pool", config.dir);
Pool::open(Path::new(&config.dir), Path::new(&pool_dir))
}
struct ParsedMirrorConfig {
pub repository: APTRepository,
pub architectures: Vec<String>,
pub pool: Pool,
pub key: Vec<u8>,
pub verify: bool,
pub sync: bool,
}
impl TryInto<ParsedMirrorConfig> for MirrorConfig {
type Error = anyhow::Error;
fn try_into(self) -> Result<ParsedMirrorConfig, Self::Error> {
let pool = pool(&self)?;
let repository = convert_repo_line(self.repository.clone())?;
let key = file_get_contents(Path::new(&self.key_path))?;
Ok(ParsedMirrorConfig {
repository,
architectures: self.architectures,
pool,
key,
verify: self.verify,
sync: self.sync,
})
}
}
fn get_dist_url(repo: &APTRepository, path: &str) -> String {
let dist_root = format!("{}/dists/{}", repo.uris[0], repo.suites[0]);
@ -100,7 +142,7 @@ fn fetch_release(
let locked = &config.pool.lock()?;
if !locked.contains(&csums) {
locked.add_file(content, &csums, true)?;
locked.add_file(content, &csums, config.sync)?;
}
if detached {
@ -115,7 +157,7 @@ fn fetch_release(
..Default::default()
};
if !locked.contains(&csums) {
locked.add_file(&sig, &csums, true)?;
locked.add_file(&sig, &csums, config.sync)?;
}
locked.link_file(
&csums,
@ -184,7 +226,7 @@ fn fetch_index_file(
let locked = &config.pool.lock()?;
if !locked.contains(&uncompressed.checksums) {
locked.add_file(decompressed, &uncompressed.checksums, true)?;
locked.add_file(decompressed, &uncompressed.checksums, config.sync)?;
}
// Ensure it's linked at current path
@ -229,9 +271,46 @@ fn fetch_plain_file(
Ok(res)
}
pub fn mirror(config: MirrorConfig) -> Result<(), Error> {
pub fn init(config: &MirrorConfig) -> Result<(), Error> {
let pool_dir = format!("{}/.pool", config.dir);
Pool::create(Path::new(&config.dir), Path::new(&pool_dir))?;
Ok(())
}
pub fn destroy(config: &MirrorConfig) -> Result<(), Error> {
let pool: Pool = pool(config)?;
pool.lock()?.destroy()?;
Ok(())
}
pub fn list_snapshots(config: &MirrorConfig) -> Result<Vec<Snapshot>, Error> {
let _pool: Pool = pool(config)?;
let mut list: Vec<Snapshot> = vec![];
let path = Path::new(&config.dir);
proxmox_sys::fs::scandir(
libc::AT_FDCWD,
path,
&SNAPSHOT_REGEX,
|_l2_fd, snapshot, file_type| {
if file_type != nix::dir::Type::Directory {
return Ok(());
}
list.push(snapshot.parse()?);
Ok(())
},
)?;
Ok(list)
}
pub fn create_snapshot(config: MirrorConfig, snapshot: &Snapshot) -> Result<(), Error> {
let config: ParsedMirrorConfig = config.try_into()?;
let snapshot = Snapshot::now();
let prefix = format!("{snapshot}.tmp");
let prefix = Path::new(&prefix);
@ -419,3 +498,16 @@ pub fn mirror(config: MirrorConfig) -> Result<(), Error> {
Ok(())
}
pub fn remove_snapshot(config: &MirrorConfig, snapshot: &Snapshot) -> Result<(), Error> {
let pool: Pool = pool(config)?;
let path = pool.get_path(Path::new(&snapshot.to_string()))?;
pool.lock()?.remove_dir(&path)
}
pub fn gc(config: &MirrorConfig) -> Result<(usize, u64), Error> {
let pool: Pool = pool(config)?;
pool.lock()?.gc()
}

View File

@ -15,18 +15,18 @@ use proxmox_sys::fs::{create_path, file_get_contents, replace_file, CreateOption
use walkdir::WalkDir;
#[derive(Debug)]
pub struct Pool {
pub(crate) struct Pool {
pool_dir: PathBuf,
base_dir: PathBuf,
}
pub struct PoolLockGuard<'lock> {
pub(crate) struct PoolLockGuard<'lock> {
pool: &'lock Pool,
_lock: Option<File>,
}
impl Pool {
pub fn create(base: &Path, pool: &Path) -> Result<Self, Error> {
pub(crate) fn create(base: &Path, pool: &Path) -> Result<Self, Error> {
if base.exists() {
bail!("Pool base dir already exists.");
}
@ -43,7 +43,7 @@ impl Pool {
base_dir: base.to_path_buf(),
})
}
pub fn open(base: &Path, pool: &Path) -> Result<Self, Error> {
pub(crate) fn open(base: &Path, pool: &Path) -> Result<Self, Error> {
if !base.exists() {
bail!("Pool base dir doesn't exist.")
}
@ -58,7 +58,7 @@ impl Pool {
})
}
pub fn lock(&self) -> Result<PoolLockGuard, Error> {
pub(crate) fn lock(&self) -> Result<PoolLockGuard, Error> {
let timeout = std::time::Duration::new(10, 0);
let lock = Some(proxmox_sys::fs::open_file_locked(
&self.lock_path(),
@ -72,14 +72,18 @@ impl Pool {
_lock: lock,
})
}
pub fn contains(&self, checksums: &CheckSums) -> bool {
pub(crate) fn contains(&self, checksums: &CheckSums) -> bool {
match self.get_checksum_paths(checksums) {
Ok(paths) => paths.iter().any(|path| path.exists()),
Err(_err) => false,
}
}
pub fn get_contents(&self, checksums: &CheckSums, verify: bool) -> Result<Vec<u8>, Error> {
pub(crate) fn get_contents(
&self,
checksums: &CheckSums,
verify: bool,
) -> Result<Vec<u8>, Error> {
let source = self
.get_checksum_paths(checksums)?
.into_iter()
@ -135,7 +139,7 @@ impl Pool {
lock_path
}
pub fn get_path(&self, rel_path: &Path) -> Result<PathBuf, Error> {
pub(crate) fn get_path(&self, rel_path: &Path) -> Result<PathBuf, Error> {
let mut path = self.base_dir.clone();
path.push(rel_path);
@ -210,7 +214,7 @@ impl PoolLockGuard<'_> {
Ok((inode_map, link_count))
}
pub fn sync_pool(&self, target: &Pool, verify: bool) -> Result<(), Error> {
pub(crate) fn sync_pool(&self, target: &Pool, verify: bool) -> Result<(), Error> {
let target = target.lock()?;
let (inode_map, total_link_count) = self.get_inode_csum_map()?;
@ -321,7 +325,12 @@ impl PoolLockGuard<'_> {
Ok(())
}
pub fn add_file(&self, data: &[u8], checksums: &CheckSums, sync: bool) -> Result<(), Error> {
pub(crate) fn add_file(
&self,
data: &[u8],
checksums: &CheckSums,
sync: bool,
) -> Result<(), Error> {
if self.pool.contains(checksums) {
bail!("Pool already contains file with this checksum.");
}
@ -340,7 +349,7 @@ impl PoolLockGuard<'_> {
Ok(())
}
pub fn link_file(&self, checksums: &CheckSums, path: &Path) -> Result<bool, Error> {
pub(crate) fn link_file(&self, checksums: &CheckSums, path: &Path) -> Result<bool, Error> {
let path = self.pool.get_path(path)?;
if !self.pool.path_in_base(&path) {
bail!(
@ -364,7 +373,11 @@ impl PoolLockGuard<'_> {
link_file_do(source, &path)
}
pub fn unlink_file(&self, mut path: &Path, remove_empty_parents: bool) -> Result<(), Error> {
pub(crate) fn unlink_file(
&self,
mut path: &Path,
remove_empty_parents: bool,
) -> Result<(), Error> {
if !self.pool.path_in_base(path) {
bail!("Cannot unlink file outside of pool.");
}
@ -388,7 +401,7 @@ impl PoolLockGuard<'_> {
Ok(())
}
pub fn remove_dir(&self, path: &Path) -> Result<(), Error> {
pub(crate) fn remove_dir(&self, path: &Path) -> Result<(), Error> {
if !self.pool.path_in_base(path) {
bail!("Cannot unlink file outside of pool.");
}
@ -397,7 +410,7 @@ impl PoolLockGuard<'_> {
.map_err(|err| format_err!("Failed to remove {path:?} - {err}"))
}
pub fn gc(&self) -> Result<(usize, u64), Error> {
pub(crate) fn gc(&self) -> Result<(usize, u64), Error> {
let (inode_map, _link_count) = self.get_inode_csum_map()?;
let mut count = 0;
@ -462,14 +475,14 @@ impl PoolLockGuard<'_> {
Ok((count, size))
}
pub fn destroy(self) -> Result<(), Error> {
pub(crate) fn destroy(self) -> Result<(), Error> {
// TODO - this removes the lock file..
std::fs::remove_dir_all(self.pool_dir.clone())?;
std::fs::remove_dir_all(self.base_dir.clone())?;
Ok(())
}
pub fn rename(&self, from: &Path, to: &Path) -> Result<(), Error> {
pub(crate) fn rename(&self, from: &Path, to: &Path) -> Result<(), Error> {
let mut abs_from = self.base_dir.clone();
abs_from.push(from);

View File

@ -1,19 +0,0 @@
use std::fmt::Display;
use proxmox_time::{epoch_i64, epoch_to_rfc3339_utc};
#[derive(Debug, Clone, Copy)]
pub struct Snapshot(i64);
impl Snapshot {
pub fn now() -> Self {
Self(epoch_i64())
}
}
impl Display for Snapshot {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let formatted = epoch_to_rfc3339_utc(self.0).map_err(|_| std::fmt::Error)?;
f.write_str(&formatted)
}
}

View File

@ -1,4 +1,9 @@
use proxmox_schema::{const_regex, ApiStringFormat, Schema, StringSchema};
use std::{fmt::Display, str::FromStr};
use anyhow::Error;
use proxmox_schema::{api, const_regex, ApiStringFormat, Schema, StringSchema};
use proxmox_serde::{forward_deserialize_to_from_str, forward_serialize_to_display};
use proxmox_time::{epoch_i64, epoch_to_rfc3339_utc, parse_rfc3339};
#[rustfmt::skip]
#[macro_export]
@ -7,7 +12,7 @@ macro_rules! PROXMOX_SAFE_ID_REGEX_STR { () => { r"(?:[A-Za-z0-9_][A-Za-z0-9._\-
const_regex! {
// copied from PBS
pub PROXMOX_SAFE_ID_REGEX = concat!(r"^", PROXMOX_SAFE_ID_REGEX_STR!(), r"$");
PROXMOX_SAFE_ID_REGEX = concat!(r"^", PROXMOX_SAFE_ID_REGEX_STR!(), r"$");
}
pub const PROXMOX_SAFE_ID_FORMAT: ApiStringFormat =
@ -22,5 +27,37 @@ pub const MIRROR_ID_SCHEMA: Schema = StringSchema::new("Mirror name.")
#[macro_export]
macro_rules! SNAPSHOT_RE { () => (r"[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z") }
const_regex! {
pub SNAPSHOT_REGEX = concat!(r"^", SNAPSHOT_RE!() ,r"$");
pub(crate) SNAPSHOT_REGEX = concat!(r"^", SNAPSHOT_RE!() ,r"$");
}
#[api(
type: String,
format: &ApiStringFormat::Pattern(&SNAPSHOT_REGEX),
)]
#[derive(Debug, Clone, Copy, PartialEq, PartialOrd, Eq, Ord)]
/// Mirror snapshot
pub struct Snapshot(i64);
forward_serialize_to_display!(Snapshot);
forward_deserialize_to_from_str!(Snapshot);
impl Snapshot {
pub fn now() -> Self {
Self(epoch_i64())
}
}
impl Display for Snapshot {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let formatted = epoch_to_rfc3339_utc(self.0).map_err(|_| std::fmt::Error)?;
f.write_str(&formatted)
}
}
impl FromStr for Snapshot {
type Err = Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
Ok(Self(parse_rfc3339(s)?))
}
}