Skip to content

Commit

Permalink
Add basic notification center
Browse files Browse the repository at this point in the history
  • Loading branch information
GamePad64 committed Oct 9, 2024
1 parent 31c0a07 commit 3b6ef63
Show file tree
Hide file tree
Showing 40 changed files with 564 additions and 161 deletions.
47 changes: 36 additions & 11 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ rust-version = "1.81.0"
[dependencies]
tokio = { version = "1.40", features = ["full"] }
log = "0.4.22"
futures = "0.3.30"
futures = "0.3.31"
tracing = "0.1.40"
tracing-subscriber = { version = "0.3.18", features = ["env-filter", "fmt"] }
serde = { version = "1.0.210", features = ["derive"] }
Expand All @@ -18,29 +18,54 @@ reqwest = { version = "0.12.7", features = ["json"] }
async-trait = "0.1.82"
url = "2.5.2"
figment = { version = "0.10.19", features = ["yaml", "env"] }
clap = { version = "4.5.17", features = ["derive", "color", "usage"] }
clap = { version = "4.5.19", features = ["derive", "color", "usage"] }
actix = "0.13.5"
axum = { workspace = true }
jwt = "0.16.0"
jsonwebtoken = "9.3.0"
sha2 = "0.10.8"
hmac = "0.12.1"
anyhow = "1.0.89"

notifico-core = { path = "notifico-core" }
notifico-telegram = { path = "notifico-telegram" }
notifico-smtp = { path = "notifico-smtp" }
notifico-subscription = { path = "notifico-subscription" }
notifico-whatsapp = { path = "notifico-whatsapp" }
notifico-templater = { path = "notifico-templater" }
serde_yaml = "0.9.23"
actix = "0.13.5"
axum = "0.7.6"
jwt = "0.16.0"
sha2 = "0.10.8"
hmac = "0.12.1"
anyhow = "1.0.89"
notifico-ncenter = { path = "notifico-ncenter" }

[profile.release]
opt-level = 3
lto = true
codegen-units = 1

[workspace]
members = [".", "notifico-core", "notifico-smtp", "notifico-telegram", "notifico-subscription/migration", "notifico-subscription", "notifico-whatsapp", "notifico-templater"]
members = [
"notifico-core",
"notifico-smtp",
"notifico-telegram",
"notifico-subscription",
"notifico-subscription/migration",
"notifico-whatsapp",
"notifico-templater",
"notifico-ncenter",
"notifico-ncenter/migration"
]

[workspace.dependencies]
sea-orm = { version = "1.1.0-rc.1", features = ["sqlx-sqlite", "sqlx-postgres", "runtime-tokio-rustls", "macros"] }
sea-orm = { version = "1.1.0-rc.3", features = ["sqlx-sqlite", "sqlx-postgres", "sqlx-mysql", "runtime-tokio-rustls", "macros"] }
reqwest = "0.12.8"
axum = "0.8.0-alpha.1"

[workspace.dependencies.sea-orm-migration]
version = "1.1.0-rc.3"
features = [
# Enable at least one `ASYNC_RUNTIME` and `DATABASE_DRIVER` feature if you want to run migration via CLI.
# View the list of supported features at https://www.sea-ql.org/SeaORM/docs/install-and-config/database-and-async-runtime.
# e.g.
"runtime-tokio-rustls", # `ASYNC_RUNTIME` feature
"sqlx-sqlite", # `DATABASE_DRIVER` feature
"sqlx-postgres", # `DATABASE_DRIVER` feature
"sqlx-mysql", # `DATABASE_DRIVER` feature
]
2 changes: 2 additions & 0 deletions notifico-core/src/credentials.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use serde::{Deserialize, Serialize};
use serde_json::Value;
use uuid::Uuid;

/// Generic credential with type information.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct Credential {
pub r#type: String,
Expand All @@ -23,6 +24,7 @@ impl Credential {
}
}

