From 1c564d5ef734423fee6544574ea7acf94b246604 Mon Sep 17 00:00:00 2001 From: Alexander Shishenko Date: Mon, 18 Nov 2024 23:21:48 +0300 Subject: [PATCH] Run pipeline for a defined contact of some type Check the types of contacts. Rewrite Slack API --- Cargo.toml | 4 +- notifico-apiserver/Cargo.toml | 1 + notifico-apiserver/src/amqp.rs | 64 ++++++++++++++++------- notifico-apiserver/src/http/mod.rs | 6 ++- notifico-apiserver/src/main.rs | 16 +++--- notifico-core/src/engine/mod.rs | 19 ++++++- notifico-core/src/error.rs | 14 ++--- notifico-core/src/pipeline/runner.rs | 12 ++++- notifico-core/src/recipient.rs | 14 +++-- notifico-subscription/src/lib.rs | 6 +-- notifico-worker/Cargo.toml | 1 + notifico-worker/src/main.rs | 4 +- transports/notifico-slack/Cargo.toml | 5 +- transports/notifico-slack/src/lib.rs | 28 ++++------ transports/notifico-slack/src/slackapi.rs | 63 ++++++++++++++++++++++ transports/notifico-smpp/src/lib.rs | 6 +-- transports/notifico-smtp/src/lib.rs | 6 +-- transports/notifico-telegram/src/lib.rs | 6 +-- transports/notifico-whatsapp/src/lib.rs | 6 +-- 19 files changed, 182 insertions(+), 99 deletions(-) create mode 100644 transports/notifico-slack/src/slackapi.rs diff --git a/Cargo.toml b/Cargo.toml index a8ddb55..dd3042c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,8 +20,8 @@ members = [ ] [workspace.dependencies] -sea-orm = { version = "1.1.0", features = ["sqlx-sqlite", "sqlx-postgres", "sqlx-mysql", "runtime-tokio-native-tls", "macros"] } -reqwest = { version = "0.12.8", default-features = false, features = ["json", "default-tls", "charset"] } +sea-orm = { version = "1.1.1", features = ["sqlx-sqlite", "sqlx-postgres", "sqlx-mysql", "runtime-tokio-native-tls", "macros"] } +reqwest = { version = "0.12.9", default-features = false, features = ["json", "default-tls", "charset"] } axum = { version = "0.7.7", features = ["macros"] } clap = { version = "4.5.20", features = ["derive", "color", "usage", "env"] } diff --git a/notifico-apiserver/Cargo.toml b/notifico-apiserver/Cargo.toml index 19a35ee..094d95f 100644 --- a/notifico-apiserver/Cargo.toml +++ b/notifico-apiserver/Cargo.toml @@ -18,6 +18,7 @@ clap = { version = "4.5.20", features = ["derive", "color", "usage", "env"] } dotenvy = "0.15.7" fe2o3-amqp = "0.13.1" futures = "0.3.31" +log = "0.4.22" rust-embed = { version = "8.5.0", features = ["mime-guess"] } sea-orm = { workspace = true } serde = { version = "1.0.210", features = ["derive"] } diff --git a/notifico-apiserver/src/amqp.rs b/notifico-apiserver/src/amqp.rs index d3b1614..27dd7c2 100644 --- a/notifico-apiserver/src/amqp.rs +++ b/notifico-apiserver/src/amqp.rs @@ -3,31 +3,57 @@ use backoff::ExponentialBackoff; use fe2o3_amqp::{Connection, Sender, Session}; use notifico_core::pipeline::runner::ProcessEventRequest; use tokio::sync::mpsc::Receiver; -use tracing::info; +use tracing::{error, info}; use url::Url; pub async fn run(amqp_url: Url, worker_addr: String, mut event_rx: Receiver) { - let mut connection = retry(ExponentialBackoff::default(), || async { - Ok(Connection::open("connection-1", amqp_url.clone()).await?) - }) - .await; + 'outer: loop { + info!("Connecting to AMQP broker: {amqp_url}..."); + let connection = retry(ExponentialBackoff::default(), || async { + Ok(Connection::open("connection-1", amqp_url.clone()).await?) + }) + .await; - let mut session = Session::begin(connection.as_mut().unwrap()).await.unwrap(); - - let mut sender = Sender::attach(&mut session, "rust-sender-link-1", worker_addr) - .await - .unwrap(); + let mut connection = match connection { + Ok(conn) => conn, + Err(err) => { + error!("Failed to connect to AMQP broker: {err:?}"); + continue; + } + }; - loop { - tokio::select! { - req = event_rx.recv() => { - info!("Sending message"); - let msg = serde_json::to_string(&req).unwrap(); - let _outcome = sender.send(msg).await.unwrap(); + let mut session = match Session::begin(&mut connection).await { + Ok(session) => session, + Err(err) => { + error!("Failed to create session: {err:?}"); + continue; } - _ = tokio::signal::ctrl_c() => { - info!("Shutting down"); - break; + }; + + let mut sender = + match Sender::attach(&mut session, "rust-sender-link-1", &worker_addr).await { + Ok(sender) => sender, + Err(err) => { + error!("Failed to create sender link: {err:?}"); + continue; + } + }; + + loop { + tokio::select! { + req = event_rx.recv() => { + info!("Sending AMQP message"); + let Some(req) = req else { + error!("Event receiver has been closed"); + break 'outer; + }; + let msg = serde_json::to_string(&req).unwrap(); + let _outcome = sender.send(msg).await.unwrap(); + } + _ = tokio::signal::ctrl_c() => { + info!("Shutting down AMQP session"); + break 'outer; + } } } } diff --git a/notifico-apiserver/src/http/mod.rs b/notifico-apiserver/src/http/mod.rs index 7221d68..b39e55a 100644 --- a/notifico-apiserver/src/http/mod.rs +++ b/notifico-apiserver/src/http/mod.rs @@ -45,6 +45,10 @@ pub(crate) async fn start( clientapi_bind: SocketAddr, ext: HttpExtensions, ) { + // Bind everything now to catch any errors before spinning up the coroutines + let service_listener = TcpListener::bind(serviceapi_bind).await.unwrap(); + let client_listener = TcpListener::bind(clientapi_bind).await.unwrap(); + // Service API let app = Router::new().nest("/api", ingest::get_router(ext.clone())); let app = app.nest("/api", admin::get_router(ext.clone())); @@ -55,14 +59,12 @@ pub(crate) async fn start( let app = app.merge(Redoc::with_url("/redoc", ApiDoc::openapi())); let app = app.fallback(static_handler); - let service_listener = TcpListener::bind(serviceapi_bind).await.unwrap(); tokio::spawn(async { axum::serve(service_listener, app).await.unwrap() }); // Client API let app = Router::new().nest("/api", recipient::get_router(ext.clone())); let app = app.layer(Extension(ext.secret_key.clone())); - let client_listener = TcpListener::bind(clientapi_bind).await.unwrap(); tokio::spawn(async { axum::serve(client_listener, app).await.unwrap() }); } diff --git a/notifico-apiserver/src/main.rs b/notifico-apiserver/src/main.rs index d3a4bb6..6e2c858 100644 --- a/notifico-apiserver/src/main.rs +++ b/notifico-apiserver/src/main.rs @@ -11,7 +11,7 @@ use notifico_template::db::DbTemplateSource; use sea_orm::{ConnectOptions, Database}; use std::net::SocketAddr; use std::sync::Arc; -use tracing::info; +use tracing::{info, log}; use tracing_subscriber::{fmt, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter}; use url::Url; @@ -54,7 +54,9 @@ async fn main() { info!("Config: {:#?}", args); - let db_conn_options = ConnectOptions::new(args.db_url.to_string()); + let mut db_conn_options = ConnectOptions::new(args.db_url.to_string()); + db_conn_options.sqlx_logging_level(log::LevelFilter::Debug); + let db_connection = Database::connect(db_conn_options).await.unwrap(); // Initializing plugins @@ -85,12 +87,10 @@ async fn main() { templates_controller: templates, }; - tokio::spawn(http::start( - args.service_api_bind, - args.client_api_bind, - ext, - )); - tokio::spawn(amqp::run(args.amqp, args.amqp_addr, request_rx)); + // Spawns HTTP servers and quits + http::start(args.service_api_bind, args.client_api_bind, ext).await; + let amqp_client = tokio::spawn(amqp::run(args.amqp, args.amqp_addr, request_rx)); + amqp_client.await.unwrap(); tokio::signal::ctrl_c().await.unwrap(); } diff --git a/notifico-core/src/engine/mod.rs b/notifico-core/src/engine/mod.rs index 0ebe216..981ddff 100644 --- a/notifico-core/src/engine/mod.rs +++ b/notifico-core/src/engine/mod.rs @@ -1,5 +1,5 @@ use crate::error::EngineError; -use crate::recipient::Recipient; +use crate::recipient::{Contact, Recipient, TypedContact}; use crate::step::SerializedStep; use crate::templater::RenderedTemplate; use serde::{Deserialize, Serialize}; @@ -22,7 +22,8 @@ pub struct EventContext(pub Map); #[derive(Default, Debug, Serialize, Deserialize)] pub struct PipelineContext { pub project_id: Uuid, - pub recipient: Option, + pub recipient_info: Option, + pub current_contact: Option, pub event_name: String, pub event_context: EventContext, pub plugin_contexts: Map, @@ -30,6 +31,20 @@ pub struct PipelineContext { pub channel: String, } +impl PipelineContext { + pub fn get_contact(&self) -> Result { + let Some(contact) = &self.current_contact else { + return Err(EngineError::ContactNotSet); + }; + + if contact.r#type() != T::CONTACT_TYPE { + return Err(EngineError::ContactTypeMismatch(T::CONTACT_TYPE.to_owned())); + } + + contact.clone().into_contact() + } +} + /// Engine is used to run steps in the pipeline. /// Can be cloned and shared across tasks. #[derive(Clone)] diff --git a/notifico-core/src/error.rs b/notifico-core/src/error.rs index 731f287..2f20aad 100644 --- a/notifico-core/src/error.rs +++ b/notifico-core/src/error.rs @@ -1,21 +1,21 @@ use sea_orm::DbErr; use std::error::Error; -use uuid::Uuid; #[derive(Debug)] pub enum EngineError { InvalidCredentialFormat, CredentialNotFound, PluginNotFound(String), - ContactNotFound(String), - InvalidContactFormat, RecipientNotSet, - ProjectNotFound(Uuid), + ContactNotSet, + ContactTypeMismatch(String), + InvalidContactFormat(serde_json::Error), TemplateRenderingError, MissingTemplateParameter(String), InvalidRenderedTemplateFormat(Box), InternalError(Box), InvalidStep(serde_json::Error), + PartialSend(Box), } impl From for EngineError { @@ -23,9 +23,3 @@ impl From for EngineError { Self::InternalError(Box::new(value)) } } - -impl From> for EngineError { - fn from(value: Box) -> Self { - Self::InternalError(value) - } -} diff --git a/notifico-core/src/pipeline/runner.rs b/notifico-core/src/pipeline/runner.rs index 9c7e85d..a541505 100644 --- a/notifico-core/src/pipeline/runner.rs +++ b/notifico-core/src/pipeline/runner.rs @@ -81,15 +81,23 @@ impl PipelineRunner { let event_context = event_context.clone(); let event_name = event_name.to_string(); + let channel = pipeline.channel.clone(); + + let contact = recipient + .clone() + .map(|r| r.get_primary_contact(&channel)) + .unwrap_or_default(); + join_handles.spawn(async move { let context = PipelineContext { project_id, - recipient, + recipient_info: recipient, event_name, event_context, plugin_contexts: Default::default(), messages: Default::default(), - channel: pipeline.channel.clone(), + channel, + current_contact: contact, }; // Execute each step in the pipeline diff --git a/notifico-core/src/recipient.rs b/notifico-core/src/recipient.rs index 0a33511..a541815 100644 --- a/notifico-core/src/recipient.rs +++ b/notifico-core/src/recipient.rs @@ -13,13 +13,11 @@ pub struct Recipient { } impl Recipient { - pub fn get_primary_contact(&self) -> Result { - for contact in &self.contacts { - if contact.r#type() == T::CONTACT_TYPE { - return contact.clone().into_contact(); - } - } - Err(EngineError::ContactNotFound(T::CONTACT_TYPE.to_string())) + pub fn get_primary_contact(&self, channel: &str) -> Option { + self.contacts + .iter() + .find(|contact| contact.r#type() == channel) + .cloned() } } @@ -41,7 +39,7 @@ impl Contact { where T: TypedContact + for<'de> Deserialize<'de>, { - serde_json::from_value(self.0).map_err(|_| EngineError::InvalidContactFormat) + serde_json::from_value(self.0).map_err(EngineError::InvalidContactFormat) } } diff --git a/notifico-subscription/src/lib.rs b/notifico-subscription/src/lib.rs index e917ef4..ec163a3 100644 --- a/notifico-subscription/src/lib.rs +++ b/notifico-subscription/src/lib.rs @@ -150,7 +150,7 @@ impl EnginePlugin for SubscriptionManager { context: &mut PipelineContext, step: &SerializedStep, ) -> Result { - let Some(recipient) = &context.recipient else { + let Some(recipient) = &context.recipient_info else { return Err(EngineError::RecipientNotSet); }; @@ -173,10 +173,6 @@ impl EnginePlugin for SubscriptionManager { } } Step::ListUnsubscribe { .. } => { - let Some(recipient) = context.recipient.clone() else { - return Err(EngineError::RecipientNotSet); - }; - context.plugin_contexts.insert( EMAIL_LIST_UNSUBSCRIBE.into(), Value::String(format!( diff --git a/notifico-worker/Cargo.toml b/notifico-worker/Cargo.toml index 531c648..ea45ca3 100644 --- a/notifico-worker/Cargo.toml +++ b/notifico-worker/Cargo.toml @@ -30,6 +30,7 @@ tracing = "0.1" tracing-subscriber = { version = "0.3.18", features = ["env-filter", "fmt"] } url = "2.5.2" uuid = { version = "1.11.0", features = ["serde", "v7"] } +log = "0.4.22" [features] default = [] diff --git a/notifico-worker/src/main.rs b/notifico-worker/src/main.rs index 874ce76..676378f 100644 --- a/notifico-worker/src/main.rs +++ b/notifico-worker/src/main.rs @@ -83,7 +83,9 @@ async fn main() { .extract() .unwrap(); - let db_conn_options = ConnectOptions::new(args.db_url.to_string()); + let mut db_conn_options = ConnectOptions::new(args.db_url.to_string()); + db_conn_options.sqlx_logging_level(log::LevelFilter::Debug); + let db_connection = Database::connect(db_conn_options).await.unwrap(); let credentials = Arc::new(MemoryCredentialStorage::from_config(credential_config).unwrap()); diff --git a/transports/notifico-slack/Cargo.toml b/transports/notifico-slack/Cargo.toml index 57fcb0d..fc57719 100644 --- a/transports/notifico-slack/Cargo.toml +++ b/transports/notifico-slack/Cargo.toml @@ -5,7 +5,8 @@ edition = "2021" [dependencies] notifico-core = { path = "../../notifico-core" } -reqwest = "0.12.9" +reqwest = { workspace = true } serde = "1.0.215" async-trait = "0.1.83" -serde_json = "1.0.133" \ No newline at end of file +serde_json = "1.0.133" +thiserror = "2.0.3" diff --git a/transports/notifico-slack/src/lib.rs b/transports/notifico-slack/src/lib.rs index 0451772..28fe7e9 100644 --- a/transports/notifico-slack/src/lib.rs +++ b/transports/notifico-slack/src/lib.rs @@ -1,3 +1,4 @@ +mod slackapi; mod step; use crate::step::{Step, STEPS}; @@ -9,7 +10,6 @@ use notifico_core::recipient::TypedContact; use notifico_core::step::SerializedStep; use notifico_core::templater::RenderedTemplate; use serde::{Deserialize, Serialize}; -use serde_json::json; use std::borrow::Cow; use std::sync::Arc; @@ -23,14 +23,14 @@ impl TypedCredential for SlackCredentials { } pub struct SlackPlugin { - client: reqwest::Client, + client: slackapi::SlackApi, credentials: Arc, } impl SlackPlugin { pub fn new(credentials: Arc) -> Self { SlackPlugin { - client: reqwest::Client::new(), + client: slackapi::SlackApi::new(), credentials, } } @@ -47,32 +47,24 @@ impl EnginePlugin for SlackPlugin { match step { Step::Send { credential } => { - let Some(recipient) = context.recipient.clone() else { - return Err(EngineError::RecipientNotSet); - }; - let credential: SlackCredentials = self .credentials .get_typed_credential(context.project_id, &credential) .await?; - let contact: SlackContact = recipient.get_primary_contact()?; + let contact: SlackContact = context.get_contact()?; for message in context.messages.iter().cloned() { let content: SlackMessage = message.try_into()?; - - let payload = json!({ - "channel": contact.channel_id, - "text": content.text, - }); + let slack_message = slackapi::SlackMessage::Text { + channel: contact.channel_id.clone(), + text: content.text, + }; self.client - .post("https://slack.com/api/chat.postMessage") - .header("Authorization", format!("Bearer {}", credential.token)) - .json(&payload) - .send() + .chat_post_message(&credential.token, slack_message) .await - .unwrap(); + .map_err(|e| EngineError::PartialSend(Box::new(e)))?; } Ok(StepOutput::Continue) } diff --git a/transports/notifico-slack/src/slackapi.rs b/transports/notifico-slack/src/slackapi.rs new file mode 100644 index 0000000..4612b89 --- /dev/null +++ b/transports/notifico-slack/src/slackapi.rs @@ -0,0 +1,63 @@ +use reqwest::header::AUTHORIZATION; +use serde::{Deserialize, Serialize}; +use thiserror::Error; + +#[derive(Deserialize, Debug)] +pub struct SlackStatusResponse { + ok: bool, + error: Option, +} + +impl SlackStatusResponse { + fn into_result(self) -> Result { + if !self.ok { + Err(SlackError::ApiError { + error: self.error.unwrap_or_default(), + }) + } else { + Ok(self) + } + } +} + +#[derive(Error, Debug)] +pub enum SlackError { + #[error("{0}")] + Request(#[from] reqwest::Error), + #[error("Slack API returned error: {error:?}")] + ApiError { error: String }, +} + +pub struct SlackApi { + client: reqwest::Client, +} + +#[derive(Serialize)] +#[serde(untagged)] +pub enum SlackMessage { + Text { channel: String, text: String }, +} + +impl SlackApi { + pub fn new() -> Self { + Self { + client: reqwest::Client::new(), + } + } + + pub async fn chat_post_message( + &self, + token: &str, + message: SlackMessage, + ) -> Result { + let resp = self + .client + .post("https://slack.com/api/chat.postMessage") + .header(AUTHORIZATION, String::from("Bearer ") + token) + .json(&message) + .send() + .await?; + + resp.json::().await?.into_result() + } +} diff --git a/transports/notifico-smpp/src/lib.rs b/transports/notifico-smpp/src/lib.rs index c97a28d..6f93acc 100644 --- a/transports/notifico-smpp/src/lib.rs +++ b/transports/notifico-smpp/src/lib.rs @@ -57,10 +57,6 @@ impl EnginePlugin for SmppPlugin { match step { Step::Send { credential } => { - let Some(recipient) = context.recipient.clone() else { - return Err(EngineError::RecipientNotSet); - }; - let credential: SmppServerCredentials = self .credentials .get_typed_credential(context.project_id, &credential) @@ -105,7 +101,7 @@ impl EnginePlugin for SmppPlugin { } } - let contact: MobilePhoneContact = recipient.get_primary_contact()?; + let contact: MobilePhoneContact = context.get_contact()?; for message in context.messages.iter().cloned() { let rendered: SmsContent = message.try_into().unwrap(); diff --git a/transports/notifico-smtp/src/lib.rs b/transports/notifico-smtp/src/lib.rs index 66c55e5..8db0df1 100644 --- a/transports/notifico-smtp/src/lib.rs +++ b/transports/notifico-smtp/src/lib.rs @@ -76,11 +76,7 @@ impl EnginePlugin for EmailPlugin { match step { Step::Send { credential } => { - let Some(recipient) = context.recipient.clone() else { - return Err(EngineError::RecipientNotSet); - }; - - let contact: EmailContact = recipient.get_primary_contact()?; + let contact: EmailContact = context.get_contact()?; let credential: SmtpServerCredentials = self .credentials diff --git a/transports/notifico-telegram/src/lib.rs b/transports/notifico-telegram/src/lib.rs index 8e8eb24..f78a3d7 100644 --- a/transports/notifico-telegram/src/lib.rs +++ b/transports/notifico-telegram/src/lib.rs @@ -50,16 +50,12 @@ impl EnginePlugin for TelegramPlugin { match step { Step::Send { credential } => { - let Some(recipient) = context.recipient.clone() else { - return Err(EngineError::RecipientNotSet); - }; - let credential: TelegramBotCredentials = self .credentials .get_typed_credential(context.project_id, &credential) .await?; let bot = Bot::new(credential.token); - let contact: TelegramContact = recipient.get_primary_contact()?; + let contact: TelegramContact = context.get_contact()?; for message in context.messages.iter().cloned() { let content: TelegramContent = message.try_into().unwrap(); diff --git a/transports/notifico-whatsapp/src/lib.rs b/transports/notifico-whatsapp/src/lib.rs index d39db10..1771679 100644 --- a/transports/notifico-whatsapp/src/lib.rs +++ b/transports/notifico-whatsapp/src/lib.rs @@ -46,11 +46,7 @@ impl EnginePlugin for WaBusinessPlugin { match step { Step::Send { credential } => { - let Some(recipient) = context.recipient.clone() else { - return Err(EngineError::RecipientNotSet); - }; - - let contact: MobilePhoneContact = recipient.get_primary_contact()?; + let contact: MobilePhoneContact = context.get_contact()?; // Send let credential: WhatsAppCredentials = self