notify: add notification groups

When notifying via a group, all endpoints contained in that group
will send out the notification.

Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
This commit is contained in:
Lukas Wagner 2023-07-20 16:31:38 +02:00 committed by Wolfgang Bumiller
parent 055db2d107
commit ed5d27ba24
3 changed files with 192 additions and 32 deletions

View File

@ -2,6 +2,7 @@ use lazy_static::lazy_static;
use proxmox_schema::{ApiType, ObjectSchema}; use proxmox_schema::{ApiType, ObjectSchema};
use proxmox_section_config::{SectionConfig, SectionConfigData, SectionConfigPlugin}; use proxmox_section_config::{SectionConfig, SectionConfigData, SectionConfigPlugin};
use crate::group::{GroupConfig, GROUP_TYPENAME};
use crate::schema::BACKEND_NAME_SCHEMA; use crate::schema::BACKEND_NAME_SCHEMA;
use crate::Error; use crate::Error;
@ -36,6 +37,14 @@ fn config_init() -> SectionConfig {
)); ));
} }
const GROUP_SCHEMA: &ObjectSchema = GroupConfig::API_SCHEMA.unwrap_object_schema();
config.register_plugin(SectionConfigPlugin::new(
GROUP_TYPENAME.to_string(),
Some(String::from("name")),
GROUP_SCHEMA,
));
config config
} }

View File

@ -0,0 +1,41 @@
use crate::schema::ENTITY_NAME_SCHEMA;
use proxmox_schema::api_types::COMMENT_SCHEMA;
use proxmox_schema::{api, Updater};
use serde::{Deserialize, Serialize};
pub(crate) const GROUP_TYPENAME: &str = "group";
#[api(
properties: {
"endpoint": {
type: Array,
items: {
description: "Name of the included endpoint(s)",
type: String,
},
},
comment: {
optional: true,
schema: COMMENT_SCHEMA,
},
},
)]
#[derive(Debug, Serialize, Deserialize, Updater, Default)]
#[serde(rename_all = "kebab-case")]
/// Config for notification channels
pub struct GroupConfig {
/// Name of the channel
#[updater(skip)]
pub name: String,
/// Endpoints for this channel
pub endpoint: Vec<String>,
/// Comment
#[serde(skip_serializing_if = "Option::is_none")]
pub comment: Option<String>,
}
#[derive(Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum DeleteableGroupProperty {
Comment,
}

View File

