Skip to content

Commit

Permalink
Run pipeline for a defined contact of some type
Browse files Browse the repository at this point in the history
Check the types of contacts.

Rewrite Slack API
  • Loading branch information
GamePad64 committed Nov 18, 2024
1 parent 47941ea commit 1c564d5
Show file tree
Hide file tree
Showing 19 changed files with 182 additions and 99 deletions.
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ members = [
]

[workspace.dependencies]
sea-orm = { version = "1.1.0", features = ["sqlx-sqlite", "sqlx-postgres", "sqlx-mysql", "runtime-tokio-native-tls", "macros"] }
reqwest = { version = "0.12.8", default-features = false, features = ["json", "default-tls", "charset"] }
sea-orm = { version = "1.1.1", features = ["sqlx-sqlite", "sqlx-postgres", "sqlx-mysql", "runtime-tokio-native-tls", "macros"] }
reqwest = { version = "0.12.9", default-features = false, features = ["json", "default-tls", "charset"] }
axum = { version = "0.7.7", features = ["macros"] }
clap = { version = "4.5.20", features = ["derive", "color", "usage", "env"] }

Expand Down
1 change: 1 addition & 0 deletions notifico-apiserver/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ clap = { version = "4.5.20", features = ["derive", "color", "usage", "env"] }
dotenvy = "0.15.7"
fe2o3-amqp = "0.13.1"
futures = "0.3.31"
log = "0.4.22"
rust-embed = { version = "8.5.0", features = ["mime-guess"] }
sea-orm = { workspace = true }
serde = { version = "1.0.210", features = ["derive"] }
Expand Down
64 changes: 45 additions & 19 deletions notifico-apiserver/src/amqp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,31 +3,57 @@ use backoff::ExponentialBackoff;
use fe2o3_amqp::{Connection, Sender, Session};
use notifico_core::pipeline::runner::ProcessEventRequest;
use tokio::sync::mpsc::Receiver;
use tracing::info;
use tracing::{error, info};
use url::Url;