/// Specific credential types should implement this trait.
pub trait TypedCredential: for<'de> Deserialize<'de> {
const CREDENTIAL_TYPE: &'static str;
}
Expand Down
6 changes: 4 additions & 2 deletions notifico-core/src/engine/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use crate::engine::plugin::{EnginePlugin, StepOutput};
use crate::error::EngineError;
use crate::pipeline::SerializedStep;
use crate::recipient::Recipient;
Expand All @@ -12,7 +11,8 @@ use std::sync::Arc;
use tracing::instrument;
use uuid::Uuid;

pub mod plugin;
mod plugin;
pub use plugin::{EnginePlugin, StepOutput};

#[derive(Debug, Default, Clone, Serialize, Deserialize)]
#[serde(transparent)]
Expand All @@ -29,6 +29,8 @@ pub struct PipelineContext {
pub channel: String,
}

/// Engine is used to run steps in the pipeline.
/// Can be cloned and shared across tasks.
#[derive(Clone)]
pub struct Engine {
steps: HashMap<Cow<'static, str>, Arc<dyn EnginePlugin>>,
Expand Down
7 changes: 7 additions & 0 deletions notifico-core/src/http.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
use uuid::Uuid;

#[derive(Clone)]
pub struct AuthorizedRecipient {
pub project_id: Uuid,
pub recipient_id: Uuid,
}
1 change: 1 addition & 0 deletions notifico-core/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
pub mod credentials;
pub mod engine;
pub mod error;
pub mod http;
pub mod pipeline;
pub mod recipient;
pub mod templater;
19 changes: 15 additions & 4 deletions notifico-core/src/pipeline.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::engine::plugin::StepOutput;
use crate::engine::StepOutput;
use crate::engine::{Engine, EventContext, PipelineContext};
use crate::error::EngineError;
use crate::recipient::{Recipient, RecipientDirectory};
Expand Down Expand Up @@ -73,19 +73,27 @@ impl PipelineRunner {
}
}

