diff --git a/src/api2.rs b/src/api2.rs index 023bd867..046edfc9 100644 --- a/src/api2.rs +++ b/src/api2.rs @@ -2,6 +2,7 @@ pub mod types; pub mod config; pub mod admin; pub mod backup; +pub mod reader; pub mod node; mod version; mod subscription; @@ -18,6 +19,7 @@ pub fn router() -> Router { .subdir("access", access::router()) .subdir("admin", admin::router()) .subdir("backup", backup::router()) + .subdir("reader", reader::router()) .subdir("config", config::router()) .subdir("nodes", nodes) .subdir("subscription", subscription::router()) diff --git a/src/api2/reader.rs b/src/api2/reader.rs new file mode 100644 index 00000000..f4f78d60 --- /dev/null +++ b/src/api2/reader.rs @@ -0,0 +1,193 @@ +use failure::*; +use lazy_static::lazy_static; + +use std::sync::Arc; + +use futures::*; +use hyper::header::{self, HeaderValue, UPGRADE}; +use hyper::{Body, Response, StatusCode}; +use hyper::http::request::Parts; +//use chrono::{Local, TimeZone}; + +use serde_json::Value; + +use crate::tools; +use crate::api_schema::router::*; +use crate::api_schema::*; +use crate::server::{WorkerTask, H2Service}; +use crate::backup::*; +//use crate::api2::types::*; + +mod environment; +use environment::*; + +pub fn router() -> Router { + Router::new() + .upgrade(api_method_upgrade_backup()) +} + +pub fn api_method_upgrade_backup() -> ApiAsyncMethod { + ApiAsyncMethod::new( + upgrade_to_backup_reader_protocol, + ObjectSchema::new(concat!("Upgraded to backup protocol ('", PROXMOX_BACKUP_READER_PROTOCOL_ID_V1!(), "').")) + .required("store", StringSchema::new("Datastore name.")) + .required("backup-type", StringSchema::new("Backup type.") + .format(Arc::new(ApiStringFormat::Enum(&["vm", "ct", "host"])))) + .required("backup-id", StringSchema::new("Backup ID.")) + .required("backup-time", IntegerSchema::new("Backup time (Unix epoch.)") + .minimum(1547797308)) + .optional("debug", BooleanSchema::new("Enable verbose debug logging.")) + ) +} + +fn upgrade_to_backup_reader_protocol( + parts: Parts, + req_body: Body, + param: Value, + _info: &ApiAsyncMethod, + rpcenv: Box, +) -> Result { + + let debug = param["debug"].as_bool().unwrap_or(false); + + let store = tools::required_string_param(¶m, "store")?.to_owned(); + let datastore = DataStore::lookup_datastore(&store)?; + + let backup_type = tools::required_string_param(¶m, "backup-type")?; + let backup_id = tools::required_string_param(¶m, "backup-id")?; + let backup_time = tools::required_integer_param(¶m, "backup-time")?; + + let protocols = parts + .headers + .get("UPGRADE") + .ok_or_else(|| format_err!("missing Upgrade header"))? + .to_str()?; + + if protocols != PROXMOX_BACKUP_READER_PROTOCOL_ID_V1!() { + bail!("invalid protocol name"); + } + + if parts.version >= http::version::Version::HTTP_2 { + bail!("unexpected http version '{:?}' (expected version < 2)", parts.version); + } + + let username = rpcenv.get_user().unwrap(); + let env_type = rpcenv.env_type(); + + let backup_dir = BackupDir::new(backup_type, backup_id, backup_time); + let path = datastore.base_path(); + + //let files = BackupInfo::list_files(&path, &backup_dir)?; + + let worker_id = format!("{}_{}_{}_{:08X}", store, backup_type, backup_id, backup_dir.backup_time().timestamp()); + + WorkerTask::spawn("reader", Some(worker_id), &username.clone(), true, move |worker| { + let mut env = ReaderEnvironment::new( + env_type, username.clone(), worker.clone(), datastore, backup_dir); + + env.debug = debug; + + env.log(format!("starting new backup reader datastore '{}': {:?}", store, path)); + + let service = H2Service::new(env.clone(), worker.clone(), &READER_ROUTER, debug); + + let abort_future = worker.abort_future(); + + let env3 = env.clone(); + + req_body + .on_upgrade() + .map_err(Error::from) + .and_then(move |conn| { + env3.debug("protocol upgrade done"); + + let mut http = hyper::server::conn::Http::new(); + http.http2_only(true); + // increase window size: todo - find optiomal size + let window_size = 32*1024*1024; // max = (1 << 31) - 2 + http.http2_initial_stream_window_size(window_size); + http.http2_initial_connection_window_size(window_size); + + http.serve_connection(conn, service) + .map_err(Error::from) + }) + .select(abort_future.map_err(|_| {}).then(move |_| { bail!("task aborted"); })) + .map_err(|(err, _)| err) + .and_then(move |(_result, _)| { + env.log("reader finished sucessfully"); + Ok(()) + }) + })?; + + let response = Response::builder() + .status(StatusCode::SWITCHING_PROTOCOLS) + .header(UPGRADE, HeaderValue::from_static(PROXMOX_BACKUP_READER_PROTOCOL_ID_V1!())) + .body(Body::empty())?; + + Ok(Box::new(futures::future::ok(response))) +} + +lazy_static!{ + static ref READER_ROUTER: Router = reader_api(); +} + +pub fn reader_api() -> Router { + + let router = Router::new() + .subdir( + "download", Router::new() + .download(api_method_download_file()) + ); + + router +} + +pub fn api_method_download_file() -> ApiAsyncMethod { + ApiAsyncMethod::new( + download_file, + ObjectSchema::new("Download specified file.") + .required("file-name", crate::api2::types::BACKUP_ARCHIVE_NAME_SCHEMA.clone()) + ) +} + +fn download_file( + _parts: Parts, + _req_body: Body, + param: Value, + _info: &ApiAsyncMethod, + rpcenv: Box, +) -> Result { + + let env: &ReaderEnvironment = rpcenv.as_ref(); + let env2 = env.clone(); + + let file_name = tools::required_string_param(¶m, "file-name")?.to_owned(); + + let mut path = env.datastore.base_path(); + path.push(env.backup_dir.relative_path()); + path.push(&file_name); + + let path2 = path.clone(); + let path3 = path.clone(); + + let response_future = tokio::fs::File::open(path) + .map_err(move |err| http_err!(BAD_REQUEST, format!("open file {:?} failed: {}", path2, err))) + .and_then(move |file| { + env2.log(format!("download {:?}", path3)); + let payload = tokio::codec::FramedRead::new(file, tokio::codec::BytesCodec::new()). + map(|bytes| { + //sigh - howto avoid copy here? or the whole map() ?? + hyper::Chunk::from(bytes.to_vec()) + }); + let body = Body::wrap_stream(payload); + + // fixme: set other headers ? + Ok(Response::builder() + .status(StatusCode::OK) + .header(header::CONTENT_TYPE, "application/octet-stream") + .body(body) + .unwrap()) + }); + + Ok(Box::new(response_future)) +} diff --git a/src/api2/reader/environment.rs b/src/api2/reader/environment.rs new file mode 100644 index 00000000..082973d3 --- /dev/null +++ b/src/api2/reader/environment.rs @@ -0,0 +1,94 @@ +//use failure::*; +use std::sync::Arc; +use std::collections::HashMap; + +use serde_json::Value; + +use crate::api_schema::router::{RpcEnvironment, RpcEnvironmentType}; +use crate::server::WorkerTask; +use crate::backup::*; +use crate::server::formatter::*; + +//use proxmox::tools; + +/// `RpcEnvironmet` implementation for backup reader service +#[derive(Clone)] +pub struct ReaderEnvironment { + env_type: RpcEnvironmentType, + result_attributes: HashMap, + user: String, + pub debug: bool, + pub formatter: &'static OutputFormatter, + pub worker: Arc, + pub datastore: Arc, + pub backup_dir: BackupDir, + // state: Arc> +} + +impl ReaderEnvironment { + pub fn new( + env_type: RpcEnvironmentType, + user: String, + worker: Arc, + datastore: Arc, + backup_dir: BackupDir, + ) -> Self { + + + Self { + result_attributes: HashMap::new(), + env_type, + user, + worker, + datastore, + debug: false, + formatter: &JSON_FORMATTER, + backup_dir, + //state: Arc::new(Mutex::new(state)), + } + } + + pub fn log>(&self, msg: S) { + self.worker.log(msg); + } + + pub fn debug>(&self, msg: S) { + if self.debug { self.worker.log(msg); } + } + +} + +impl RpcEnvironment for ReaderEnvironment { + + fn set_result_attrib(&mut self, name: &str, value: Value) { + self.result_attributes.insert(name.into(), value); + } + + fn get_result_attrib(&self, name: &str) -> Option<&Value> { + self.result_attributes.get(name) + } + + fn env_type(&self) -> RpcEnvironmentType { + self.env_type + } + + fn set_user(&mut self, _user: Option) { + panic!("unable to change user"); + } + + fn get_user(&self) -> Option { + Some(self.user.clone()) + } +} + +impl AsRef for dyn RpcEnvironment { + fn as_ref(&self) -> &ReaderEnvironment { + self.as_any().downcast_ref::().unwrap() + } +} + +impl AsRef for Box { + fn as_ref(&self) -> &ReaderEnvironment { + self.as_any().downcast_ref::().unwrap() + } +} diff --git a/src/backup.rs b/src/backup.rs index 19869ff1..06b5edde 100644 --- a/src/backup.rs +++ b/src/backup.rs @@ -107,6 +107,11 @@ macro_rules! PROXMOX_BACKUP_PROTOCOL_ID_V1 { () => { "proxmox-backup-protocol-v1" } } +#[macro_export] +macro_rules! PROXMOX_BACKUP_READER_PROTOCOL_ID_V1 { + () => { "proxmox-backup-reader-protocol-v1" } +} + mod file_formats; pub use file_formats::*; diff --git a/src/client/http_client.rs b/src/client/http_client.rs index f4abe2aa..f6f8ed66 100644 --- a/src/client/http_client.rs +++ b/src/client/http_client.rs @@ -4,7 +4,7 @@ use http::Uri; use hyper::Body; use hyper::client::Client; use xdg::BaseDirectories; -use chrono::Utc; +use chrono::{DateTime, Local, Utc}; use std::collections::HashSet; use std::sync::{Arc, Mutex}; use std::io::Write; @@ -275,6 +275,28 @@ impl HttpClient { .map(|(h2, canceller)| BackupClient::new(h2, canceller)) } + pub fn start_backup_reader( + &self, + datastore: &str, + backup_type: &str, + backup_id: &str, + backup_time: DateTime, + debug: bool, + ) -> impl Future { + + let param = json!({ + "backup-type": backup_type, + "backup-id": backup_id, + "backup-time": backup_time.timestamp(), + "store": datastore, + "debug": debug, + }); + let req = Self::request_builder(&self.server, "GET", "/api2/json/reader", Some(param)).unwrap(); + + self.start_h2_connection(req, String::from(PROXMOX_BACKUP_READER_PROTOCOL_ID_V1!())) + .map(|(h2, canceller)| BackupReader::new(h2, canceller)) + } + pub fn start_h2_connection( &self, mut req: Request, @@ -428,12 +450,69 @@ impl HttpClient { } } -//#[derive(Clone)] + +pub struct BackupReader { + h2: H2Client, + canceller: Option, +} + +impl Drop for BackupReader { + + fn drop(&mut self) { + if let Some(canceller) = self.canceller.take() { + canceller.cancel(); + } + } +} + +impl BackupReader { + + pub fn new(h2: H2Client, canceller: Canceller) -> Self { + Self { h2, canceller: Some(canceller) } + } + + pub fn get(&self, path: &str, param: Option) -> impl Future { + self.h2.get(path, param) + } + + pub fn put(&self, path: &str, param: Option) -> impl Future { + self.h2.put(path, param) + } + + pub fn post(&self, path: &str, param: Option) -> impl Future { + self.h2.post(path, param) + } + + pub fn download( + &self, + file_name: &str, + output: W, + ) -> impl Future { + let path = "download"; + let param = json!({ "file-name": file_name }); + self.h2.download(path, Some(param), output) + } + + pub fn force_close(mut self) { + if let Some(canceller) = self.canceller.take() { + canceller.cancel(); + } + } +} + pub struct BackupClient { h2: H2Client, canceller: Option, } +impl Drop for BackupClient { + + fn drop(&mut self) { + if let Some(canceller) = self.canceller.take() { + canceller.cancel(); + } + } +} impl BackupClient { @@ -462,7 +541,9 @@ impl BackupClient { } pub fn force_close(mut self) { - self.canceller.take().unwrap().cancel(); + if let Some(canceller) = self.canceller.take() { + canceller.cancel(); + } } pub fn upload_blob_from_data( @@ -905,6 +986,34 @@ impl H2Client { self.request(req) } + pub fn download(&self, path: &str, param: Option, output: W) -> impl Future { + let request = Self::request_builder("localhost", "GET", path, param).unwrap(); + + self.send_request(request, None) + .and_then(move |response| { + response + .map_err(Error::from) + .and_then(move |resp| { + let status = resp.status(); + if !status.is_success() { + future::Either::A( + H2Client::h2api_response(resp) + .and_then(|_| { bail!("unknown error"); }) + ) + } else { + future::Either::B( + resp.into_body() + .map_err(Error::from) + .fold(output, move |mut acc, chunk| { + acc.write_all(&chunk)?; + Ok::<_, Error>(acc) + }) + ) + } + }) + }) + } + pub fn upload(&self, path: &str, param: Option, data: Vec) -> impl Future { let request = Self::request_builder("localhost", "POST", path, param).unwrap();