Skip to content

Commit

Permalink
feat(alerts): allow sending multiple emails per alert
Browse files Browse the repository at this point in the history
  • Loading branch information
passcod committed Aug 22, 2024
1 parent 9e9c95b commit 0600114
Showing 1 changed file with 150 additions and 92 deletions.
242 changes: 150 additions & 92 deletions crates/bestool/src/actions/tamanu/alerts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,21 +36,22 @@ const DEFAULT_SUBJECT_TEMPLATE: &str = "[Tamanu Alert] {{ filename }} ({{ hostna
///
/// ```yaml
/// enabled: true
/// recipients:
/// - alerts@tamanu.io
///
/// sql: |
/// SELECT * FROM fhir.jobs
/// WHERE error IS NOT NULL
/// AND created_at > $1
///
/// subject: "FHIR job errors ({{ hostname }})"
/// template: |
/// Automated alert! There have been {{ rows | length }} FHIR jobs
/// with errors in the past {{ interval }}. Here are the first 5:
/// {% for row in rows | first(5) %}
/// - {{ row.topic }}: {{ row.error }}
/// {% endfor %}
/// send:
/// - target: email:
/// addresses: [alerts@tamanu.io]
/// subject: "FHIR job errors ({{ hostname }})"
/// template: |
/// Automated alert! There have been {{ rows | length }} FHIR jobs
/// with errors in the past {{ interval }}. Here are the first 5:
/// {% for row in rows | first(5) %}
/// - {{ row.topic }}: {{ row.error }}
/// {% endfor %}
/// ```
///
/// # Template variables
Expand Down Expand Up @@ -131,10 +132,42 @@ struct AlertDefinition {
enabled: bool,
#[serde(skip)]
interval: Duration,
recipients: Vec<String>,
#[serde(default)]
send: Vec<SendTarget>,
sql: String,

// legacy email-only fields
#[serde(default)]
recipients: Vec<String>,
subject: Option<String>,
template: String,
template: Option<String>,
}

#[derive(serde::Deserialize, Debug)]
#[serde(rename_all = "kebab-case", tag = "target")]
enum SendTarget {
Email {
addresses: Vec<String>,
subject: Option<String>,
template: String,
},
}

impl AlertDefinition {
fn normalise(mut self) -> Self {
if !self.recipients.is_empty() {
self.send.push(SendTarget::Email {
addresses: self.recipients,
subject: self.subject,
template: self.template.unwrap_or_default(),
});
self.recipients = vec![];
self.subject = None;
self.template = None;
}

self
}
}