@ -1,6 +1,7 @@
use std::collections::HashMap; use std::collections::HashMap;
use std::fmt::Display; use std::fmt::Display;
use group::{GroupConfig, GROUP_TYPENAME};
use proxmox_schema::api; use proxmox_schema::api;
use proxmox_section_config::SectionConfigData; use proxmox_section_config::SectionConfigData;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
@ -12,6 +13,7 @@ use std::error::Error as StdError;
pub mod api; pub mod api;
mod config; mod config;
pub mod endpoints; pub mod endpoints;
pub mod group;
pub mod schema; pub mod schema;
#[derive(Debug)] #[derive(Debug)]
@ -20,6 +22,7 @@ pub enum Error {
ConfigDeserialization(Box<dyn StdError + Send + Sync>), ConfigDeserialization(Box<dyn StdError + Send + Sync>),
NotifyFailed(String, Box<dyn StdError + Send + Sync>), NotifyFailed(String, Box<dyn StdError + Send + Sync>),
TargetDoesNotExist(String), TargetDoesNotExist(String),
TargetTestFailed(Vec<Box<dyn StdError + Send + Sync + 'static>>),
} }
impl Display for Error { impl Display for Error {
@ -37,6 +40,13 @@ impl Display for Error {
Error::TargetDoesNotExist(target) => { Error::TargetDoesNotExist(target) => {
write!(f, "notification target '{target}' does not exist") write!(f, "notification target '{target}' does not exist")
} }
Error::TargetTestFailed(errs) => {
for err in errs {
writeln!(f, "{err}")?;
}
Ok(())
}
} }
} }
} }
@ -48,6 +58,7 @@ impl StdError for Error {
Error::ConfigDeserialization(err) => Some(&**err), Error::ConfigDeserialization(err) => Some(&**err),
Error::NotifyFailed(_, err) => Some(&**err), Error::NotifyFailed(_, err) => Some(&**err),
Error::TargetDoesNotExist(_) => None, Error::TargetDoesNotExist(_) => None,
Error::TargetTestFailed(errs) => Some(&*errs[0]),
} }
} }
} }
@ -131,6 +142,7 @@ impl Config {
#[derive(Default)] #[derive(Default)]
pub struct Bus { pub struct Bus {
endpoints: HashMap<String, Box<dyn Endpoint>>, endpoints: HashMap<String, Box<dyn Endpoint>>,
groups: HashMap<String, GroupConfig>,
} }
#[allow(unused_macros)] #[allow(unused_macros)]
@ -234,7 +246,15 @@ impl Bus {
); );
} }
Ok(Bus { endpoints }) let groups: HashMap<String, GroupConfig> = config
.config
.convert_to_typed_array(GROUP_TYPENAME)
.map_err(|err| Error::ConfigDeserialization(err.into()))?
.into_iter()
.map(|group: GroupConfig| (group.name.clone(), group))
.collect();
Ok(Bus { endpoints, groups })
} }
#[cfg(test)] #[cfg(test)]
@ -242,39 +262,76 @@ impl Bus {
self.endpoints.insert(endpoint.name().to_string(), endpoint); self.endpoints.insert(endpoint.name().to_string(), endpoint);
} }
pub fn send(&self, target: &str, notification: &Notification) -> Result<(), Error> { #[cfg(test)]
log::info!( pub fn add_group(&mut self, group: GroupConfig) {
"sending notification with title '{title}'", self.groups.insert(group.name.clone(), group);
title = notification.title
);
let endpoint = self
.endpoints
.get(target)
.ok_or(Error::TargetDoesNotExist(target.into()))?;
endpoint.send(notification).unwrap_or_else(|e| {
log::error!(
"could not notfiy via endpoint `{name}`: {e}",
name = endpoint.name()
)
});
Ok(())
} }
pub fn test_target(&self, target: &str) -> Result<(), Error> { /// Send a notification to a given target (endpoint or group).
let endpoint = self ///
.endpoints /// Any errors will not be returned but only logged.
.get(target) pub fn send(&self, endpoint_or_group: &str, notification: &Notification) {
.ok_or(Error::TargetDoesNotExist(target.into()))?; if let Some(group) = self.groups.get(endpoint_or_group) {
for endpoint in &group.endpoint {
self.send_via_single_endpoint(endpoint, notification);
}
} else {
self.send_via_single_endpoint(endpoint_or_group, notification);
}
}
endpoint.send(&Notification { fn send_via_single_endpoint(&self, endpoint: &str, notification: &Notification) {
if let Some(endpoint) = self.endpoints.get(endpoint) {
if let Err(e) = endpoint.send(notification) {
// Only log on errors, do not propagate fail to the caller.
log::error!(
"could not notify via target `{name}`: {e}",
name = endpoint.name()
);
} else {
log::info!("notified via endpoint `{name}`", name = endpoint.name());
}
} else {
log::error!("could not notify via endpoint '{endpoint}', it does not exist");
}
}
/// Send a test notification to a target (endpoint or group).
///
/// In contrast to the `send` function, this function will return
/// any errors to the caller.
pub fn test_target(&self, target: &str) -> Result<(), Error> {
let notification = Notification {
severity: Severity::Info, severity: Severity::Info,
title: "Test notification".into(), title: "Test notification".into(),
body: "This is a test of the notification target '{{ target }}'".into(), body: "This is a test of the notification target '{{ target }}'".into(),
properties: Some(json!({ "target": target })), properties: Some(json!({ "target": target })),
})?; };
let mut errors: Vec<Box<dyn StdError + Send + Sync>> = Vec::new();
let mut my_send = |target: &str| -> Result<(), Error> {
if let Some(endpoint) = self.endpoints.get(target) {
if let Err(e) = endpoint.send(&notification) {
errors.push(Box::new(e));
}
} else {
return Err(Error::TargetDoesNotExist(target.to_string()));
}
Ok(())
};
if let Some(group) = self.groups.get(target) {
for endpoint_name in &group.endpoint {
my_send(endpoint_name)?;
}
} else {
my_send(target)?;
}
if !errors.is_empty() {
return Err(Error::TargetTestFailed(errors));
}
Ok(()) Ok(())
} }
@ -288,6 +345,7 @@ mod tests {
#[derive(Default, Clone)] #[derive(Default, Clone)]
struct MockEndpoint { struct MockEndpoint {
name: &'static str,
// Needs to be an Rc so that we can clone MockEndpoint before // Needs to be an Rc so that we can clone MockEndpoint before
// passing it to Bus, while still retaining a handle to the Vec // passing it to Bus, while still retaining a handle to the Vec
messages: Rc<RefCell<Vec<Notification>>>, messages: Rc<RefCell<Vec<Notification>>>,
@ -301,11 +359,18 @@ mod tests {
} }
fn name(&self) -> &str { fn name(&self) -> &str {
"mock-endpoint" self.name
} }
} }
impl MockEndpoint { impl MockEndpoint {
fn new(name: &'static str, filter: Option<String>) -> Self {
Self {
name,
..Default::default()
}
}
fn messages(&self) -> Vec<Notification> { fn messages(&self) -> Vec<Notification> {
self.messages.borrow().clone() self.messages.borrow().clone()
} }
@ -313,24 +378,69 @@ mod tests {
#[test] #[test]
fn test_add_mock_endpoint() -> Result<(), Error> { fn test_add_mock_endpoint() -> Result<(), Error> {
let mock = MockEndpoint::default(); let mock = MockEndpoint::new("endpoint", None);
let mut bus = Bus::default(); let mut bus = Bus::default();
bus.add_endpoint(Box::new(mock.clone())); bus.add_endpoint(Box::new(mock.clone()));
// Send directly to endpoint
bus.send( bus.send(
"mock-endpoint", "endpoint",
&Notification { &Notification {
title: "Title".into(), title: "Title".into(),
body: "Body".into(), body: "Body".into(),
severity: Severity::Info, severity: Severity::Info,
properties: Default::default(), properties: Default::default(),
}, },
)?; );
let messages = mock.messages(); let messages = mock.messages();
assert_eq!(messages.len(), 1); assert_eq!(messages.len(), 1);
Ok(()) Ok(())
} }
#[test]
fn test_groups() -> Result<(), Error> {
let endpoint1 = MockEndpoint::new("mock1", None);
let endpoint2 = MockEndpoint::new("mock2", None);
let mut bus = Bus::default();
bus.add_group(GroupConfig {
name: "group1".to_string(),
endpoint: vec!["mock1".into()],
comment: None,
});
bus.add_group(GroupConfig {
name: "group2".to_string(),
endpoint: vec!["mock2".into()],
comment: None,
});
bus.add_endpoint(Box::new(endpoint1.clone()));
bus.add_endpoint(Box::new(endpoint2.clone()));
let send_to_group = |channel| {
bus.send(
channel,
&Notification {
title: "Title".into(),
body: "Body".into(),
severity: Severity::Info,
properties: Default::default(),
},
)
};
send_to_group("group1");
assert_eq!(endpoint1.messages().len(), 1);
assert_eq!(endpoint2.messages().len(), 0);
send_to_group("group2");
assert_eq!(endpoint1.messages().len(), 1);
assert_eq!(endpoint2.messages().len(), 1);
Ok(())
}
} }