diff --git a/Cargo.toml b/Cargo.toml index 5cebf940..dc2eec57 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,8 +21,9 @@ futures = "0.1" tokio-threadpool = "0.1" tokio = "0.1" tokio-fs = "0.1" -tokio-tls = "0.2.1" -native-tls = "0.2.2" +tokio-tls = "0.2" +tokio-signal = "0.2" +native-tls = "0.2" http = "0.1" hyper = "0.12" hyper-tls = "0.3" diff --git a/src/server.rs b/src/server.rs index 1c91042b..0786ba38 100644 --- a/src/server.rs +++ b/src/server.rs @@ -7,9 +7,14 @@ mod environment; pub use environment::*; +mod state; +pub use state::*; + mod worker_task; pub use worker_task::*; + pub mod formatter; + #[macro_use] pub mod rest; diff --git a/src/server/state.rs b/src/server/state.rs new file mode 100644 index 00000000..f7ee4300 --- /dev/null +++ b/src/server/state.rs @@ -0,0 +1,151 @@ +use failure::*; +use lazy_static::lazy_static; +use std::sync::Mutex; + +use futures::*; +use futures::stream::Stream; + +use tokio::sync::oneshot; +use tokio_signal::unix::{Signal, SIGHUP, SIGINT}; + +use crate::tools; + +#[derive(PartialEq, Copy, Clone, Debug)] +pub enum ServerMode { + Normal, + Shutdown, +} + +pub struct ServerState { + pub mode: ServerMode, + pub shutdown_listeners: Vec>, + pub last_worker_listeners: Vec>, + pub worker_count: usize, + pub reload_request: bool, +} + + +lazy_static! { + static ref SERVER_STATE: Mutex = Mutex::new(ServerState { + mode: ServerMode::Normal, + shutdown_listeners: vec![], + last_worker_listeners: vec![], + worker_count: 0, + reload_request: false, + }); +} + +pub fn server_state_init() -> Result<(), Error> { + + let stream = Signal::new(SIGINT).flatten_stream(); + + let future = stream.for_each(|_| { + println!("got shutdown request (SIGINT)"); + SERVER_STATE.lock().unwrap().reload_request = false; + tools::request_shutdown(); + Ok(()) + }).map_err(|_| {}); + + let abort_future = last_worker_future().map_err(|_| {}); + let task = future.select(abort_future); + + tokio::spawn(task.map(|_| {}).map_err(|_| {})); + + let stream = Signal::new(SIGHUP).flatten_stream(); + + let future = stream.for_each(|_| { + println!("got reload request (SIGHUP)"); + SERVER_STATE.lock().unwrap().reload_request = true; + tools::request_shutdown(); + Ok(()) + }).map_err(|_| {}); + + let abort_future = last_worker_future().map_err(|_| {}); + let task = future.select(abort_future); + + tokio::spawn(task.map(|_| {}).map_err(|_| {})); + + Ok(()) +} + +pub fn is_reload_request() -> bool { + let data = SERVER_STATE.lock().unwrap(); + + if data.mode == ServerMode::Shutdown && data.reload_request { + true + } else { + false + } +} + +pub fn server_shutdown() { + let mut data = SERVER_STATE.lock().unwrap(); + + println!("SET SHUTDOWN MODE"); + + data.mode = ServerMode::Shutdown; + + notify_listeners(&mut data.shutdown_listeners); + + drop(data); // unlock + + check_last_worker(); +} + +pub fn shutdown_future() -> oneshot::Receiver<()> { + let (tx, rx) = oneshot::channel::<()>(); + + let mut data = SERVER_STATE.lock().unwrap(); + match data.mode { + ServerMode::Normal => { data.shutdown_listeners.push(tx); }, + ServerMode::Shutdown => { let _ = tx.send(()); }, + } + + rx +} + +pub fn last_worker_future() -> oneshot::Receiver<()> { + let (tx, rx) = oneshot::channel::<()>(); + + let mut data = SERVER_STATE.lock().unwrap(); + if data.mode == ServerMode::Shutdown && data.worker_count == 0 { + let _ = tx.send(()); + } else { + data.last_worker_listeners.push(tx); + } + + rx +} + +pub fn set_worker_count(count: usize) { + let mut data = SERVER_STATE.lock().unwrap(); + data.worker_count = count; + + if !(data.mode == ServerMode::Shutdown && data.worker_count == 0) { return; } + + notify_listeners(&mut data.last_worker_listeners); +} + + +pub fn check_last_worker() { + + let mut data = SERVER_STATE.lock().unwrap(); + + if !(data.mode == ServerMode::Shutdown && data.worker_count == 0) { return; } + + notify_listeners(&mut data.last_worker_listeners); +} + +fn notify_listeners(list: &mut Vec>) { + loop { + match list.pop() { + None => { break; }, + Some(ch) => { + println!("SEND ABORT"); + if let Err(_) = ch.send(()) { + eprintln!("SEND ABORT failed"); + } + }, + } + } +} diff --git a/src/server/worker_task.rs b/src/server/worker_task.rs index 2b9ae069..49e5a140 100644 --- a/src/server/worker_task.rs +++ b/src/server/worker_task.rs @@ -410,7 +410,10 @@ impl WorkerTask { }), }); - WORKER_TASK_LIST.lock().unwrap().insert(task_id, worker.clone()); + let mut hash = WORKER_TASK_LIST.lock().unwrap(); + + hash.insert(task_id, worker.clone()); + super::set_worker_count(hash.len()); Ok(worker) } @@ -434,6 +437,7 @@ impl WorkerTask { WORKER_TASK_LIST.lock().unwrap().remove(&task_id); worker.log_result(result); let _ = update_active_workers(None); + super::set_worker_count(WORKER_TASK_LIST.lock().unwrap().len()); Ok(()) })); @@ -464,6 +468,7 @@ impl WorkerTask { worker.log_result(result); let _ = update_active_workers(None); p.send(()).unwrap(); + super::set_worker_count(WORKER_TASK_LIST.lock().unwrap().len()); }); tokio::spawn(c.then(|_| Ok(()))); diff --git a/src/tools.rs b/src/tools.rs index bbf8f647..fda9829d 100644 --- a/src/tools.rs +++ b/src/tools.rs @@ -627,6 +627,7 @@ static mut SHUTDOWN_REQUESTED: bool = false; pub fn request_shutdown() { unsafe { SHUTDOWN_REQUESTED = true; } + crate::server::server_shutdown(); } #[inline(always)]