/// Processes an event by executing the associated pipelines.
///
/// # Parameters
///
/// * `project_id` - The unique identifier of the project associated with the event.
/// * `trigger_event` - The name of the event that triggered the pipeline execution.
/// * `event_context` - The contextual information related to the event.
/// * `recipient_sel` - An optional selector for the recipient of the event.
pub async fn process_event(
&self,
project_id: Uuid,
trigger_event: &str,
event_context: EventContext,
recipient_sel: Option<RecipientSelector>,
) -> Result<(), EngineError> {
// Fetch the pipelines associated with the project and event
let pipelines = self
.pipeline_storage
.get_pipelines(project_id, trigger_event)?;

let mut join_handles = JoinSet::new();

// Determine the recipient based on the recipient selector
let recipient = match recipient_sel {
None => None,
Some(RecipientSelector::RecipientId { id }) => {
Expand All @@ -94,7 +102,8 @@ impl PipelineRunner {
Some(RecipientSelector::Recipient(recipient)) => Some(recipient),
};

// Pipeline;
// Execute each pipeline in a separate task in parallel
let mut join_handles = JoinSet::new();
for pipeline in pipelines {
let engine = self.engine.clone();
let recipient = recipient.clone();
Expand All @@ -112,6 +121,7 @@ impl PipelineRunner {
channel: pipeline.channel,
};

// Execute each step in the pipeline
for step in pipeline.steps.iter() {
let result = engine.execute_step(&mut context, step).await;
match result {
Expand All @@ -126,6 +136,7 @@ impl PipelineRunner {
});
}

// Wait for all pipelines to complete
join_handles.join_all().await;
Ok(())
}
Expand Down
17 changes: 17 additions & 0 deletions notifico-ncenter/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
[package]
name = "notifico-ncenter"
version = "0.1.0"
edition = "2021"

[dependencies]
axum = { workspace = true }
serde = { version = "1.0.210", features = ["derive"] }
sea-orm = { workspace = true }
async-trait = "0.1.83"
uuid = { version = "1.10.0", features = ["v7"] }
chrono = "0.4.38"
serde_json = "1.0.128"
anyhow = "1.0.89"

notifico-core = { path = "../notifico-core" }
notifico-ncenter-migration = { path = "migration" }
13 changes: 13 additions & 0 deletions notifico-ncenter/migration/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
[package]
name = "notifico-ncenter-migration"
version = "0.1.0"
edition = "2021"
publish = false

[lib]
name = "migration"
path = "src/lib.rs"

[dependencies]
async-std = { version = "1", features = ["attributes", "tokio1"] }
sea-orm-migration = { workspace = true }
41 changes: 41 additions & 0 deletions notifico-ncenter/migration/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# Running Migrator CLI

- Generate a new migration file
```sh
cargo run -- generate MIGRATION_NAME
```
- Apply all pending migrations
```sh
cargo run
```
```sh
cargo run -- up
```
- Apply first 10 pending migrations
```sh
cargo run -- up -n 10
```
- Rollback last applied migrations
```sh
cargo run -- down
```
- Rollback last 10 applied migrations
```sh
cargo run -- down -n 10
```
- Drop all tables from the database, then reapply all migrations
```sh
cargo run -- fresh
```
- Rollback all applied migrations, then reapply all migrations
```sh
cargo run -- refresh
```
- Rollback all applied migrations
```sh
cargo run -- reset
```
- Check the status of all migrations
```sh
cargo run -- status
```
16 changes: 16 additions & 0 deletions notifico-ncenter/migration/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
pub use sea_orm_migration::prelude::*;

mod m20220101_000001_create_table;

pub struct Migrator;

#[async_trait::async_trait]
impl MigratorTrait for Migrator {
fn migrations() -> Vec<Box<dyn MigrationTrait>> {
vec![Box::new(m20220101_000001_create_table::Migration)]
}

fn migration_table_name() -> DynIden {
Alias::new("ncenter_migrations").into_iden()
}
}
50 changes: 50 additions & 0 deletions notifico-ncenter/migration/src/m20220101_000001_create_table.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
use sea_orm_migration::{prelude::*, schema::*};

#[derive(DeriveMigrationName)]
pub struct Migration;

#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.create_table(
Table::create()
.table(NcenterNotification::Table)
.if_not_exists()
.col(pk_uuid(NcenterNotification::Id))
.col(uuid(NcenterNotification::RecipientId))
.col(uuid(NcenterNotification::ProjectId))
.col(json_binary(NcenterNotification::Content))
.col(date_time(NcenterNotification::CreatedAt))
.to_owned(),
)
.await?;
manager
.create_index(
Index::create()
.name("idx_ncenter_notifications_recipient_id")
.table(NcenterNotification::Table)
.if_not_exists()
.col(NcenterNotification::RecipientId)
.col(NcenterNotification::ProjectId)
.to_owned(),
)
.await
}

async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.drop_table(Table::drop().table(NcenterNotification::Table).to_owned())
.await
}
}

#[derive(DeriveIden)]
enum NcenterNotification {
Table,
Id,
RecipientId,
ProjectId,
Content,
CreatedAt,
}
6 changes: 6 additions & 0 deletions notifico-ncenter/migration/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
use sea_orm_migration::prelude::*;

#[async_std::main]
async fn main() {
cli::run_cli(migration::Migrator).await;
}
5 changes: 5 additions & 0 deletions notifico-ncenter/src/entity/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
//! `SeaORM` Entity, @generated by sea-orm-codegen 1.0.1
pub mod prelude;

pub mod ncenter_notification;
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,15 @@
use sea_orm::entity::prelude::*;

#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)]
#[sea_orm(table_name = "subscription_migrations")]
#[sea_orm(table_name = "ncenter_notification")]
pub struct Model {
#[sea_orm(primary_key, auto_increment = false)]
pub version: String,
pub applied_at: i64,
pub id: Uuid,
pub recipient_id: Uuid,
pub project_id: Uuid,
#[sea_orm(column_type = "JsonBinary")]
pub content: Json,
pub created_at: DateTime,
}

#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
Expand Down
3 changes: 3 additions & 0 deletions notifico-ncenter/src/entity/prelude.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
//! `SeaORM` Entity, @generated by sea-orm-codegen 1.0.1
pub use super::ncenter_notification::Entity as NcenterNotification;
Loading

0 comments on commit 3b6ef63

Please sign in to comment.