Skip to content

Commit

Permalink
Add subscription manager and list-unsubscribe generator
Browse files Browse the repository at this point in the history
  • Loading branch information
GamePad64 committed Oct 27, 2024
1 parent 3656e9c commit 3abee6c
Show file tree
Hide file tree
Showing 28 changed files with 547 additions and 22 deletions.
26 changes: 26 additions & 0 deletions .idea/dataSources.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions .idea/notifico.iml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 4 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,17 @@ members = [
"notifico-template",
"notifico-worker",
"notifico-apiserver",
"notifico-subscription"
"notifico-subscription",
"notifico-subscription/migration"
]

[workspace.dependencies]
sea-orm = { version = "1.1.0-rc.3", features = ["sqlx-sqlite", "sqlx-postgres", "sqlx-mysql", "runtime-tokio-native-tls", "macros"] }
sea-orm = { version = "1.1.0", features = ["sqlx-sqlite", "sqlx-postgres", "sqlx-mysql", "runtime-tokio-native-tls", "macros"] }
reqwest = { version = "0.12.8", default-features = false, features = ["json", "default-tls", "charset"] }
axum = "0.8.0-alpha.1"

[workspace.dependencies.sea-orm-migration]
version = "1.1.0-rc.3"
version = "1.1.0"
features = [
# Enable at least one `ASYNC_RUNTIME` and `DATABASE_DRIVER` feature if you want to run migration via CLI.
# View the list of supported features at https://www.sea-ql.org/SeaORM/docs/install-and-config/database-and-async-runtime.
Expand Down
1 change: 1 addition & 0 deletions notifico-apiserver/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ figment = { version = "0.10.19", features = ["yaml", "env", "toml"] }
clap = { version = "4.5.20", features = ["derive", "color", "usage"] }
fe2o3-amqp = "0.13.1"
axum = { workspace = true }
sea-orm = { workspace = true }

notifico-core = { path = "../notifico-core" }
notifico-subscription = { path = "../notifico-subscription" }
1 change: 1 addition & 0 deletions notifico-apiserver/src/amqp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use notifico_core::pipeline::runner::ProcessEventRequest;
use tokio::sync::mpsc::Receiver;
use tracing::info;

//TODO: reconnect to AMQP on failure
pub async fn run(config: Amqp, mut event_rx: Receiver<ProcessEventRequest>) {
let mut connection = Connection::open("connection-1", config.connection_url())
.await
Expand Down
5 changes: 3 additions & 2 deletions notifico-apiserver/src/http/ingest.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
use crate::http::HttpExtensions;
use axum::http::StatusCode;
use axum::routing::post;
use axum::{Extension, Json, Router};
use notifico_core::pipeline::runner::ProcessEventRequest;
use tokio::sync::mpsc::Sender;

pub(crate) fn get_router(sender: Sender<ProcessEventRequest>) -> Router {
pub(crate) fn get_router(ext: HttpExtensions) -> Router {
Router::new()
.route("/v1/send", post(send))
.layer(Extension(sender))
.layer(Extension(ext.sender))
}

async fn send(
Expand Down
14 changes: 12 additions & 2 deletions notifico-apiserver/src/http/mod.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,22 @@
mod ingest;
mod recipient;

use axum::Router;
use notifico_core::pipeline::runner::ProcessEventRequest;
use notifico_subscription::SubscriptionManager;
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::sync::mpsc::Sender;

pub(crate) async fn start(bind: SocketAddr, sender: Sender<ProcessEventRequest>) {
let app = Router::new().nest("/api/ingest", ingest::get_router(sender));
#[derive(Clone)]
pub(crate) struct HttpExtensions {
pub sender: Sender<ProcessEventRequest>,
pub subman: Arc<SubscriptionManager>,
}

pub(crate) async fn start(bind: SocketAddr, ext: HttpExtensions) {
let app = Router::new().nest("/api/ingest", ingest::get_router(ext.clone()));
let app = app.nest("/api/recipient", recipient::get_router(ext.clone()));

let listener = tokio::net::TcpListener::bind(bind).await.unwrap();
axum::serve(listener, app).await.unwrap();
Expand Down
9 changes: 9 additions & 0 deletions notifico-apiserver/src/http/recipient.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
use crate::http::HttpExtensions;
use axum::{Extension, Router};
use notifico_subscription::http::get_router as subscription_get_router;

pub(crate) fn get_router(ext: HttpExtensions) -> Router {
Router::new()
.nest("/", subscription_get_router(ext.subman.clone()))
.layer(Extension(ext.subman.clone()))
}
27 changes: 22 additions & 5 deletions notifico-apiserver/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
mod amqp;
mod http;

use crate::http::HttpExtensions;
use clap::Parser;
use fe2o3_amqp::{Connection, Sender, Session};
use figment::providers::Toml;
use figment::{
providers::{Env, Format},
Figment,
};
use notifico_core::config::{Amqp, Config};
use notifico_core::config::Config;
use notifico_subscription::SubscriptionManager;
use sea_orm::{ConnectOptions, Database};
use std::path::PathBuf;
use std::sync::Arc;
use tracing::info;
use tracing_subscriber::{fmt, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter};

Expand Down Expand Up @@ -41,11 +44,25 @@ async fn main() {

info!("Config: {:#?}", config);

// AMQP sender
let (request_tx, request_rx) = tokio::sync::mpsc::channel(1);

let (request_tx, mut request_rx) = tokio::sync::mpsc::channel(1);
let db_conn_options = ConnectOptions::new(config.db.url.to_string());
let db_connection = Database::connect(db_conn_options).await.unwrap();

tokio::spawn(http::start(config.http.bind, request_tx));
// Initializing plugins
let subman = Arc::new(SubscriptionManager::new(
db_connection,
config.secret_key.as_bytes().to_vec(),
config.external_url,
));
subman.setup().await.unwrap();

let ext = HttpExtensions {
sender: request_tx,
subman,
};

tokio::spawn(http::start(config.http.bind, ext));
tokio::spawn(amqp::run(config.amqp, request_rx));

tokio::signal::ctrl_c().await.unwrap();
Expand Down
9 changes: 9 additions & 0 deletions notifico-core/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ pub struct Config {
pub credentials: PathBuf,
pub pipelines: PathBuf,
pub amqp: Amqp,
pub db: Db,

pub secret_key: String,
pub external_url: Url,
}

#[derive(Debug, Deserialize)]
Expand All @@ -35,3 +39,8 @@ impl Amqp {
}
}
}

#[derive(Debug, Deserialize)]
pub struct Db {
pub url: Url,
}
15 changes: 6 additions & 9 deletions notifico-core/src/http/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use axum::{http, Extension, Json};
use jsonwebtoken::{DecodingKey, Validation};
use serde::{Deserialize, Serialize};
use serde_json::json;
use std::collections::BTreeSet;
use std::sync::Arc;
use uuid::Uuid;

Expand All @@ -16,18 +17,14 @@ pub struct AuthError {
status_code: StatusCode,
}

#[derive(Clone, Deserialize, Serialize, Eq, PartialEq)]
#[serde(rename = "snake_case")]
pub enum Scopes {
RecipientApi,
}
pub struct Scope(String);

#[derive(Clone, Deserialize, Serialize)]
pub struct Claims {
pub proj: Uuid, // Project ID
pub sub: Uuid, // Recipient ID
pub scopes: Vec<Scopes>,
pub exp: usize,
pub scopes: BTreeSet<String>,
pub exp: u64,
}

impl IntoResponse for AuthError {
Expand All @@ -49,7 +46,7 @@ pub struct QueryParams {
pub async fn authorize(
Query(params): Query<QueryParams>,
Extension(skey): Extension<Arc<SecretKey>>,
Extension(scope): Extension<Scopes>,
Extension(scope): Extension<Scope>,
mut req: Request,
next: Next,
) -> Result<Response<Body>, AuthError> {
Expand Down Expand Up @@ -100,7 +97,7 @@ pub async fn authorize(
};

// Check scopes
if !token.claims.scopes.contains(&scope) {
if !token.claims.scopes.contains(&scope.0) {
return Err(AuthError {
message: "Insufficient scopes".to_string(),
status_code: StatusCode::FORBIDDEN,
Expand Down
19 changes: 19 additions & 0 deletions notifico-subscription/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
[package]
name = "notifico-subscription"
version = "0.1.0"
edition = "2021"

[dependencies]
notifico-core = { path = "../notifico-core" }
notifico-subcription-migration = { path = "migration" }

url = "2.5.2"
uuid = { version = "1.10.0", features = ["v7"] }
serde_json = "1.0.128"
sea-orm = { workspace = true }
tracing = "0.1.40"
serde = { version = "1.0.210", features = ["derive"] }
axum = { workspace = true }
jsonwebtoken = "9.3.0"
anyhow = "1.0.91"

13 changes: 13 additions & 0 deletions notifico-subscription/migration/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
[package]
name = "notifico-subcription-migration"
version = "0.1.0"
edition = "2021"
publish = false

[lib]
name = "migration"
path = "src/lib.rs"

[dependencies]
async-std = { version = "1", features = ["attributes", "tokio1"] }
sea-orm-migration = { workspace = true }
41 changes: 41 additions & 0 deletions notifico-subscription/migration/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# Running Migrator CLI

- Generate a new migration file
```sh
cargo run -- generate MIGRATION_NAME
```
- Apply all pending migrations
```sh
cargo run
```
```sh
cargo run -- up
```
- Apply first 10 pending migrations
```sh
cargo run -- up -n 10
```
- Rollback last applied migrations
```sh
cargo run -- down
```
- Rollback last 10 applied migrations
```sh
cargo run -- down -n 10
```
- Drop all tables from the database, then reapply all migrations
```sh
cargo run -- fresh
```
- Rollback all applied migrations, then reapply all migrations
```sh
cargo run -- refresh
```
- Rollback all applied migrations
```sh
cargo run -- reset
```
- Check the status of all migrations
```sh
cargo run -- status
```
16 changes: 16 additions & 0 deletions notifico-subscription/migration/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
pub use sea_orm_migration::prelude::*;

mod m20220101_000001_create_table;

pub struct Migrator;

#[async_trait::async_trait]
impl MigratorTrait for Migrator {
fn migrations() -> Vec<Box<dyn MigrationTrait>> {
vec![Box::new(m20220101_000001_create_table::Migration)]
}

fn migration_table_name() -> DynIden {
Alias::new("subscription_migrations").into_iden()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
use sea_orm_migration::{prelude::*, schema::*};

#[derive(DeriveMigrationName)]
pub struct Migration;

#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.create_table(
Table::create()
.table(Subscription::Table)
.if_not_exists()
.col(pk_uuid(Subscription::Id))
.col(uuid(Subscription::ProjectId))
.col(string(Subscription::Event))
.col(string(Subscription::Channel))
.col(uuid(Subscription::RecipientId))
.col(boolean(Subscription::IsSubscribed))
.to_owned(),
)
.await?;

manager
.create_index(
Index::create()
.name("idx_subscription_project_id")
.table(Subscription::Table)
.col(Subscription::ProjectId)
.col(Subscription::Event)
.col(Subscription::Channel)
.col(Subscription::RecipientId)
.unique()
.to_owned(),
)
.await
}

async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.drop_table(Table::drop().table(Subscription::Table).to_owned())
.await
}
}

#[derive(DeriveIden)]
enum Subscription {
Table,
Id,
ProjectId,
Event,
Channel,
RecipientId,
IsSubscribed,
}
6 changes: 6 additions & 0 deletions notifico-subscription/migration/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
use sea_orm_migration::prelude::*;

#[async_std::main]
async fn main() {
cli::run_cli(migration::Migrator).await;
}
Loading

0 comments on commit 3abee6c

Please sign in to comment.