diff --git a/.idea/dataSources.xml b/.idea/dataSources.xml
new file mode 100644
index 0000000..d1e8e3b
--- /dev/null
+++ b/.idea/dataSources.xml
@@ -0,0 +1,26 @@
+
+
+
+
+ sqlite.xerial
+ true
+ org.sqlite.JDBC
+ jdbc:sqlite:$PROJECT_DIR$/db.sqlite3
+
+
+
+ $ProjectFileDir$
+
+
+ file://$APPLICATION_CONFIG_DIR$/jdbc-drivers/Xerial SQLiteJDBC/3.45.1/org/xerial/sqlite-jdbc/3.45.1.0/sqlite-jdbc-3.45.1.0.jar
+
+
+ file://$APPLICATION_CONFIG_DIR$/jdbc-drivers/Xerial SQLiteJDBC/3.45.1/org/slf4j/slf4j-api/1.7.36/slf4j-api-1.7.36.jar
+
+
+ file://$APPLICATION_CONFIG_DIR$/jdbc-drivers/Xerial SQLiteJDBC/3.40.1/sqlite-jdbc-3.40.1.jar
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/notifico.iml b/.idea/notifico.iml
index ec9a74e..fbc7dba 100644
--- a/.idea/notifico.iml
+++ b/.idea/notifico.iml
@@ -11,6 +11,7 @@
+
diff --git a/Cargo.toml b/Cargo.toml
index d95f14a..4efbcac 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -9,16 +9,17 @@ members = [
"notifico-template",
"notifico-worker",
"notifico-apiserver",
- "notifico-subscription"
+ "notifico-subscription",
+ "notifico-subscription/migration"
]
[workspace.dependencies]
-sea-orm = { version = "1.1.0-rc.3", features = ["sqlx-sqlite", "sqlx-postgres", "sqlx-mysql", "runtime-tokio-native-tls", "macros"] }
+sea-orm = { version = "1.1.0", features = ["sqlx-sqlite", "sqlx-postgres", "sqlx-mysql", "runtime-tokio-native-tls", "macros"] }
reqwest = { version = "0.12.8", default-features = false, features = ["json", "default-tls", "charset"] }
axum = "0.8.0-alpha.1"
[workspace.dependencies.sea-orm-migration]
-version = "1.1.0-rc.3"
+version = "1.1.0"
features = [
# Enable at least one `ASYNC_RUNTIME` and `DATABASE_DRIVER` feature if you want to run migration via CLI.
# View the list of supported features at https://www.sea-ql.org/SeaORM/docs/install-and-config/database-and-async-runtime.
diff --git a/notifico-apiserver/Cargo.toml b/notifico-apiserver/Cargo.toml
index 94f06e4..622a78d 100644
--- a/notifico-apiserver/Cargo.toml
+++ b/notifico-apiserver/Cargo.toml
@@ -16,6 +16,7 @@ figment = { version = "0.10.19", features = ["yaml", "env", "toml"] }
clap = { version = "4.5.20", features = ["derive", "color", "usage"] }
fe2o3-amqp = "0.13.1"
axum = { workspace = true }
+sea-orm = { workspace = true }
notifico-core = { path = "../notifico-core" }
notifico-subscription = { path = "../notifico-subscription" }
diff --git a/notifico-apiserver/src/amqp.rs b/notifico-apiserver/src/amqp.rs
index 91af356..f01c2d9 100644
--- a/notifico-apiserver/src/amqp.rs
+++ b/notifico-apiserver/src/amqp.rs
@@ -4,6 +4,7 @@ use notifico_core::pipeline::runner::ProcessEventRequest;
use tokio::sync::mpsc::Receiver;
use tracing::info;
+//TODO: reconnect to AMQP on failure
pub async fn run(config: Amqp, mut event_rx: Receiver) {
let mut connection = Connection::open("connection-1", config.connection_url())
.await
diff --git a/notifico-apiserver/src/http/ingest.rs b/notifico-apiserver/src/http/ingest.rs
index 5fea195..5b3b674 100644
--- a/notifico-apiserver/src/http/ingest.rs
+++ b/notifico-apiserver/src/http/ingest.rs
@@ -1,13 +1,14 @@
+use crate::http::HttpExtensions;
use axum::http::StatusCode;
use axum::routing::post;
use axum::{Extension, Json, Router};
use notifico_core::pipeline::runner::ProcessEventRequest;
use tokio::sync::mpsc::Sender;
-pub(crate) fn get_router(sender: Sender) -> Router {
+pub(crate) fn get_router(ext: HttpExtensions) -> Router {
Router::new()
.route("/v1/send", post(send))
- .layer(Extension(sender))
+ .layer(Extension(ext.sender))
}
async fn send(
diff --git a/notifico-apiserver/src/http/mod.rs b/notifico-apiserver/src/http/mod.rs
index ecad627..f733a06 100644
--- a/notifico-apiserver/src/http/mod.rs
+++ b/notifico-apiserver/src/http/mod.rs
@@ -1,12 +1,22 @@
mod ingest;
+mod recipient;
use axum::Router;
use notifico_core::pipeline::runner::ProcessEventRequest;
+use notifico_subscription::SubscriptionManager;
use std::net::SocketAddr;
+use std::sync::Arc;
use tokio::sync::mpsc::Sender;
-pub(crate) async fn start(bind: SocketAddr, sender: Sender) {
- let app = Router::new().nest("/api/ingest", ingest::get_router(sender));
+#[derive(Clone)]
+pub(crate) struct HttpExtensions {
+ pub sender: Sender,
+ pub subman: Arc,
+}
+
+pub(crate) async fn start(bind: SocketAddr, ext: HttpExtensions) {
+ let app = Router::new().nest("/api/ingest", ingest::get_router(ext.clone()));
+ let app = app.nest("/api/recipient", recipient::get_router(ext.clone()));
let listener = tokio::net::TcpListener::bind(bind).await.unwrap();
axum::serve(listener, app).await.unwrap();
diff --git a/notifico-apiserver/src/http/recipient.rs b/notifico-apiserver/src/http/recipient.rs
new file mode 100644
index 0000000..72cc5d9
--- /dev/null
+++ b/notifico-apiserver/src/http/recipient.rs
@@ -0,0 +1,9 @@
+use crate::http::HttpExtensions;
+use axum::{Extension, Router};
+use notifico_subscription::http::get_router as subscription_get_router;
+
+pub(crate) fn get_router(ext: HttpExtensions) -> Router {
+ Router::new()
+ .nest("/", subscription_get_router(ext.subman.clone()))
+ .layer(Extension(ext.subman.clone()))
+}
diff --git a/notifico-apiserver/src/main.rs b/notifico-apiserver/src/main.rs
index 35e904d..05fbc7e 100644
--- a/notifico-apiserver/src/main.rs
+++ b/notifico-apiserver/src/main.rs
@@ -1,15 +1,18 @@
mod amqp;
mod http;
+use crate::http::HttpExtensions;
use clap::Parser;
-use fe2o3_amqp::{Connection, Sender, Session};
use figment::providers::Toml;
use figment::{
providers::{Env, Format},
Figment,
};
-use notifico_core::config::{Amqp, Config};
+use notifico_core::config::Config;
+use notifico_subscription::SubscriptionManager;
+use sea_orm::{ConnectOptions, Database};
use std::path::PathBuf;
+use std::sync::Arc;
use tracing::info;
use tracing_subscriber::{fmt, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter};
@@ -41,11 +44,25 @@ async fn main() {
info!("Config: {:#?}", config);
- // AMQP sender
+ let (request_tx, request_rx) = tokio::sync::mpsc::channel(1);
- let (request_tx, mut request_rx) = tokio::sync::mpsc::channel(1);
+ let db_conn_options = ConnectOptions::new(config.db.url.to_string());
+ let db_connection = Database::connect(db_conn_options).await.unwrap();
- tokio::spawn(http::start(config.http.bind, request_tx));
+ // Initializing plugins
+ let subman = Arc::new(SubscriptionManager::new(
+ db_connection,
+ config.secret_key.as_bytes().to_vec(),
+ config.external_url,
+ ));
+ subman.setup().await.unwrap();
+
+ let ext = HttpExtensions {
+ sender: request_tx,
+ subman,
+ };
+
+ tokio::spawn(http::start(config.http.bind, ext));
tokio::spawn(amqp::run(config.amqp, request_rx));
tokio::signal::ctrl_c().await.unwrap();
diff --git a/notifico-core/src/config/mod.rs b/notifico-core/src/config/mod.rs
index 85927fe..2451de6 100644
--- a/notifico-core/src/config/mod.rs
+++ b/notifico-core/src/config/mod.rs
@@ -13,6 +13,10 @@ pub struct Config {
pub credentials: PathBuf,
pub pipelines: PathBuf,
pub amqp: Amqp,
+ pub db: Db,
+
+ pub secret_key: String,
+ pub external_url: Url,
}
#[derive(Debug, Deserialize)]
@@ -35,3 +39,8 @@ impl Amqp {
}
}
}
+
+#[derive(Debug, Deserialize)]
+pub struct Db {
+ pub url: Url,
+}
diff --git a/notifico-core/src/http/auth.rs b/notifico-core/src/http/auth.rs
index 84094b0..6886e0b 100644
--- a/notifico-core/src/http/auth.rs
+++ b/notifico-core/src/http/auth.rs
@@ -8,6 +8,7 @@ use axum::{http, Extension, Json};
use jsonwebtoken::{DecodingKey, Validation};
use serde::{Deserialize, Serialize};
use serde_json::json;
+use std::collections::BTreeSet;
use std::sync::Arc;
use uuid::Uuid;
@@ -16,18 +17,14 @@ pub struct AuthError {
status_code: StatusCode,
}
-#[derive(Clone, Deserialize, Serialize, Eq, PartialEq)]
-#[serde(rename = "snake_case")]
-pub enum Scopes {
- RecipientApi,
-}
+pub struct Scope(String);
#[derive(Clone, Deserialize, Serialize)]
pub struct Claims {
pub proj: Uuid, // Project ID
pub sub: Uuid, // Recipient ID
- pub scopes: Vec,
- pub exp: usize,
+ pub scopes: BTreeSet,
+ pub exp: u64,
}
impl IntoResponse for AuthError {
@@ -49,7 +46,7 @@ pub struct QueryParams {
pub async fn authorize(
Query(params): Query,
Extension(skey): Extension>,
- Extension(scope): Extension,
+ Extension(scope): Extension,
mut req: Request,
next: Next,
) -> Result, AuthError> {
@@ -100,7 +97,7 @@ pub async fn authorize(
};
// Check scopes
- if !token.claims.scopes.contains(&scope) {
+ if !token.claims.scopes.contains(&scope.0) {
return Err(AuthError {
message: "Insufficient scopes".to_string(),
status_code: StatusCode::FORBIDDEN,
diff --git a/notifico-subscription/Cargo.toml b/notifico-subscription/Cargo.toml
new file mode 100644
index 0000000..c1e06fd
--- /dev/null
+++ b/notifico-subscription/Cargo.toml
@@ -0,0 +1,19 @@
+[package]
+name = "notifico-subscription"
+version = "0.1.0"
+edition = "2021"
+
+[dependencies]
+notifico-core = { path = "../notifico-core" }
+notifico-subcription-migration = { path = "migration" }
+
+url = "2.5.2"
+uuid = { version = "1.10.0", features = ["v7"] }
+serde_json = "1.0.128"
+sea-orm = { workspace = true }
+tracing = "0.1.40"
+serde = { version = "1.0.210", features = ["derive"] }
+axum = { workspace = true }
+jsonwebtoken = "9.3.0"
+anyhow = "1.0.91"
+
diff --git a/notifico-subscription/migration/Cargo.toml b/notifico-subscription/migration/Cargo.toml
new file mode 100644
index 0000000..3a2628f
--- /dev/null
+++ b/notifico-subscription/migration/Cargo.toml
@@ -0,0 +1,13 @@
+[package]
+name = "notifico-subcription-migration"
+version = "0.1.0"
+edition = "2021"
+publish = false
+
+[lib]
+name = "migration"
+path = "src/lib.rs"
+
+[dependencies]
+async-std = { version = "1", features = ["attributes", "tokio1"] }
+sea-orm-migration = { workspace = true }
diff --git a/notifico-subscription/migration/README.md b/notifico-subscription/migration/README.md
new file mode 100644
index 0000000..3b438d8
--- /dev/null
+++ b/notifico-subscription/migration/README.md
@@ -0,0 +1,41 @@
+# Running Migrator CLI
+
+- Generate a new migration file
+ ```sh
+ cargo run -- generate MIGRATION_NAME
+ ```
+- Apply all pending migrations
+ ```sh
+ cargo run
+ ```
+ ```sh
+ cargo run -- up
+ ```
+- Apply first 10 pending migrations
+ ```sh
+ cargo run -- up -n 10
+ ```
+- Rollback last applied migrations
+ ```sh
+ cargo run -- down
+ ```
+- Rollback last 10 applied migrations
+ ```sh
+ cargo run -- down -n 10
+ ```
+- Drop all tables from the database, then reapply all migrations
+ ```sh
+ cargo run -- fresh
+ ```
+- Rollback all applied migrations, then reapply all migrations
+ ```sh
+ cargo run -- refresh
+ ```
+- Rollback all applied migrations
+ ```sh
+ cargo run -- reset
+ ```
+- Check the status of all migrations
+ ```sh
+ cargo run -- status
+ ```
diff --git a/notifico-subscription/migration/src/lib.rs b/notifico-subscription/migration/src/lib.rs
new file mode 100644
index 0000000..a1b78f8
--- /dev/null
+++ b/notifico-subscription/migration/src/lib.rs
@@ -0,0 +1,16 @@
+pub use sea_orm_migration::prelude::*;
+
+mod m20220101_000001_create_table;
+
+pub struct Migrator;
+
+#[async_trait::async_trait]
+impl MigratorTrait for Migrator {
+ fn migrations() -> Vec> {
+ vec![Box::new(m20220101_000001_create_table::Migration)]
+ }
+
+ fn migration_table_name() -> DynIden {
+ Alias::new("subscription_migrations").into_iden()
+ }
+}
diff --git a/notifico-subscription/migration/src/m20220101_000001_create_table.rs b/notifico-subscription/migration/src/m20220101_000001_create_table.rs
new file mode 100644
index 0000000..f01469a
--- /dev/null
+++ b/notifico-subscription/migration/src/m20220101_000001_create_table.rs
@@ -0,0 +1,55 @@
+use sea_orm_migration::{prelude::*, schema::*};
+
+#[derive(DeriveMigrationName)]
+pub struct Migration;
+
+#[async_trait::async_trait]
+impl MigrationTrait for Migration {
+ async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
+ manager
+ .create_table(
+ Table::create()
+ .table(Subscription::Table)
+ .if_not_exists()
+ .col(pk_uuid(Subscription::Id))
+ .col(uuid(Subscription::ProjectId))
+ .col(string(Subscription::Event))
+ .col(string(Subscription::Channel))
+ .col(uuid(Subscription::RecipientId))
+ .col(boolean(Subscription::IsSubscribed))
+ .to_owned(),
+ )
+ .await?;
+
+ manager
+ .create_index(
+ Index::create()
+ .name("idx_subscription_project_id")
+ .table(Subscription::Table)
+ .col(Subscription::ProjectId)
+ .col(Subscription::Event)
+ .col(Subscription::Channel)
+ .col(Subscription::RecipientId)
+ .unique()
+ .to_owned(),
+ )
+ .await
+ }
+
+ async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
+ manager
+ .drop_table(Table::drop().table(Subscription::Table).to_owned())
+ .await
+ }
+}
+
+#[derive(DeriveIden)]
+enum Subscription {
+ Table,
+ Id,
+ ProjectId,
+ Event,
+ Channel,
+ RecipientId,
+ IsSubscribed,
+}
diff --git a/notifico-subscription/migration/src/main.rs b/notifico-subscription/migration/src/main.rs
new file mode 100644
index 0000000..c6b6e48
--- /dev/null
+++ b/notifico-subscription/migration/src/main.rs
@@ -0,0 +1,6 @@
+use sea_orm_migration::prelude::*;
+
+#[async_std::main]
+async fn main() {
+ cli::run_cli(migration::Migrator).await;
+}
diff --git a/notifico-subscription/src/context.rs b/notifico-subscription/src/context.rs
new file mode 100644
index 0000000..4e9640d
--- /dev/null
+++ b/notifico-subscription/src/context.rs
@@ -0,0 +1 @@
+pub const EMAIL_LIST_UNSUBSCRIBE: &str = "email.list_unsubscribe";
diff --git a/notifico-subscription/src/entity/mod.rs b/notifico-subscription/src/entity/mod.rs
new file mode 100644
index 0000000..0d80e6b
--- /dev/null
+++ b/notifico-subscription/src/entity/mod.rs
@@ -0,0 +1,5 @@
+//! `SeaORM` Entity, @generated by sea-orm-codegen 1.1.0
+
+pub mod prelude;
+
+pub mod subscription;
diff --git a/notifico-subscription/src/entity/prelude.rs b/notifico-subscription/src/entity/prelude.rs
new file mode 100644
index 0000000..9e6cb55
--- /dev/null
+++ b/notifico-subscription/src/entity/prelude.rs
@@ -0,0 +1,3 @@
+//! `SeaORM` Entity, @generated by sea-orm-codegen 1.1.0
+
+pub use super::subscription::Entity as Subscription;
diff --git a/notifico-subscription/src/entity/subscription.rs b/notifico-subscription/src/entity/subscription.rs
new file mode 100644
index 0000000..6ba65fb
--- /dev/null
+++ b/notifico-subscription/src/entity/subscription.rs
@@ -0,0 +1,20 @@
+//! `SeaORM` Entity, @generated by sea-orm-codegen 1.1.0
+
+use sea_orm::entity::prelude::*;
+
+#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)]
+#[sea_orm(table_name = "subscription")]
+pub struct Model {
+ #[sea_orm(primary_key, auto_increment = false)]
+ pub id: Uuid,
+ pub project_id: Uuid,
+ pub event: String,
+ pub channel: String,
+ pub recipient_id: Uuid,
+ pub is_subscribed: bool,
+}
+
+#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
+pub enum Relation {}
+
+impl ActiveModelBehavior for ActiveModel {}
diff --git a/notifico-subscription/src/http.rs b/notifico-subscription/src/http.rs
new file mode 100644
index 0000000..889ac98
--- /dev/null
+++ b/notifico-subscription/src/http.rs
@@ -0,0 +1,37 @@
+use crate::SubscriptionManager;
+use axum::extract::Query;
+use axum::routing::get;
+use axum::{Extension, Router};
+use notifico_core::http::AuthorizedRecipient;
+use serde::Deserialize;
+use std::sync::Arc;
+
+pub fn get_router(
+ ncenter: Arc,
+) -> Router {
+ Router::new()
+ .route("/v1/list-unsubscribe", get(list_unsubscribe))
+ .layer(Extension(ncenter))
+}
+
+#[derive(Debug, Deserialize)]
+pub struct QueryParams {
+ event: String,
+ channel: String,
+}
+
+pub(crate) async fn list_unsubscribe(
+ Query(params): Query,
+ Extension(sub_manager): Extension>,
+ Extension(auth): Extension>,
+) {
+ sub_manager
+ .set_subscribed(
+ auth.project_id,
+ auth.recipient_id,
+ ¶ms.event,
+ ¶ms.channel,
+ false,
+ )
+ .await;
+}
diff --git a/notifico-subscription/src/lib.rs b/notifico-subscription/src/lib.rs
new file mode 100644
index 0000000..b7b29f3
--- /dev/null
+++ b/notifico-subscription/src/lib.rs
@@ -0,0 +1,193 @@
+mod context;
+mod entity;
+pub mod http;
+mod step;
+
+use crate::context::EMAIL_LIST_UNSUBSCRIBE;
+use crate::entity::subscription;
+use crate::step::STEPS;
+use entity::prelude::*;
+use jsonwebtoken::{EncodingKey, Header};
+use migration::{Migrator, MigratorTrait};
+use notifico_core::http::auth::Claims;
+use notifico_core::step::SerializedStep;
+use notifico_core::{
+ engine::PipelineContext,
+ engine::{EnginePlugin, StepOutput},
+ error::EngineError,
+};
+use sea_orm::prelude::async_trait::async_trait;
+use sea_orm::sea_query::OnConflict;
+use sea_orm::ActiveValue::Set;
+use sea_orm::{ColumnTrait, EntityTrait};
+use sea_orm::{DatabaseConnection, EntityOrSelect, QueryFilter};
+use serde_json::Value;
+use std::borrow::Cow;
+use std::time::{SystemTime, UNIX_EPOCH};
+use step::Step;
+use tracing::error;
+use url::Url;
+use uuid::Uuid;
+
+pub struct SubscriptionManager {
+ db: DatabaseConnection,
+ secret_key: Vec,
+ subscriber_url: Url,
+}
+
+impl SubscriptionManager {
+ pub fn new(db: DatabaseConnection, secret_key: Vec, subscriber_url: Url) -> Self {
+ Self {
+ db,
+ secret_key,
+ subscriber_url,
+ }
+ }
+
+ pub async fn setup(&self) -> anyhow::Result<()> {
+ Ok(Migrator::up(&self.db, None).await?)
+ }
+
+ pub async fn set_subscribed(
+ &self,
+ project_id: Uuid,
+ recipient_id: Uuid,
+ event: &str,
+ channel: &str,
+ is_subscribed: bool,
+ ) {
+ let model = subscription::ActiveModel {
+ id: Set(Uuid::now_v7()),
+ project_id: Set(project_id),
+ event: Set(event.to_string()),
+ channel: Set(channel.to_string()),
+ recipient_id: Set(recipient_id),
+ is_subscribed: Set(is_subscribed),
+ };
+
+ subscription::Entity::insert(model)
+ .on_conflict(
+ OnConflict::columns([
+ subscription::Column::ProjectId,
+ subscription::Column::RecipientId,
+ subscription::Column::Event,
+ subscription::Column::Channel,
+ ])
+ .do_nothing()
+ .to_owned(),
+ )
+ .exec(&self.db)
+ .await
+ .unwrap();
+ }
+ pub async fn is_subscribed(
+ &self,
+ project_id: Uuid,
+ recipient_id: Uuid,
+ event: &str,
+ channel: &str,
+ ) -> bool {
+ let result = Subscription
+ .select()
+ .filter(subscription::Column::ProjectId.eq(project_id))
+ .filter(subscription::Column::RecipientId.eq(recipient_id))
+ .filter(subscription::Column::Event.eq(event))
+ .filter(subscription::Column::Channel.eq(channel))
+ .one(&self.db)
+ .await;
+ match result {
+ Ok(Some(subscription)) => subscription.is_subscribed,
+ Ok(None) => true,
+ Err(e) => {
+ error!("Error checking subscription: {}", e);
+ false
+ }
+ }
+ }
+}
+
+#[async_trait]
+impl EnginePlugin for SubscriptionManager {
+ async fn execute_step(
+ &self,
+ context: &mut PipelineContext,
+ step: &SerializedStep,
+ ) -> Result {
+ let Some(recipient) = &context.recipient else {
+ return Err(EngineError::RecipientNotSet);
+ };
+
+ let step: Step = step.clone().convert_step()?;
+
+ match step {
+ Step::Check { channel } => {
+ if self
+ .is_subscribed(
+ context.project_id,
+ recipient.id,
+ &context.trigger_event,
+ &channel,
+ )
+ .await
+ {
+ Ok(StepOutput::Continue)
+ } else {
+ Ok(StepOutput::Interrupt)
+ }
+ }
+ Step::ListUnsubscribe { .. } => {
+ let Some(recipient) = context.recipient.clone() else {
+ return Err(EngineError::RecipientNotSet);
+ };
+
+ context.plugin_contexts.insert(
+ EMAIL_LIST_UNSUBSCRIBE.into(),
+ Value::String(format!(
+ "<{}>",
+ create_self_unsubscribe_url(
+ self.secret_key.clone(),
+ self.subscriber_url.clone(),
+ context.project_id,
+ &context.trigger_event,
+ recipient.id,
+ )
+ )),
+ );
+ Ok(StepOutput::Continue)
+ }
+ }
+ }
+
+ fn steps(&self) -> Vec> {
+ STEPS.iter().map(|&s| s.into()).collect()
+ }
+}
+
+// Implements one-click List-Unsubscribe style URL generation
+pub fn create_self_unsubscribe_url(
+ key: Vec,
+ subscriber_url: Url,
+ project_id: Uuid,
+ event: &str,
+ recipient_id: Uuid,
+) -> Url {
+ let claims = Claims {
+ proj: project_id,
+ sub: recipient_id,
+ scopes: [String::from("list_unsubscribe")].into_iter().collect(),
+ exp: SystemTime::now()
+ .duration_since(UNIX_EPOCH)
+ .unwrap()
+ .as_secs() as _,
+ };
+
+ let token =
+ jsonwebtoken::encode(&Header::default(), &claims, &EncodingKey::from_secret(&key)).unwrap();
+
+ //TODO: Optimize URL creation to avoid unnecessary allocations and format machinery
+ let url = format!(
+ "{}/unsubscribe?token={}&event={}",
+ subscriber_url, token, event
+ );
+ Url::parse(&url).unwrap()
+}
diff --git a/notifico-subscription/src/step.rs b/notifico-subscription/src/step.rs
new file mode 100644
index 0000000..b95c312
--- /dev/null
+++ b/notifico-subscription/src/step.rs
@@ -0,0 +1,12 @@
+use serde::{Deserialize, Serialize};
+
+#[derive(Serialize, Deserialize)]
+#[serde(tag = "step")]
+pub enum Step {
+ #[serde(rename = "sub.check")]
+ Check { channel: String },
+ #[serde(rename = "sub.list_unsubscribe")]
+ ListUnsubscribe,
+}
+
+pub(crate) const STEPS: &[&str] = &["sub.check", "sub.list_unsubscribe"];
diff --git a/notifico-worker/Cargo.toml b/notifico-worker/Cargo.toml
index b0849f7..00884cf 100644
--- a/notifico-worker/Cargo.toml
+++ b/notifico-worker/Cargo.toml
@@ -17,6 +17,7 @@ clap = { version = "4.5.20", features = ["derive", "color", "usage"] }
fe2o3-amqp = { version = "0.13.1", features = ["acceptor"] }
anyhow = "1.0.91"
url = "2.5.2"
+sea-orm = { workspace = true }
notifico-core = { path = "../notifico-core" }
notifico-telegram = { path = "../transports/notifico-telegram" }
diff --git a/notifico-worker/src/main.rs b/notifico-worker/src/main.rs
index 671b1f8..4b1a40b 100644
--- a/notifico-worker/src/main.rs
+++ b/notifico-worker/src/main.rs
@@ -14,9 +14,11 @@ use notifico_core::engine::Engine;
use notifico_core::pipeline::runner::PipelineRunner;
use notifico_smpp::SmppPlugin;
use notifico_smtp::EmailPlugin;
+use notifico_subscription::SubscriptionManager;
use notifico_telegram::TelegramPlugin;
use notifico_template::LocalTemplater;
use notifico_whatsapp::WaBusinessPlugin;
+use sea_orm::{ConnectOptions, Database};
use std::path::PathBuf;
use std::sync::Arc;
use tracing::info;
@@ -61,6 +63,9 @@ async fn main() {
.extract()
.unwrap();
+ let db_conn_options = ConnectOptions::new(config.db.url.to_string());
+ let db_connection = Database::connect(db_conn_options).await.unwrap();
+
let credentials = Arc::new(MemoryCredentialStorage::from_config(credential_config).unwrap());
let pipelines = Arc::new(MemoryPipelineStorage::from_config(&pipelines_config));
@@ -73,6 +78,16 @@ async fn main() {
engine.add_plugin(Arc::new(WaBusinessPlugin::new(credentials.clone())));
engine.add_plugin(Arc::new(SmppPlugin::new(credentials.clone())));
+ let subman = Arc::new(SubscriptionManager::new(
+ db_connection,
+ config.secret_key.as_bytes().to_vec(),
+ config.external_url,
+ ));
+ engine.add_plugin(subman.clone());
+
+ // Setup stateful plugins
+ subman.setup().await.unwrap();
+
// Create PipelineRunner, the core component of the Notifico system
let runner = Arc::new(PipelineRunner::new(pipelines.clone(), engine));
diff --git a/notifico.toml b/notifico.toml
index 45b455b..9384f74 100644
--- a/notifico.toml
+++ b/notifico.toml
@@ -1,6 +1,8 @@
templates = "/home/gamepad/notifico/templates"
pipelines = "pipelines.yml"
credentials = "credentials.toml"
+secret_key = "weak-secret-key"
+external_url = "http://127.0.0.1/"
[http]
bind = "[::]:3000"
@@ -8,4 +10,7 @@ bind = "[::]:3000"
[amqp]
#bind = "[::]:5432"
url = "amqp://guest:guest@127.0.0.1"
-address = "notifico1"
\ No newline at end of file
+address = "notifico1"
+
+[db]
+url = "sqlite://db.sqlite3"
\ No newline at end of file
diff --git a/regen_entities.sh b/regen_entities.sh
new file mode 100755
index 0000000..27e7d34
--- /dev/null
+++ b/regen_entities.sh
@@ -0,0 +1,10 @@
+#!/usr/bin/env bash
+set -xe
+
+pushd notifico-subscription
+TEMPDB=$(mktemp /tmp/seaorm-migrate-XXXXXXX.sqlite3)
+export DATABASE_URL="sqlite://$TEMPDB"
+sea-orm-cli migrate -d migration up
+sea-orm-cli generate entity -o src/entity --ignore-tables subscription_migrations
+rm "$TEMPDB"
+popd