pub async fn run(amqp_url: Url, worker_addr: String, mut event_rx: Receiver<ProcessEventRequest>) {
let mut connection = retry(ExponentialBackoff::default(), || async {
Ok(Connection::open("connection-1", amqp_url.clone()).await?)
})
.await;
'outer: loop {
info!("Connecting to AMQP broker: {amqp_url}...");
let connection = retry(ExponentialBackoff::default(), || async {
Ok(Connection::open("connection-1", amqp_url.clone()).await?)
})
.await;

let mut session = Session::begin(connection.as_mut().unwrap()).await.unwrap();

let mut sender = Sender::attach(&mut session, "rust-sender-link-1", worker_addr)
.await
.unwrap();
let mut connection = match connection {
Ok(conn) => conn,
Err(err) => {
error!("Failed to connect to AMQP broker: {err:?}");
continue;
}
};

loop {
tokio::select! {
req = event_rx.recv() => {
info!("Sending message");
let msg = serde_json::to_string(&req).unwrap();
let _outcome = sender.send(msg).await.unwrap();
let mut session = match Session::begin(&mut connection).await {
Ok(session) => session,
Err(err) => {
error!("Failed to create session: {err:?}");
continue;
}
_ = tokio::signal::ctrl_c() => {
info!("Shutting down");
break;
};

let mut sender =
match Sender::attach(&mut session, "rust-sender-link-1", &worker_addr).await {
Ok(sender) => sender,
Err(err) => {
error!("Failed to create sender link: {err:?}");
continue;
}
};

loop {
tokio::select! {
req = event_rx.recv() => {
info!("Sending AMQP message");
let Some(req) = req else {
error!("Event receiver has been closed");
break 'outer;
};
let msg = serde_json::to_string(&req).unwrap();
let _outcome = sender.send(msg).await.unwrap();
}
_ = tokio::signal::ctrl_c() => {
info!("Shutting down AMQP session");
break 'outer;
}
}
}
}
Expand Down
6 changes: 4 additions & 2 deletions notifico-apiserver/src/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ pub(crate) async fn start(
clientapi_bind: SocketAddr,
ext: HttpExtensions,
) {
// Bind everything now to catch any errors before spinning up the coroutines
let service_listener = TcpListener::bind(serviceapi_bind).await.unwrap();
let client_listener = TcpListener::bind(clientapi_bind).await.unwrap();

// Service API
let app = Router::new().nest("/api", ingest::get_router(ext.clone()));
let app = app.nest("/api", admin::get_router(ext.clone()));
Expand All @@ -55,14 +59,12 @@ pub(crate) async fn start(
let app = app.merge(Redoc::with_url("/redoc", ApiDoc::openapi()));
let app = app.fallback(static_handler);

let service_listener = TcpListener::bind(serviceapi_bind).await.unwrap();
tokio::spawn(async { axum::serve(service_listener, app).await.unwrap() });

// Client API
let app = Router::new().nest("/api", recipient::get_router(ext.clone()));
let app = app.layer(Extension(ext.secret_key.clone()));

let client_listener = TcpListener::bind(clientapi_bind).await.unwrap();
tokio::spawn(async { axum::serve(client_listener, app).await.unwrap() });
}

Expand Down
16 changes: 8 additions & 8 deletions notifico-apiserver/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use notifico_template::db::DbTemplateSource;
use sea_orm::{ConnectOptions, Database};
use std::net::SocketAddr;
use std::sync::Arc;
use tracing::info;
use tracing::{info, log};
use tracing_subscriber::{fmt, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter};
use url::Url;

Expand Down Expand Up @@ -54,7 +54,9 @@ async fn main() {

info!("Config: {:#?}", args);

let db_conn_options = ConnectOptions::new(args.db_url.to_string());
let mut db_conn_options = ConnectOptions::new(args.db_url.to_string());
db_conn_options.sqlx_logging_level(log::LevelFilter::Debug);

let db_connection = Database::connect(db_conn_options).await.unwrap();

// Initializing plugins
Expand Down Expand Up @@ -85,12 +87,10 @@ async fn main() {
templates_controller: templates,
};

tokio::spawn(http::start(
args.service_api_bind,
args.client_api_bind,
ext,
));
tokio::spawn(amqp::run(args.amqp, args.amqp_addr, request_rx));
// Spawns HTTP servers and quits
http::start(args.service_api_bind, args.client_api_bind, ext).await;
let amqp_client = tokio::spawn(amqp::run(args.amqp, args.amqp_addr, request_rx));
amqp_client.await.unwrap();

tokio::signal::ctrl_c().await.unwrap();
}
19 changes: 17 additions & 2 deletions notifico-core/src/engine/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::error::EngineError;
use crate::recipient::Recipient;
use crate::recipient::{Contact, Recipient, TypedContact};
use crate::step::SerializedStep;
use crate::templater::RenderedTemplate;
use serde::{Deserialize, Serialize};
Expand All @@ -22,14 +22,29 @@ pub struct EventContext(pub Map<String, Value>);
#[derive(Default, Debug, Serialize, Deserialize)]
pub struct PipelineContext {
pub project_id: Uuid,
pub recipient: Option<Recipient>,
pub recipient_info: Option<Recipient>,
pub current_contact: Option<Contact>,
pub event_name: String,
pub event_context: EventContext,
pub plugin_contexts: Map<String, Value>,
pub messages: Vec<RenderedTemplate>,
pub channel: String,
}

impl PipelineContext {
pub fn get_contact<T: TypedContact>(&self) -> Result<T, EngineError> {
let Some(contact) = &self.current_contact else {
return Err(EngineError::ContactNotSet);
};

if contact.r#type() != T::CONTACT_TYPE {
return Err(EngineError::ContactTypeMismatch(T::CONTACT_TYPE.to_owned()));
}

contact.clone().into_contact()
}
}

/// Engine is used to run steps in the pipeline.
/// Can be cloned and shared across tasks.
#[derive(Clone)]
Expand Down
14 changes: 4 additions & 10 deletions notifico-core/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,31 +1,25 @@
use sea_orm::DbErr;
use std::error::Error;
use uuid::Uuid;

#[derive(Debug)]
pub enum EngineError {
InvalidCredentialFormat,
CredentialNotFound,
PluginNotFound(String),
ContactNotFound(String),
InvalidContactFormat,
RecipientNotSet,
ProjectNotFound(Uuid),
ContactNotSet,
ContactTypeMismatch(String),
InvalidContactFormat(serde_json::Error),
TemplateRenderingError,
MissingTemplateParameter(String),
InvalidRenderedTemplateFormat(Box<dyn Error>),
InternalError(Box<dyn Error>),
InvalidStep(serde_json::Error),
PartialSend(Box<dyn Error>),
}

impl From<DbErr> for EngineError {
fn from(value: DbErr) -> Self {
Self::InternalError(Box::new(value))
}
}

impl From<Box<dyn Error>> for EngineError {
fn from(value: Box<dyn Error>) -> Self {
Self::InternalError(value)
}
}
12 changes: 10 additions & 2 deletions notifico-core/src/pipeline/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,15 +81,23 @@ impl PipelineRunner {
let event_context = event_context.clone();
let event_name = event_name.to_string();

let channel = pipeline.channel.clone();

let contact = recipient
.clone()
.map(|r| r.get_primary_contact(&channel))
.unwrap_or_default();

join_handles.spawn(async move {
let context = PipelineContext {
project_id,
recipient,
recipient_info: recipient,
event_name,
event_context,
plugin_contexts: Default::default(),
messages: Default::default(),
channel: pipeline.channel.clone(),
channel,
current_contact: contact,
};

// Execute each step in the pipeline
Expand Down
14 changes: 6 additions & 8 deletions notifico-core/src/recipient.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,11 @@ pub struct Recipient {
}

impl Recipient {
pub fn get_primary_contact<T: TypedContact>(&self) -> Result<T, EngineError> {
for contact in &self.contacts {
if contact.r#type() == T::CONTACT_TYPE {
return contact.clone().into_contact();
}
}
Err(EngineError::ContactNotFound(T::CONTACT_TYPE.to_string()))
pub fn get_primary_contact(&self, channel: &str) -> Option<Contact> {
self.contacts
.iter()
.find(|contact| contact.r#type() == channel)
.cloned()
}
}

Expand All @@ -41,7 +39,7 @@ impl Contact {
where
T: TypedContact + for<'de> Deserialize<'de>,
{
serde_json::from_value(self.0).map_err(|_| EngineError::InvalidContactFormat)
serde_json::from_value(self.0).map_err(EngineError::InvalidContactFormat)
}
}

Expand Down
6 changes: 1 addition & 5 deletions notifico-subscription/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ impl EnginePlugin for SubscriptionManager {
context: &mut PipelineContext,
step: &SerializedStep,
) -> Result<StepOutput, EngineError> {
let Some(recipient) = &context.recipient else {
let Some(recipient) = &context.recipient_info else {
return Err(EngineError::RecipientNotSet);
};

Expand All @@ -173,10 +173,6 @@ impl EnginePlugin for SubscriptionManager {
}
}
Step::ListUnsubscribe { .. } => {
let Some(recipient) = context.recipient.clone() else {
return Err(EngineError::RecipientNotSet);
};

context.plugin_contexts.insert(
EMAIL_LIST_UNSUBSCRIBE.into(),
Value::String(format!(
Expand Down
1 change: 1 addition & 0 deletions notifico-worker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ tracing = "0.1"
tracing-subscriber = { version = "0.3.18", features = ["env-filter", "fmt"] }
url = "2.5.2"
uuid = { version = "1.11.0", features = ["serde", "v7"] }
log = "0.4.22"

[features]
default = []
4 changes: 3 additions & 1 deletion notifico-worker/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,9 @@ async fn main() {
.extract()
.unwrap();

let db_conn_options = ConnectOptions::new(args.db_url.to_string());
let mut db_conn_options = ConnectOptions::new(args.db_url.to_string());
db_conn_options.sqlx_logging_level(log::LevelFilter::Debug);

let db_connection = Database::connect(db_conn_options).await.unwrap();

let credentials = Arc::new(MemoryCredentialStorage::from_config(credential_config).unwrap());
Expand Down
5 changes: 3 additions & 2 deletions transports/notifico-slack/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ edition = "2021"

[dependencies]
notifico-core = { path = "../../notifico-core" }
reqwest = "0.12.9"
reqwest = { workspace = true }
serde = "1.0.215"
async-trait = "0.1.83"
serde_json = "1.0.133"
serde_json = "1.0.133"
thiserror = "2.0.3"
Loading

0 comments on commit 1c564d5

Please sign in to comment.