From 3b6ef63b6a94f21509ad9ac7ffa361f7fbd04239 Mon Sep 17 00:00:00 2001 From: Alexander Shishenko Date: Wed, 9 Oct 2024 21:01:28 +0300 Subject: [PATCH] Add basic notification center --- Cargo.toml | 47 +++++++--- notifico-core/src/credentials.rs | 2 + notifico-core/src/engine/mod.rs | 6 +- notifico-core/src/http.rs | 7 ++ notifico-core/src/lib.rs | 1 + notifico-core/src/pipeline.rs | 19 +++- notifico-ncenter/Cargo.toml | 17 ++++ notifico-ncenter/migration/Cargo.toml | 13 +++ notifico-ncenter/migration/README.md | 41 +++++++++ notifico-ncenter/migration/src/lib.rs | 16 ++++ .../src/m20220101_000001_create_table.rs | 50 +++++++++++ notifico-ncenter/migration/src/main.rs | 6 ++ notifico-ncenter/src/entity/mod.rs | 5 ++ .../src/entity/ncenter_notification.rs | 10 ++- notifico-ncenter/src/entity/prelude.rs | 3 + notifico-ncenter/src/http.rs | 49 ++++++++++ notifico-ncenter/src/lib.rs | 80 +++++++++++++++++ notifico-smtp/Cargo.toml | 3 +- notifico-smtp/src/lib.rs | 3 +- notifico-subscription/Cargo.toml | 3 +- notifico-subscription/migration/Cargo.toml | 14 +-- .../src/m20220101_000001_create_table.rs | 2 +- notifico-subscription/src/entity/mod.rs | 1 - notifico-subscription/src/entity/prelude.rs | 1 - notifico-subscription/src/http.rs | 37 ++++++++ notifico-subscription/src/lib.rs | 11 ++- notifico-subscription/src/step.rs | 2 +- notifico-telegram/Cargo.toml | 3 +- notifico-telegram/src/lib.rs | 14 +-- notifico-telegram/src/step.rs | 2 +- notifico-templater/Cargo.toml | 7 +- notifico-templater/src/lib.rs | 10 ++- notifico-whatsapp/Cargo.toml | 5 +- notifico-whatsapp/src/cloudapi.rs | 8 ++ notifico-whatsapp/src/lib.rs | 20 ++--- notifico-whatsapp/src/step.rs | 2 +- src/http/auth.rs | 90 +++++++++++++++++++ src/http/list_unsubscribe.rs | 68 -------------- src/http/mod.rs | 39 ++++---- src/main.rs | 8 +- 40 files changed, 564 insertions(+), 161 deletions(-) create mode 100644 notifico-core/src/http.rs create mode 100644 notifico-ncenter/Cargo.toml create mode 100644 notifico-ncenter/migration/Cargo.toml create mode 100644 notifico-ncenter/migration/README.md create mode 100644 notifico-ncenter/migration/src/lib.rs create mode 100644 notifico-ncenter/migration/src/m20220101_000001_create_table.rs create mode 100644 notifico-ncenter/migration/src/main.rs create mode 100644 notifico-ncenter/src/entity/mod.rs rename notifico-subscription/src/entity/subscription_migrations.rs => notifico-ncenter/src/entity/ncenter_notification.rs (61%) create mode 100644 notifico-ncenter/src/entity/prelude.rs create mode 100644 notifico-ncenter/src/http.rs create mode 100644 notifico-ncenter/src/lib.rs create mode 100644 notifico-subscription/src/http.rs create mode 100644 src/http/auth.rs delete mode 100644 src/http/list_unsubscribe.rs diff --git a/Cargo.toml b/Cargo.toml index 4d4c7f5..cf436ff 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,7 +7,7 @@ rust-version = "1.81.0" [dependencies] tokio = { version = "1.40", features = ["full"] } log = "0.4.22" -futures = "0.3.30" +futures = "0.3.31" tracing = "0.1.40" tracing-subscriber = { version = "0.3.18", features = ["env-filter", "fmt"] } serde = { version = "1.0.210", features = ["derive"] } @@ -18,20 +18,22 @@ reqwest = { version = "0.12.7", features = ["json"] } async-trait = "0.1.82" url = "2.5.2" figment = { version = "0.10.19", features = ["yaml", "env"] } -clap = { version = "4.5.17", features = ["derive", "color", "usage"] } +clap = { version = "4.5.19", features = ["derive", "color", "usage"] } +actix = "0.13.5" +axum = { workspace = true } +jwt = "0.16.0" +jsonwebtoken = "9.3.0" +sha2 = "0.10.8" +hmac = "0.12.1" +anyhow = "1.0.89" + notifico-core = { path = "notifico-core" } notifico-telegram = { path = "notifico-telegram" } notifico-smtp = { path = "notifico-smtp" } notifico-subscription = { path = "notifico-subscription" } notifico-whatsapp = { path = "notifico-whatsapp" } notifico-templater = { path = "notifico-templater" } -serde_yaml = "0.9.23" -actix = "0.13.5" -axum = "0.7.6" -jwt = "0.16.0" -sha2 = "0.10.8" -hmac = "0.12.1" -anyhow = "1.0.89" +notifico-ncenter = { path = "notifico-ncenter" } [profile.release] opt-level = 3 @@ -39,8 +41,31 @@ lto = true codegen-units = 1 [workspace] -members = [".", "notifico-core", "notifico-smtp", "notifico-telegram", "notifico-subscription/migration", "notifico-subscription", "notifico-whatsapp", "notifico-templater"] +members = [ + "notifico-core", + "notifico-smtp", + "notifico-telegram", + "notifico-subscription", + "notifico-subscription/migration", + "notifico-whatsapp", + "notifico-templater", + "notifico-ncenter", + "notifico-ncenter/migration" +] [workspace.dependencies] -sea-orm = { version = "1.1.0-rc.1", features = ["sqlx-sqlite", "sqlx-postgres", "runtime-tokio-rustls", "macros"] } +sea-orm = { version = "1.1.0-rc.3", features = ["sqlx-sqlite", "sqlx-postgres", "sqlx-mysql", "runtime-tokio-rustls", "macros"] } reqwest = "0.12.8" +axum = "0.8.0-alpha.1" + +[workspace.dependencies.sea-orm-migration] +version = "1.1.0-rc.3" +features = [ + # Enable at least one `ASYNC_RUNTIME` and `DATABASE_DRIVER` feature if you want to run migration via CLI. + # View the list of supported features at https://www.sea-ql.org/SeaORM/docs/install-and-config/database-and-async-runtime. + # e.g. + "runtime-tokio-rustls", # `ASYNC_RUNTIME` feature + "sqlx-sqlite", # `DATABASE_DRIVER` feature + "sqlx-postgres", # `DATABASE_DRIVER` feature + "sqlx-mysql", # `DATABASE_DRIVER` feature +] diff --git a/notifico-core/src/credentials.rs b/notifico-core/src/credentials.rs index 81ff88b..de270f3 100644 --- a/notifico-core/src/credentials.rs +++ b/notifico-core/src/credentials.rs @@ -4,6 +4,7 @@ use serde::{Deserialize, Serialize}; use serde_json::Value; use uuid::Uuid; +/// Generic credential with type information. #[derive(Serialize, Deserialize, Debug, Clone)] pub struct Credential { pub r#type: String, @@ -23,6 +24,7 @@ impl Credential { } } +/// Specific credential types should implement this trait. pub trait TypedCredential: for<'de> Deserialize<'de> { const CREDENTIAL_TYPE: &'static str; } diff --git a/notifico-core/src/engine/mod.rs b/notifico-core/src/engine/mod.rs index f3d3dde..488e8b4 100644 --- a/notifico-core/src/engine/mod.rs +++ b/notifico-core/src/engine/mod.rs @@ -1,4 +1,3 @@ -use crate::engine::plugin::{EnginePlugin, StepOutput}; use crate::error::EngineError; use crate::pipeline::SerializedStep; use crate::recipient::Recipient; @@ -12,7 +11,8 @@ use std::sync::Arc; use tracing::instrument; use uuid::Uuid; -pub mod plugin; +mod plugin; +pub use plugin::{EnginePlugin, StepOutput}; #[derive(Debug, Default, Clone, Serialize, Deserialize)] #[serde(transparent)] @@ -29,6 +29,8 @@ pub struct PipelineContext { pub channel: String, } +/// Engine is used to run steps in the pipeline. +/// Can be cloned and shared across tasks. #[derive(Clone)] pub struct Engine { steps: HashMap, Arc>, diff --git a/notifico-core/src/http.rs b/notifico-core/src/http.rs new file mode 100644 index 0000000..f7a7183 --- /dev/null +++ b/notifico-core/src/http.rs @@ -0,0 +1,7 @@ +use uuid::Uuid; + +#[derive(Clone)] +pub struct AuthorizedRecipient { + pub project_id: Uuid, + pub recipient_id: Uuid, +} diff --git a/notifico-core/src/lib.rs b/notifico-core/src/lib.rs index c3071d6..92f43b3 100644 --- a/notifico-core/src/lib.rs +++ b/notifico-core/src/lib.rs @@ -1,6 +1,7 @@ pub mod credentials; pub mod engine; pub mod error; +pub mod http; pub mod pipeline; pub mod recipient; pub mod templater; diff --git a/notifico-core/src/pipeline.rs b/notifico-core/src/pipeline.rs index f84df0f..aeb2ae9 100644 --- a/notifico-core/src/pipeline.rs +++ b/notifico-core/src/pipeline.rs @@ -1,4 +1,4 @@ -use crate::engine::plugin::StepOutput; +use crate::engine::StepOutput; use crate::engine::{Engine, EventContext, PipelineContext}; use crate::error::EngineError; use crate::recipient::{Recipient, RecipientDirectory}; @@ -73,6 +73,14 @@ impl PipelineRunner { } } + /// Processes an event by executing the associated pipelines. + /// + /// # Parameters + /// + /// * `project_id` - The unique identifier of the project associated with the event. + /// * `trigger_event` - The name of the event that triggered the pipeline execution. + /// * `event_context` - The contextual information related to the event. + /// * `recipient_sel` - An optional selector for the recipient of the event. pub async fn process_event( &self, project_id: Uuid, @@ -80,12 +88,12 @@ impl PipelineRunner { event_context: EventContext, recipient_sel: Option, ) -> Result<(), EngineError> { + // Fetch the pipelines associated with the project and event let pipelines = self .pipeline_storage .get_pipelines(project_id, trigger_event)?; - let mut join_handles = JoinSet::new(); - + // Determine the recipient based on the recipient selector let recipient = match recipient_sel { None => None, Some(RecipientSelector::RecipientId { id }) => { @@ -94,7 +102,8 @@ impl PipelineRunner { Some(RecipientSelector::Recipient(recipient)) => Some(recipient), }; - // Pipeline; + // Execute each pipeline in a separate task in parallel + let mut join_handles = JoinSet::new(); for pipeline in pipelines { let engine = self.engine.clone(); let recipient = recipient.clone(); @@ -112,6 +121,7 @@ impl PipelineRunner { channel: pipeline.channel, }; + // Execute each step in the pipeline for step in pipeline.steps.iter() { let result = engine.execute_step(&mut context, step).await; match result { @@ -126,6 +136,7 @@ impl PipelineRunner { }); } + // Wait for all pipelines to complete join_handles.join_all().await; Ok(()) } diff --git a/notifico-ncenter/Cargo.toml b/notifico-ncenter/Cargo.toml new file mode 100644 index 0000000..7bf6164 --- /dev/null +++ b/notifico-ncenter/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "notifico-ncenter" +version = "0.1.0" +edition = "2021" + +[dependencies] +axum = { workspace = true } +serde = { version = "1.0.210", features = ["derive"] } +sea-orm = { workspace = true } +async-trait = "0.1.83" +uuid = { version = "1.10.0", features = ["v7"] } +chrono = "0.4.38" +serde_json = "1.0.128" +anyhow = "1.0.89" + +notifico-core = { path = "../notifico-core" } +notifico-ncenter-migration = { path = "migration" } diff --git a/notifico-ncenter/migration/Cargo.toml b/notifico-ncenter/migration/Cargo.toml new file mode 100644 index 0000000..dc93a6f --- /dev/null +++ b/notifico-ncenter/migration/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "notifico-ncenter-migration" +version = "0.1.0" +edition = "2021" +publish = false + +[lib] +name = "migration" +path = "src/lib.rs" + +[dependencies] +async-std = { version = "1", features = ["attributes", "tokio1"] } +sea-orm-migration = { workspace = true } diff --git a/notifico-ncenter/migration/README.md b/notifico-ncenter/migration/README.md new file mode 100644 index 0000000..3b438d8 --- /dev/null +++ b/notifico-ncenter/migration/README.md @@ -0,0 +1,41 @@ +# Running Migrator CLI + +- Generate a new migration file + ```sh + cargo run -- generate MIGRATION_NAME + ``` +- Apply all pending migrations + ```sh + cargo run + ``` + ```sh + cargo run -- up + ``` +- Apply first 10 pending migrations + ```sh + cargo run -- up -n 10 + ``` +- Rollback last applied migrations + ```sh + cargo run -- down + ``` +- Rollback last 10 applied migrations + ```sh + cargo run -- down -n 10 + ``` +- Drop all tables from the database, then reapply all migrations + ```sh + cargo run -- fresh + ``` +- Rollback all applied migrations, then reapply all migrations + ```sh + cargo run -- refresh + ``` +- Rollback all applied migrations + ```sh + cargo run -- reset + ``` +- Check the status of all migrations + ```sh + cargo run -- status + ``` diff --git a/notifico-ncenter/migration/src/lib.rs b/notifico-ncenter/migration/src/lib.rs new file mode 100644 index 0000000..06795c9 --- /dev/null +++ b/notifico-ncenter/migration/src/lib.rs @@ -0,0 +1,16 @@ +pub use sea_orm_migration::prelude::*; + +mod m20220101_000001_create_table; + +pub struct Migrator; + +#[async_trait::async_trait] +impl MigratorTrait for Migrator { + fn migrations() -> Vec> { + vec![Box::new(m20220101_000001_create_table::Migration)] + } + + fn migration_table_name() -> DynIden { + Alias::new("ncenter_migrations").into_iden() + } +} diff --git a/notifico-ncenter/migration/src/m20220101_000001_create_table.rs b/notifico-ncenter/migration/src/m20220101_000001_create_table.rs new file mode 100644 index 0000000..4c0811e --- /dev/null +++ b/notifico-ncenter/migration/src/m20220101_000001_create_table.rs @@ -0,0 +1,50 @@ +use sea_orm_migration::{prelude::*, schema::*}; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .create_table( + Table::create() + .table(NcenterNotification::Table) + .if_not_exists() + .col(pk_uuid(NcenterNotification::Id)) + .col(uuid(NcenterNotification::RecipientId)) + .col(uuid(NcenterNotification::ProjectId)) + .col(json_binary(NcenterNotification::Content)) + .col(date_time(NcenterNotification::CreatedAt)) + .to_owned(), + ) + .await?; + manager + .create_index( + Index::create() + .name("idx_ncenter_notifications_recipient_id") + .table(NcenterNotification::Table) + .if_not_exists() + .col(NcenterNotification::RecipientId) + .col(NcenterNotification::ProjectId) + .to_owned(), + ) + .await + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .drop_table(Table::drop().table(NcenterNotification::Table).to_owned()) + .await + } +} + +#[derive(DeriveIden)] +enum NcenterNotification { + Table, + Id, + RecipientId, + ProjectId, + Content, + CreatedAt, +} diff --git a/notifico-ncenter/migration/src/main.rs b/notifico-ncenter/migration/src/main.rs new file mode 100644 index 0000000..c6b6e48 --- /dev/null +++ b/notifico-ncenter/migration/src/main.rs @@ -0,0 +1,6 @@ +use sea_orm_migration::prelude::*; + +#[async_std::main] +async fn main() { + cli::run_cli(migration::Migrator).await; +} diff --git a/notifico-ncenter/src/entity/mod.rs b/notifico-ncenter/src/entity/mod.rs new file mode 100644 index 0000000..d6eefa6 --- /dev/null +++ b/notifico-ncenter/src/entity/mod.rs @@ -0,0 +1,5 @@ +//! `SeaORM` Entity, @generated by sea-orm-codegen 1.0.1 + +pub mod prelude; + +pub mod ncenter_notification; diff --git a/notifico-subscription/src/entity/subscription_migrations.rs b/notifico-ncenter/src/entity/ncenter_notification.rs similarity index 61% rename from notifico-subscription/src/entity/subscription_migrations.rs rename to notifico-ncenter/src/entity/ncenter_notification.rs index da6c61b..35e7e6e 100644 --- a/notifico-subscription/src/entity/subscription_migrations.rs +++ b/notifico-ncenter/src/entity/ncenter_notification.rs @@ -3,11 +3,15 @@ use sea_orm::entity::prelude::*; #[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] -#[sea_orm(table_name = "subscription_migrations")] +#[sea_orm(table_name = "ncenter_notification")] pub struct Model { #[sea_orm(primary_key, auto_increment = false)] - pub version: String, - pub applied_at: i64, + pub id: Uuid, + pub recipient_id: Uuid, + pub project_id: Uuid, + #[sea_orm(column_type = "JsonBinary")] + pub content: Json, + pub created_at: DateTime, } #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] diff --git a/notifico-ncenter/src/entity/prelude.rs b/notifico-ncenter/src/entity/prelude.rs new file mode 100644 index 0000000..ac88524 --- /dev/null +++ b/notifico-ncenter/src/entity/prelude.rs @@ -0,0 +1,3 @@ +//! `SeaORM` Entity, @generated by sea-orm-codegen 1.0.1 + +pub use super::ncenter_notification::Entity as NcenterNotification; diff --git a/notifico-ncenter/src/http.rs b/notifico-ncenter/src/http.rs new file mode 100644 index 0000000..9f9d417 --- /dev/null +++ b/notifico-ncenter/src/http.rs @@ -0,0 +1,49 @@ +use crate::entity::ncenter_notification; +use crate::entity::prelude::NcenterNotification; +use crate::NCenterPlugin; +use axum::routing::get; +use axum::{Extension, Json, Router}; +use chrono::{TimeZone, Utc}; +use notifico_core::http::AuthorizedRecipient; +use sea_orm::EntityTrait; +use sea_orm::QueryFilter; +use sea_orm::{ColumnTrait, QueryOrder}; +use std::sync::Arc; +use uuid::Uuid; + +pub fn get_router(ncenter: Arc) -> Router { + Router::new() + .route("/notifications", get(notifications)) + .layer(Extension(ncenter)) +} + +#[derive(serde::Serialize)] +struct Notification { + id: Uuid, + content: serde_json::Value, + created_at: chrono::DateTime, +} + +async fn notifications( + Extension(recipient): Extension>, + Extension(ncenter): Extension>, +) -> Json> { + let notifications: Vec = NcenterNotification::find() + .filter(ncenter_notification::Column::RecipientId.eq(recipient.recipient_id)) + .filter(ncenter_notification::Column::ProjectId.eq(recipient.project_id)) + .order_by_desc(ncenter_notification::Column::CreatedAt) + .all(&ncenter.db) + .await + .unwrap(); + + Json( + notifications + .into_iter() + .map(|m| Notification { + id: m.id, + content: m.content, + created_at: Utc.from_utc_datetime(&m.created_at), + }) + .collect(), + ) +} diff --git a/notifico-ncenter/src/lib.rs b/notifico-ncenter/src/lib.rs new file mode 100644 index 0000000..cedd993 --- /dev/null +++ b/notifico-ncenter/src/lib.rs @@ -0,0 +1,80 @@ +mod entity; +pub mod http; + +use crate::entity::ncenter_notification; +use crate::entity::prelude::NcenterNotification; +use async_trait::async_trait; +use axum::Router; +use chrono::Utc; +use migration::{Migrator, MigratorTrait}; +use notifico_core::engine::{EnginePlugin, PipelineContext, StepOutput}; +use notifico_core::error::EngineError; +use notifico_core::pipeline::SerializedStep; +use sea_orm::ActiveValue::Set; +use sea_orm::{ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter}; +use serde::{Deserialize, Serialize}; +use serde_json::{json, Value}; +use std::borrow::Cow; +use std::sync::Arc; +use uuid::Uuid; + +pub struct NCenterPlugin { + pub(crate) db: DatabaseConnection, +} + +impl NCenterPlugin { + pub fn new(db: DatabaseConnection) -> Self { + Self { db } + } + + pub async fn setup(&self) -> Result<(), anyhow::Error> { + Migrator::up(&self.db, None).await?; + Ok(()) + } +} + +#[async_trait] +impl EnginePlugin for NCenterPlugin { + async fn execute_step( + &self, + context: &mut PipelineContext, + step: &SerializedStep, + ) -> Result { + let step: Step = step.clone().convert_step()?; + + match step { + Step::Send => { + let Some(recipient) = context.recipient.clone() else { + return Err(EngineError::RecipientNotSet); + }; + + for message in context.messages.iter() { + let model = ncenter_notification::ActiveModel { + id: Set(Uuid::now_v7()), + recipient_id: Set(recipient.id), + project_id: Set(context.project_id), + content: Set(serde_json::to_value(&message.0).unwrap()), + created_at: Set(Utc::now().naive_utc()), + }; + + NcenterNotification::insert(model) + .exec(&self.db) + .await + .unwrap(); + } + Ok(StepOutput::Continue) + } + } + } + + fn steps(&self) -> Vec> { + vec!["ncenter.send".into()] + } +} + +#[derive(Serialize, Deserialize)] +#[serde(tag = "step")] +enum Step { + #[serde(rename = "ncenter.send")] + Send, +} diff --git a/notifico-smtp/Cargo.toml b/notifico-smtp/Cargo.toml index d9a970d..ab5f144 100644 --- a/notifico-smtp/Cargo.toml +++ b/notifico-smtp/Cargo.toml @@ -9,6 +9,7 @@ serde_json = "1.0.128" tracing = "0.1.40" lettre = { version = "0.11.9", features = ["serde", "tokio1", "tokio1-native-tls", "tracing"] } serde = { version = "1.0.210", features = ["derive"] } -notifico-core = { path = "../notifico-core" } uuid = { version = "1.10.0", features = ["serde"] } url = "2.5.2" + +notifico-core = { path = "../notifico-core" } diff --git a/notifico-smtp/src/lib.rs b/notifico-smtp/src/lib.rs index 581ad0f..41f15ad 100644 --- a/notifico-smtp/src/lib.rs +++ b/notifico-smtp/src/lib.rs @@ -17,8 +17,7 @@ use lettre::{ use notifico_core::credentials::get_typed_credential; use notifico_core::{ credentials::Credentials, - engine::plugin::{EnginePlugin, StepOutput}, - engine::PipelineContext, + engine::{EnginePlugin, PipelineContext, StepOutput}, error::EngineError, pipeline::SerializedStep, recipient::TypedContact, diff --git a/notifico-subscription/Cargo.toml b/notifico-subscription/Cargo.toml index 397298d..2a1bc83 100644 --- a/notifico-subscription/Cargo.toml +++ b/notifico-subscription/Cargo.toml @@ -12,6 +12,7 @@ hmac = "0.12.1" serde_json = "1.0.128" sea-orm = { workspace = true } tracing = "0.1.40" -notifico-core = { path = "../notifico-core" } serde = { version = "1.0.210", features = ["derive"] } +axum = { workspace = true } +notifico-core = { path = "../notifico-core" } diff --git a/notifico-subscription/migration/Cargo.toml b/notifico-subscription/migration/Cargo.toml index cdaec2e..3a2628f 100644 --- a/notifico-subscription/migration/Cargo.toml +++ b/notifico-subscription/migration/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "migration" +name = "notifico-subcription-migration" version = "0.1.0" edition = "2021" publish = false @@ -10,14 +10,4 @@ path = "src/lib.rs" [dependencies] async-std = { version = "1", features = ["attributes", "tokio1"] } - -[dependencies.sea-orm-migration] -version = "1.1.0-rc.1" -features = [ - # Enable at least one `ASYNC_RUNTIME` and `DATABASE_DRIVER` feature if you want to run migration via CLI. - # View the list of supported features at https://www.sea-ql.org/SeaORM/docs/install-and-config/database-and-async-runtime. - # e.g. - "runtime-tokio-rustls", # `ASYNC_RUNTIME` feature - "sqlx-sqlite", # `DATABASE_DRIVER` feature - "sqlx-postgres", # `DATABASE_DRIVER` feature -] +sea-orm-migration = { workspace = true } diff --git a/notifico-subscription/migration/src/m20220101_000001_create_table.rs b/notifico-subscription/migration/src/m20220101_000001_create_table.rs index a6f89bd..f01469a 100644 --- a/notifico-subscription/migration/src/m20220101_000001_create_table.rs +++ b/notifico-subscription/migration/src/m20220101_000001_create_table.rs @@ -11,7 +11,7 @@ impl MigrationTrait for Migration { Table::create() .table(Subscription::Table) .if_not_exists() - .col(uuid(Subscription::Id).primary_key()) + .col(pk_uuid(Subscription::Id)) .col(uuid(Subscription::ProjectId)) .col(string(Subscription::Event)) .col(string(Subscription::Channel)) diff --git a/notifico-subscription/src/entity/mod.rs b/notifico-subscription/src/entity/mod.rs index 4c0c02c..f4d90b1 100644 --- a/notifico-subscription/src/entity/mod.rs +++ b/notifico-subscription/src/entity/mod.rs @@ -3,4 +3,3 @@ pub mod prelude; pub mod subscription; -pub mod subscription_migrations; diff --git a/notifico-subscription/src/entity/prelude.rs b/notifico-subscription/src/entity/prelude.rs index d06dc9b..2ac9e37 100644 --- a/notifico-subscription/src/entity/prelude.rs +++ b/notifico-subscription/src/entity/prelude.rs @@ -1,4 +1,3 @@ //! `SeaORM` Entity, @generated by sea-orm-codegen 1.0.1 pub use super::subscription::Entity as Subscription; -pub use super::subscription_migrations::Entity as SubscriptionMigrations; diff --git a/notifico-subscription/src/http.rs b/notifico-subscription/src/http.rs new file mode 100644 index 0000000..c526fc7 --- /dev/null +++ b/notifico-subscription/src/http.rs @@ -0,0 +1,37 @@ +use crate::SubscriptionManager; +use axum::extract::{Query, State}; +use axum::routing::get; +use axum::{Extension, Router}; +use notifico_core::http::AuthorizedRecipient; +use serde::Deserialize; +use std::sync::Arc; + +pub fn get_router( + ncenter: Arc, +) -> Router { + Router::new() + .route("/list-unsubscribe", get(list_unsubscribe)) + .layer(Extension(ncenter)) +} + +#[derive(Debug, Deserialize)] +pub struct QueryParams { + event: String, + channel: String, +} + +pub(crate) async fn list_unsubscribe( + Query(params): Query, + Extension(sub_manager): Extension>, + Extension(auth): Extension>, +) { + sub_manager + .unsubscribe( + auth.project_id, + auth.recipient_id, + ¶ms.event, + ¶ms.channel, + false, + ) + .await; +} diff --git a/notifico-subscription/src/lib.rs b/notifico-subscription/src/lib.rs index c50c948..3984f9c 100644 --- a/notifico-subscription/src/lib.rs +++ b/notifico-subscription/src/lib.rs @@ -1,5 +1,6 @@ mod context; mod entity; +pub mod http; mod step; use crate::context::EMAIL_LIST_UNSUBSCRIBE; @@ -8,10 +9,12 @@ use crate::step::STEPS; use entity::prelude::*; use hmac::Hmac; use jwt::SignWithKey; -use notifico_core::engine::plugin::{EnginePlugin, StepOutput}; -use notifico_core::engine::PipelineContext; -use notifico_core::error::EngineError; -use notifico_core::pipeline::SerializedStep; +use notifico_core::{ + engine::PipelineContext, + engine::{EnginePlugin, StepOutput}, + error::EngineError, + pipeline::SerializedStep, +}; use sea_orm::prelude::async_trait::async_trait; use sea_orm::sea_query::OnConflict; use sea_orm::ActiveValue::Set; diff --git a/notifico-subscription/src/step.rs b/notifico-subscription/src/step.rs index a14a79b..b95c312 100644 --- a/notifico-subscription/src/step.rs +++ b/notifico-subscription/src/step.rs @@ -9,4 +9,4 @@ pub enum Step { ListUnsubscribe, } -pub(crate) const STEPS: &'static [&'static str] = &["sub.check", "sub.list_unsubscribe"]; +pub(crate) const STEPS: &[&str] = &["sub.check", "sub.list_unsubscribe"]; diff --git a/notifico-telegram/Cargo.toml b/notifico-telegram/Cargo.toml index fdcdd6f..acf7561 100644 --- a/notifico-telegram/Cargo.toml +++ b/notifico-telegram/Cargo.toml @@ -9,5 +9,6 @@ serde = { version = "1.0.210", features = ["derive"] } serde_json = "1.0.128" tracing = "0.1.40" uuid = { version = "1.10.0", features = ["serde"] } -notifico-core = { path = "../notifico-core" } teloxide = "0.13.0" + +notifico-core = { path = "../notifico-core" } diff --git a/notifico-telegram/src/lib.rs b/notifico-telegram/src/lib.rs index 464477a..7173424 100644 --- a/notifico-telegram/src/lib.rs +++ b/notifico-telegram/src/lib.rs @@ -1,12 +1,14 @@ use crate::step::STEPS; use async_trait::async_trait; use contact::TelegramContact; -use notifico_core::credentials::{get_typed_credential, Credentials, TypedCredential}; -use notifico_core::engine::plugin::{EnginePlugin, StepOutput}; -use notifico_core::engine::PipelineContext; -use notifico_core::error::EngineError; -use notifico_core::pipeline::SerializedStep; -use notifico_core::templater::RenderResponse; +use notifico_core::{ + credentials::{get_typed_credential, Credentials, TypedCredential}, + engine::PipelineContext, + engine::{EnginePlugin, StepOutput}, + error::EngineError, + pipeline::SerializedStep, + templater::RenderResponse, +}; use serde::{Deserialize, Serialize}; use serde_json::Value; use std::borrow::Cow; diff --git a/notifico-telegram/src/step.rs b/notifico-telegram/src/step.rs index fc387c5..eceea99 100644 --- a/notifico-telegram/src/step.rs +++ b/notifico-telegram/src/step.rs @@ -7,4 +7,4 @@ pub enum Step { Send { credential: String }, } -pub(crate) const STEPS: &'static [&'static str] = &["telegram.load_template", "telegram.send"]; +pub(crate) const STEPS: &[&str] = &["telegram.send"]; diff --git a/notifico-templater/Cargo.toml b/notifico-templater/Cargo.toml index 9e71b11..77b9f5b 100644 --- a/notifico-templater/Cargo.toml +++ b/notifico-templater/Cargo.toml @@ -4,11 +4,12 @@ version = "0.1.0" edition = "2021" [dependencies] -notifico-core = { path = "../notifico-core" } serde = { version = "1.0.210", features = ["derive"] } serde_json = "1.0.128" -uuid = { version = "1.10.0", features = ["v4"] } +uuid = { version = "1.10.0" } reqwest-middleware = { version = "0.3.3", features = ["json"] } reqwest-tracing = "0.5.3" url = "2.5.2" -async-trait = "0.1.82" \ No newline at end of file +async-trait = "0.1.82" + +notifico-core = { path = "../notifico-core" } diff --git a/notifico-templater/src/lib.rs b/notifico-templater/src/lib.rs index d4962fd..da3bf3b 100644 --- a/notifico-templater/src/lib.rs +++ b/notifico-templater/src/lib.rs @@ -1,8 +1,10 @@ use async_trait::async_trait; -use notifico_core::engine::plugin::{EnginePlugin, StepOutput}; -use notifico_core::engine::PipelineContext; -use notifico_core::error::EngineError; -use notifico_core::pipeline::SerializedStep; +use notifico_core::{ + engine::PipelineContext, + engine::{EnginePlugin, StepOutput}, + error::EngineError, + pipeline::SerializedStep, +}; use reqwest_middleware::reqwest::Client; use reqwest_middleware::{ClientBuilder, ClientWithMiddleware}; use reqwest_tracing::TracingMiddleware; diff --git a/notifico-whatsapp/Cargo.toml b/notifico-whatsapp/Cargo.toml index fc81abc..3c34785 100644 --- a/notifico-whatsapp/Cargo.toml +++ b/notifico-whatsapp/Cargo.toml @@ -5,9 +5,10 @@ edition = "2021" [dependencies] serde = { version = "1.0.210", features = ["derive"] } -notifico-core = { path = "../notifico-core" } -uuid = { version = "1.10.0", features = ["v4"] } +uuid = { version = "1.10.0" } serde_json = "1.0.128" async-trait = "0.1.82" reqwest = { workspace = true } tracing = "0.1.40" + +notifico-core = { path = "../notifico-core" } diff --git a/notifico-whatsapp/src/cloudapi.rs b/notifico-whatsapp/src/cloudapi.rs index 27903f7..1c5ed4a 100644 --- a/notifico-whatsapp/src/cloudapi.rs +++ b/notifico-whatsapp/src/cloudapi.rs @@ -11,6 +11,14 @@ pub struct Language { pub code: String, } +impl From<&str> for Language { + fn from(value: &str) -> Self { + Self { + code: value.to_string(), + } + } +} + #[derive(Serialize, Deserialize)] pub struct Message { pub messaging_product: MessagingProduct, diff --git a/notifico-whatsapp/src/lib.rs b/notifico-whatsapp/src/lib.rs index f8291f5..17d6813 100644 --- a/notifico-whatsapp/src/lib.rs +++ b/notifico-whatsapp/src/lib.rs @@ -2,13 +2,15 @@ use crate::cloudapi::{Language, MessageType, MessagingProduct}; use crate::credentials::WhatsAppCredentials; use crate::step::{Step, STEPS}; use async_trait::async_trait; -use notifico_core::credentials::{get_typed_credential, Credentials}; -use notifico_core::engine::plugin::{EnginePlugin, StepOutput}; -use notifico_core::engine::PipelineContext; -use notifico_core::error::EngineError; -use notifico_core::pipeline::SerializedStep; -use notifico_core::recipient::MobilePhoneContact; -use notifico_core::templater::RenderResponse; +use notifico_core::{ + credentials::{get_typed_credential, Credentials}, + engine::PipelineContext, + engine::{EnginePlugin, StepOutput}, + error::EngineError, + pipeline::SerializedStep, + recipient::MobilePhoneContact, + templater::RenderResponse, +}; use serde::{Deserialize, Serialize}; use serde_json::Value; use std::borrow::Cow; @@ -69,9 +71,7 @@ impl EnginePlugin for WaBusinessPlugin { let wamessage = cloudapi::Message { messaging_product: MessagingProduct::Whatsapp, to: contact.number.clone(), - language: Language { - code: "en_US".into(), - }, + language: "en_US".into(), message: MessageType::Text { preview_url: false, body: message.body, diff --git a/notifico-whatsapp/src/step.rs b/notifico-whatsapp/src/step.rs index 46078ce..08e4a69 100644 --- a/notifico-whatsapp/src/step.rs +++ b/notifico-whatsapp/src/step.rs @@ -7,4 +7,4 @@ pub enum Step { Send { credential: String }, } -pub const STEPS: &[&str] = &["whatsapp.load_template", "whatsapp.send"]; +pub const STEPS: &[&str] = &["whatsapp.send"]; diff --git a/src/http/auth.rs b/src/http/auth.rs new file mode 100644 index 0000000..9d7fc65 --- /dev/null +++ b/src/http/auth.rs @@ -0,0 +1,90 @@ +use crate::http::SecretKey; +use axum::body::Body; +use axum::extract::{Query, Request}; +use axum::http::StatusCode; +use axum::middleware::Next; +use axum::response::{IntoResponse, Response}; +use axum::{http, Extension, Json}; +use jwt::{Header, Token, VerifyWithKey}; +use notifico_core::http::AuthorizedRecipient; +use serde::Deserialize; +use serde_json::json; +use std::collections::BTreeMap; +use std::sync::Arc; +use uuid::Uuid; + +pub struct AuthError { + message: String, + status_code: StatusCode, +} + +impl IntoResponse for AuthError { + fn into_response(self) -> Response { + let body = Json(json!({ + "error": self.message, + })); + + (self.status_code, body).into_response() + } +} + +#[derive(Clone, Deserialize)] +pub struct QueryParams { + token: Option, +} + +/// Extracts the token from the query parameters or from Authorization header. +pub async fn authorize( + Query(params): Query, + Extension(skey): Extension>, + mut req: Request, + next: Next, +) -> Result, AuthError> { + let auth_header = req.headers_mut().get(http::header::AUTHORIZATION); + + let token = match (params.token, auth_header) { + (Some(query_token), _) => query_token.clone(), + (_, Some(auth_header)) => { + let value = auth_header.to_str().map_err(|_| AuthError { + message: "Empty header is not allowed".to_string(), + status_code: StatusCode::FORBIDDEN, + })?; + + let mut header = value.split_whitespace(); + let (_bearer, token) = (header.next(), header.next()); + + let Some(token) = token else { + return Err(AuthError { + message: "Missing bearer token".to_string(), + status_code: StatusCode::FORBIDDEN, + }); + }; + token.to_string() + } + (None, None) => { + return Err(AuthError { + message: "No JWT token provided".to_string(), + status_code: StatusCode::FORBIDDEN, + }) + } + }; + + let token: Token, _> = + token.verify_with_key(&skey.secret_key).unwrap(); + + let claims = token.claims(); + let (Some(recipient_id), Some(project_id)) = (claims.get("sub"), claims.get("proj")) else { + return Err(AuthError { + status_code: StatusCode::FORBIDDEN, + message: "Invalid JWT claims".to_string(), + }); + }; + + let recipient = AuthorizedRecipient { + project_id: project_id.parse::().unwrap(), + recipient_id: recipient_id.parse::().unwrap(), + }; + + req.extensions_mut().insert(Arc::new(recipient)); + Ok(next.run(req).await) +} diff --git a/src/http/list_unsubscribe.rs b/src/http/list_unsubscribe.rs deleted file mode 100644 index 0ebecd6..0000000 --- a/src/http/list_unsubscribe.rs +++ /dev/null @@ -1,68 +0,0 @@ -use crate::http::SharedState; -use anyhow::anyhow; -use axum::extract::{Query, State}; -use axum::http::StatusCode; -use axum::response::{IntoResponse, Response}; -use jwt::{Header, Token, VerifyWithKey}; -use serde::Deserialize; -use std::collections::BTreeMap; -use uuid::Uuid; - -pub(crate) struct JwtError(StatusCode, anyhow::Error); - -// Tell axum how to convert `AppError` into a response. -impl IntoResponse for JwtError { - fn into_response(self) -> Response { - (self.0, format!("Something went wrong: {}", self.1)).into_response() - } -} - -impl From for JwtError { - fn from(value: jwt::Error) -> Self { - Self(StatusCode::FORBIDDEN, value.into()) - } -} - -impl From for JwtError { - fn from(value: uuid::Error) -> Self { - Self(StatusCode::FORBIDDEN, value.into()) - } -} - -#[derive(Debug, Deserialize)] -pub struct QueryParams { - token: String, - event: String, - channel: String, -} - -pub(crate) async fn list_unsubscribe( - Query(params): Query, - State(state): State, -) -> Result<(), JwtError> { - let token: Token, _> = - params.token.verify_with_key(&state.secret_key)?; - - let claims = token.claims(); - let (Some(recipient_id), Some(project_id)) = (claims.get("sub"), claims.get("proj")) else { - return Err(JwtError( - StatusCode::FORBIDDEN, - anyhow!("Invalid JWT claims"), - )); - }; - - let recipient_id = recipient_id.parse::()?; - let project_id = project_id.parse::()?; - - state - .sub_manager - .unsubscribe( - project_id, - recipient_id, - ¶ms.event, - ¶ms.channel, - false, - ) - .await; - Ok(()) -} diff --git a/src/http/mod.rs b/src/http/mod.rs index 06eeaa7..a6b9a11 100644 --- a/src/http/mod.rs +++ b/src/http/mod.rs @@ -1,25 +1,27 @@ use crate::event_handler::{EventHandler, ProcessEvent}; -use crate::http::list_unsubscribe::list_unsubscribe; use actix::Addr; use axum::extract::State; -use axum::{ - http::StatusCode, - routing::{get, post}, - Json, Router, -}; +use axum::handler::Handler; +use axum::{http::StatusCode, middleware, routing::post, Extension, Json, Router}; use hmac::Hmac; +use notifico_ncenter::http::get_router as ncenter_router; +use notifico_ncenter::NCenterPlugin; +use notifico_subscription::http::get_router as subscription_router; use notifico_subscription::SubscriptionManager; use serde::Serialize; use sha2::Sha256; use std::sync::Arc; use uuid::Uuid; -mod list_unsubscribe; +mod auth; #[derive(Clone)] struct SharedState { event_handler: Addr, - sub_manager: Arc, +} + +#[derive(Clone)] +struct SecretKey { secret_key: Hmac, } @@ -27,29 +29,26 @@ pub(crate) async fn start( event_handler: Addr, sub_manager: Arc, secret_key: Hmac, + ncenter: Arc, ) { - let state = SharedState { - event_handler, - sub_manager, - secret_key, - }; + let state = SharedState { event_handler }; + + let extapi = Router::new() + .nest("/subscription", subscription_router(sub_manager)) + .nest("/ncenter", ncenter_router(ncenter)) + .layer(middleware::from_fn(auth::authorize)) + .layer(Extension(Arc::new(SecretKey { secret_key }))); // build our application with a route let app = Router::new() - .route("/", get(root)) .route("/trigger", post(trigger)) - .route("/unsubscribe", get(list_unsubscribe)) + .nest("/extapi", extapi) .with_state(state); let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap(); axum::serve(listener, app).await.unwrap(); } -// basic handler that responds with a static string -async fn root() -> &'static str { - "Hello, World!" -} - async fn trigger( State(state): State, Json(payload): Json, diff --git a/src/main.rs b/src/main.rs index 41872c6..f67dd01 100644 --- a/src/main.rs +++ b/src/main.rs @@ -14,6 +14,7 @@ use figment::{ }; use hmac::{Hmac, Mac}; use notifico_core::engine::Engine; +use notifico_ncenter::NCenterPlugin; use notifico_smtp::EmailPlugin; use notifico_subscription::SubscriptionManager; use notifico_telegram::TelegramPlugin; @@ -63,7 +64,7 @@ async fn main() { let db_connection = Database::connect(db_conn_options).await.unwrap(); let sub_manager = Arc::new(SubscriptionManager::new( - db_connection, + db_connection.clone(), secret_hmac.clone(), config.http.subscriber_url.clone(), )); @@ -74,12 +75,16 @@ async fn main() { config.projects[0].recipients.clone(), )); + let ncenter = Arc::new(NCenterPlugin::new(db_connection.clone())); + + // Create Engine with plugins let mut engine = Engine::new(); engine.add_plugin(Arc::new(TemplaterService::new("http://127.0.0.1:8000"))); engine.add_plugin(Arc::new(TelegramPlugin::new(credentials.clone()))); engine.add_plugin(Arc::new(EmailPlugin::new(credentials.clone()))); engine.add_plugin(sub_manager.clone()); engine.add_plugin(Arc::new(WaBusinessPlugin::new(credentials.clone()))); + engine.add_plugin(ncenter.clone()); let event_handler = EventHandler { pipeline_storage: pipelines.clone(), @@ -92,6 +97,7 @@ async fn main() { event_handler.clone(), sub_manager.clone(), secret_hmac, + ncenter.clone(), )); tokio::signal::ctrl_c().await.unwrap();