pub async fn run(ctx: Context<TamanuArgs, AlertsArgs>) -> Result<()> {
Expand Down Expand Up @@ -173,7 +206,7 @@ pub async fn run(ctx: Context<TamanuArgs, AlertsArgs>) -> Result<()> {

alert.file = file.to_path_buf();
alert.interval = ctx.args_sub.interval.into();
Some(alert)
Some(alert.normalise())
} else {
None
}
Expand Down Expand Up @@ -213,14 +246,9 @@ pub async fn run(ctx: Context<TamanuArgs, AlertsArgs>) -> Result<()> {

// TODO: convert to join!
for alert in alerts {
if let Err(err) = execute_alert(
&client,
&config.mailgun,
&alert,
ctx.args_sub.dry_run,
)
.await
.wrap_err(format!("while executing alert: {}", alert.file.display()))
if let Err(err) = execute_alert(&client, &config.mailgun, &alert, ctx.args_sub.dry_run)
.await
.wrap_err(format!("while executing alert: {}", alert.file.display()))
{
eprintln!("{err:?}");
}
Expand All @@ -229,19 +257,25 @@ pub async fn run(ctx: Context<TamanuArgs, AlertsArgs>) -> Result<()> {
Ok(())
}

#[instrument(skip(alert))]
fn load_templates(alert: &AlertDefinition) -> Result<Tera> {
#[instrument]
fn load_templates(target: &SendTarget) -> Result<Tera> {
let mut tera = tera::Tera::default();
tera.add_raw_template(
"subject",
alert.subject.as_deref().unwrap_or(DEFAULT_SUBJECT_TEMPLATE),
)
.into_diagnostic()
.wrap_err("compiling subject template")?;
tera.add_raw_template("alert.html", &alert.template)
.into_diagnostic()
.wrap_err("compiling email template")?;

match target {
SendTarget::Email {
subject, template, ..
} => {
tera.add_raw_template(
"subject",
subject.as_deref().unwrap_or(DEFAULT_SUBJECT_TEMPLATE),
)
.into_diagnostic()
.wrap_err("compiling subject template")?;
tera.add_raw_template("alert.html", &template)
.into_diagnostic()
.wrap_err("compiling email template")?;
}
}
Ok(tera)
}

Expand All @@ -255,7 +289,13 @@ fn build_context(

let mut context = TeraCtx::new();
context.insert("rows", &context_rows);
context.insert("interval", &format!("{}", Folktime::new(alert.interval).with_style(FolkStyle::OneUnitWhole)));
context.insert(
"interval",
&format!(
"{}",
Folktime::new(alert.interval).with_style(FolkStyle::OneUnitWhole)
),
);
context.insert(
"hostname",
System::host_name().as_deref().unwrap_or("unknown"),
Expand Down Expand Up @@ -295,8 +335,6 @@ async fn execute_alert(
) -> Result<()> {
info!(?alert.file, "executing alert");

let tera = load_templates(&alert)?;

let now = chrono::Utc::now();
let not_before = now - alert.interval;
info!(?now, ?not_before, interval=?alert.interval, "date range for alert");
Expand All @@ -318,38 +356,45 @@ async fn execute_alert(
info!(?alert.file, rows=%rows.len(), "alert triggered");

let mut context = build_context(alert, &rows, now);
let (subject, body) = render_alert(&tera, &mut context)?;

if dry_run {
println!("-------------------------------");
println!("Alert: {}", alert.file.display());
println!("Recipients: {}", alert.recipients.join(", "));
println!("Subject: {subject}");
println!("Body: {body}");
return Ok(());
}

debug!(?alert.recipients, "sending email");
let sender = EmailAddress::address(&mailgun.sender);
let message = Mailgun {
api_key: mailgun.api_key.clone(),
domain: mailgun.domain.clone(),
message: Message {
to: alert
.recipients
.iter()
.map(|email| EmailAddress::address(email))
.collect(),
subject,
html: body,
..Default::default()
},
};
message
.async_send(mailgun_rs::MailgunRegion::US, &sender)
.await
.into_diagnostic()
.wrap_err("sending email")?;
for target in &alert.send {
let tera = load_templates(target)?;
match target {
SendTarget::Email { addresses, .. } => {
let (subject, body) = render_alert(&tera, &mut context)?;

if dry_run {
println!("-------------------------------");
println!("Alert: {}", alert.file.display());
println!("Recipients: {}", addresses.join(", "));
println!("Subject: {subject}");
println!("Body: {body}");
return Ok(());
}

debug!(?alert.recipients, "sending email");
let sender = EmailAddress::address(&mailgun.sender);
let message = Mailgun {
api_key: mailgun.api_key.clone(),
domain: mailgun.domain.clone(),
message: Message {
to: addresses
.iter()
.map(|email| EmailAddress::address(email))
.collect(),
subject,
html: body,
..Default::default()
},
};
message
.async_send(mailgun_rs::MailgunRegion::US, &sender)
.await
.into_diagnostic()
.wrap_err("sending email")?;
}
}
}

Ok(())
}
Expand All @@ -358,18 +403,18 @@ async fn execute_alert(
struct Interval(pub Duration);

impl ToSql for Interval {
fn to_sql(&self, _: &Type, out: &mut BytesMut) -> Result<IsNull, Box<dyn Error + Sync + Send>> {
out.put_i64(self.0.as_micros().try_into().unwrap_or_default());
out.put_i32(0);
out.put_i32(0);
Ok(IsNull::No)
}

fn accepts(ty: &Type) -> bool {
matches!(*ty, Type::INTERVAL)
}

tokio_postgres::types::to_sql_checked!();
fn to_sql(&self, _: &Type, out: &mut BytesMut) -> Result<IsNull, Box<dyn Error + Sync + Send>> {
out.put_i64(self.0.as_micros().try_into().unwrap_or_default());
out.put_i32(0);
out.put_i32(0);
Ok(IsNull::No)
}

fn accepts(ty: &Type) -> bool {
matches!(*ty, Type::INTERVAL)
}

tokio_postgres::types::to_sql_checked!();
}

#[cfg(test)]
Expand All @@ -383,10 +428,11 @@ mod tests {
file: PathBuf::from("test.yaml"),
enabled: true,
interval: dur.to_std().unwrap(),
recipients: vec![],
sql: "".into(),
send: vec![],
recipients: vec![],
subject: None,
template: "".into(),
template: None,
};
let rows = vec![];
build_context(&alert, &rows, Utc::now())
Expand All @@ -405,36 +451,48 @@ mod tests {

#[test]
fn test_interval_format_hour() {
assert_eq!(
interval_context(Duration::hours(1)).as_deref(),
Some("1h"),
);
assert_eq!(interval_context(Duration::hours(1)).as_deref(), Some("1h"),);
}

#[test]
fn test_interval_format_day() {
assert_eq!(
interval_context(Duration::days(1)).as_deref(),
Some("1d"),
);
assert_eq!(interval_context(Duration::days(1)).as_deref(), Some("1d"),);
}

#[test]
fn test_alert_parse_without_interval() {
fn test_alert_parse() {
let alert = r#"
recipients:
- test@example.com
sql: |
SELECT $1::timestamptz;
send:
- target: email
addresses: [test@example.com]
subject: "[Tamanu Alert] Example ({{ hostname }})"
template: |
<p>Server: {{ hostname }}</p>
<p>There are {{ rows | length }} rows.</p>
"#;
let alert: AlertDefinition = serde_yml::from_str(&alert).unwrap();
let alert = alert.normalise();
assert_eq!(alert.interval, std::time::Duration::default());
assert!(matches!(alert.send[0], SendTarget::Email { .. }));
}

#[test]
fn test_alert_parse_legacy_recipients() {
let alert = r#"
sql: |
SELECT $1::timestamptz;
recipients:
- test@example.com
subject: "[Tamanu Alert] Example ({{ hostname }})"
template: |
<p>Server: {{ hostname }}</p>
<p>There are {{ rows | length }} rows.</p>
"#;
let alert: AlertDefinition = serde_yml::from_str(&alert).unwrap();
let alert = alert.normalise();
assert_eq!(alert.interval, std::time::Duration::default());
assert_eq!(alert.recipients, vec![String::from("test@example.com")]);
assert!(matches!(alert.send[0], SendTarget::Email { .. }));
}
}

0 comments on commit 0600114

Please sign in to comment.