diff --git a/Cargo.toml b/Cargo.toml index e141f13..bc4d28d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,6 +11,10 @@ members = [ "notifico-apiserver", "notifico-subscription", "notifico-subscription/migration", + "notifico-dbpipeline", + "notifico-dbpipeline/migration", + "notifico-project", + "notifico-project/migration", ] [workspace.dependencies] diff --git a/notifico-apiserver/Cargo.toml b/notifico-apiserver/Cargo.toml index c001e8e..a8980da 100644 --- a/notifico-apiserver/Cargo.toml +++ b/notifico-apiserver/Cargo.toml @@ -6,6 +6,8 @@ edition = "2021" [dependencies] notifico-core = { path = "../notifico-core" } notifico-subscription = { path = "../notifico-subscription" } +notifico-dbpipeline = { path = "../notifico-dbpipeline" } +notifico-project = { path = "../notifico-project" } anyhow = "1.0.93" async-trait = "0.1.83" diff --git a/notifico-apiserver/src/http/admin/event.rs b/notifico-apiserver/src/http/admin/event.rs new file mode 100644 index 0000000..b7fb973 --- /dev/null +++ b/notifico-apiserver/src/http/admin/event.rs @@ -0,0 +1,87 @@ +use axum::extract::{Path, Query}; +use axum::http::header::CONTENT_RANGE; +use axum::http::{HeaderMap, StatusCode}; +use axum::{Extension, Json}; +use notifico_core::http::admin::{ListQueryParams, PaginatedResult}; +use notifico_core::pipeline::storage::PipelineStorage; +use notifico_core::pipeline::Event; +use serde::Deserialize; +use serde_json::{json, Value}; +use std::sync::Arc; +use uuid::Uuid; + +pub async fn list_events( + Query(params): Query, + Extension(pipeline_storage): Extension>, +) -> (HeaderMap, Json>) { + let PaginatedResult { items, total_count } = + pipeline_storage.list_events(params).await.unwrap(); + + let mut headers = HeaderMap::new(); + headers.insert(CONTENT_RANGE, total_count.into()); + + (headers, Json(items)) +} + +pub async fn get_event( + Path((id,)): Path<(Uuid,)>, + Extension(pipeline_storage): Extension>, +) -> (StatusCode, Json>) { + let result = pipeline_storage.get_event_by_id(id).await.unwrap(); + + let Some(result) = result else { + return (StatusCode::NOT_FOUND, Json(None)); + }; + (StatusCode::OK, Json(Some(result))) +} + +#[derive(Deserialize)] +pub struct EventCreate { + project_id: Uuid, + name: String, +} + +pub async fn create_event( + Extension(pipeline_storage): Extension>, + Json(create): Json, +) -> (StatusCode, Json) { + let result = pipeline_storage + .create_event(create.project_id, &create.name) + .await + .unwrap(); + + ( + StatusCode::CREATED, + Json(serde_json::to_value(result).unwrap()), + ) +} + +#[derive(Deserialize)] +pub struct EventUpdate { + name: String, +} + +pub async fn update_event( + Extension(pipeline_storage): Extension>, + Path((id,)): Path<(Uuid,)>, + Json(update): Json, +) -> (StatusCode, Json) { + let result = pipeline_storage + .update_event(id, &update.name) + .await + .unwrap(); + + ( + StatusCode::CREATED, + Json(serde_json::to_value(result).unwrap()), + ) +} + +pub async fn delete_event( + Extension(pipeline_storage): Extension>, + Path((id,)): Path<(Uuid,)>, +) -> (StatusCode, Json) { + pipeline_storage.delete_event(id).await.unwrap(); + + (StatusCode::NO_CONTENT, Json(json!({}))) +} diff --git a/notifico-apiserver/src/http/admin/mod.rs b/notifico-apiserver/src/http/admin/mod.rs index 7699d10..70a2492 100644 --- a/notifico-apiserver/src/http/admin/mod.rs +++ b/notifico-apiserver/src/http/admin/mod.rs @@ -2,7 +2,9 @@ use crate::http::HttpExtensions; use axum::routing::{get, put}; use axum::{Extension, Router}; use tower_http::cors::CorsLayer; - +mod event; +mod pipeline; +mod project; pub mod subscription; pub(crate) fn get_router(ext: HttpExtensions) -> Router { @@ -14,7 +16,34 @@ pub(crate) fn get_router(ext: HttpExtensions) -> Router { "/v1/subscriptions/:id", put(subscription::update_subscription), ) + // Pipelines + .route("/v1/pipelines", get(pipeline::list_pipelines)) + .route("/v1/pipelines/:id", get(pipeline::get_pipeline)) + // Events + .route( + "/v1/events", + get(event::list_events).post(event::create_event), + ) + .route( + "/v1/events/:id", + get(event::get_event) + .put(event::update_event) + .delete(event::delete_event), + ) + // Projects + .route( + "/v1/projects", + get(project::list_projects).post(project::create_project), + ) + .route( + "/v1/projects/:id", + get(project::get_project) + .put(project::update_project) + .delete(project::delete_project), + ) // Layers .layer(Extension(ext.subman)) + .layer(Extension(ext.pipeline_storage)) + .layer(Extension(ext.projects_controller)) .layer(CorsLayer::permissive()) } diff --git a/notifico-apiserver/src/http/admin/pipeline.rs b/notifico-apiserver/src/http/admin/pipeline.rs new file mode 100644 index 0000000..0857165 --- /dev/null +++ b/notifico-apiserver/src/http/admin/pipeline.rs @@ -0,0 +1,78 @@ +use crate::http::admin::project::ProjectUpdate; +use axum::extract::{Path, Query}; +use axum::http::header::CONTENT_RANGE; +use axum::http::{HeaderMap, StatusCode}; +use axum::{Extension, Json}; +use notifico_core::http::admin::{ListQueryParams, PaginatedResult}; +use notifico_core::pipeline::storage::{PipelineResult, PipelineStorage}; +use notifico_core::step::SerializedStep; +use serde::Serialize; +use serde_json::Value; +use std::sync::Arc; +use uuid::Uuid; + +#[derive(Clone, Serialize)] +pub struct PipelineItem { + pub id: Uuid, + pub project_id: Uuid, + pub event_ids: Vec, + pub steps: Vec, + pub channel: String, +} + +impl From for PipelineItem { + fn from(value: PipelineResult) -> Self { + Self { + id: value.pipeline.id, + project_id: value.pipeline.project_id, + steps: value.pipeline.steps.clone(), + channel: value.pipeline.channel, + + event_ids: value.event_ids, + } + } +} + +pub async fn list_pipelines( + Query(params): Query, + Extension(pipeline_storage): Extension>, +) -> (HeaderMap, Json>) { + let PaginatedResult { items, total_count } = + pipeline_storage.list_pipelines(params).await.unwrap(); + + let pipelines = items.into_iter().map(PipelineItem::from).collect(); + + let mut headers = HeaderMap::new(); + headers.insert(CONTENT_RANGE, total_count.into()); + + (headers, Json(pipelines)) +} + +pub async fn get_pipeline( + Path((id,)): Path<(Uuid,)>, + Extension(pipeline_storage): Extension>, +) -> (StatusCode, Json>) { + let result = pipeline_storage + .get_pipeline_by_id(id) + .await + .unwrap() + .map(PipelineItem::from); + + let Some(result) = result else { + return (StatusCode::NOT_FOUND, Json(None)); + }; + (StatusCode::OK, Json(Some(result))) +} + +pub async fn update_pipeline( + Extension(pipeline_storage): Extension>, + Path((id,)): Path<(Uuid,)>, + Json(update): Json, +) -> (StatusCode, Json) { + let result = pipeline_storage.update(id, &update.name).await.unwrap(); + + ( + StatusCode::ACCEPTED, + Json(serde_json::to_value(result).unwrap()), + ) +} diff --git a/notifico-apiserver/src/http/admin/project.rs b/notifico-apiserver/src/http/admin/project.rs new file mode 100644 index 0000000..915278f --- /dev/null +++ b/notifico-apiserver/src/http/admin/project.rs @@ -0,0 +1,73 @@ +use axum::extract::{Path, Query}; +use axum::http::header::CONTENT_RANGE; +use axum::http::{HeaderMap, StatusCode}; +use axum::{Extension, Json}; +use notifico_core::http::admin::{ListQueryParams, PaginatedResult}; +use notifico_project::{Project, ProjectController}; +use serde::Deserialize; +use serde_json::{json, Value}; +use std::sync::Arc; +use uuid::Uuid; + +pub async fn list_projects( + Query(params): Query, + Extension(controller): Extension>, +) -> (HeaderMap, Json>) { + let PaginatedResult { items, total_count } = controller.list(params).await.unwrap(); + + let mut headers = HeaderMap::new(); + headers.insert(CONTENT_RANGE, total_count.into()); + + (headers, Json(items)) +} + +pub async fn get_project( + Path((id,)): Path<(Uuid,)>, + Extension(controller): Extension>, +) -> (StatusCode, Json>) { + let result = controller.get_by_id(id).await.unwrap(); + + let Some(result) = result else { + return (StatusCode::NOT_FOUND, Json(None)); + }; + (StatusCode::OK, Json(Some(result))) +} + +#[derive(Deserialize)] +pub struct ProjectUpdate { + name: String, +} + +pub async fn create_project( + Extension(controller): Extension>, + Json(update): Json, +) -> (StatusCode, Json) { + let result = controller.create(&update.name).await.unwrap(); + + ( + StatusCode::CREATED, + Json(serde_json::to_value(result).unwrap()), + ) +} + +pub async fn update_project( + Extension(controller): Extension>, + Path((id,)): Path<(Uuid,)>, + Json(update): Json, +) -> (StatusCode, Json) { + let result = controller.update(id, &update.name).await.unwrap(); + + ( + StatusCode::ACCEPTED, + Json(serde_json::to_value(result).unwrap()), + ) +} + +pub async fn delete_project( + Extension(controller): Extension>, + Path((id,)): Path<(Uuid,)>, +) -> (StatusCode, Json) { + controller.delete(id).await.unwrap(); + + (StatusCode::NO_CONTENT, Json(json!({}))) +} diff --git a/notifico-apiserver/src/http/mod.rs b/notifico-apiserver/src/http/mod.rs index 5c2cf4c..3bfd041 100644 --- a/notifico-apiserver/src/http/mod.rs +++ b/notifico-apiserver/src/http/mod.rs @@ -2,9 +2,12 @@ mod admin; mod ingest; mod recipient; +use axum::response::IntoResponse; use axum::{Extension, Router}; use notifico_core::http::SecretKey; use notifico_core::pipeline::runner::ProcessEventRequest; +use notifico_core::pipeline::storage::PipelineStorage; +use notifico_project::ProjectController; use notifico_subscription::SubscriptionManager; use std::net::SocketAddr; use std::sync::Arc; @@ -20,6 +23,8 @@ pub(crate) struct HttpExtensions { pub sender: Sender, pub subman: Arc, pub secret_key: Arc, + pub pipeline_storage: Arc, + pub projects_controller: Arc, } #[derive(OpenApi)] diff --git a/notifico-apiserver/src/main.rs b/notifico-apiserver/src/main.rs index 5287e71..99ccd41 100644 --- a/notifico-apiserver/src/main.rs +++ b/notifico-apiserver/src/main.rs @@ -4,6 +4,8 @@ mod http; use crate::http::HttpExtensions; use clap::Parser; use notifico_core::http::SecretKey; +use notifico_dbpipeline::DbPipelineStorage; +use notifico_project::ProjectController; use notifico_subscription::SubscriptionManager; use sea_orm::{ConnectOptions, Database}; use std::net::SocketAddr; @@ -62,12 +64,20 @@ async fn main() { )); subman.setup().await.unwrap(); + let pipeline_storage = Arc::new(DbPipelineStorage::new(db_connection.clone())); + pipeline_storage.setup().await.unwrap(); + + let projects = Arc::new(ProjectController::new(db_connection.clone())); + projects.setup().await.unwrap(); + let (request_tx, request_rx) = tokio::sync::mpsc::channel(1); let ext = HttpExtensions { + projects_controller: projects, sender: request_tx, subman, secret_key: Arc::new(SecretKey(args.secret_key.as_bytes().to_vec())), + pipeline_storage, }; tokio::spawn(http::start( diff --git a/notifico-core/src/config/mod.rs b/notifico-core/src/config/mod.rs index 2f32036..9c7a932 100644 --- a/notifico-core/src/config/mod.rs +++ b/notifico-core/src/config/mod.rs @@ -1,2 +1 @@ pub mod credentials; -pub mod pipelines; diff --git a/notifico-core/src/config/pipelines.rs b/notifico-core/src/config/pipelines.rs deleted file mode 100644 index baf04a0..0000000 --- a/notifico-core/src/config/pipelines.rs +++ /dev/null @@ -1,82 +0,0 @@ -use crate::error::EngineError; -use crate::pipeline::storage::PipelineStorage; -use crate::pipeline::Pipeline; -use crate::step::SerializedStep; -use async_trait::async_trait; -use serde::Deserialize; -use std::borrow::Cow; -use std::collections::HashMap; -use std::sync::Arc; -use uuid::Uuid; - -#[derive(Deserialize, Clone)] -struct PipelineItem { - pub events: Vec, - #[serde(default = "Uuid::nil")] - pub project: Uuid, - pub channel: String, - pub steps: Vec, -} - -impl From for Pipeline { - fn from(value: PipelineItem) -> Self { - Self { - id: Uuid::nil(), - project_id: value.project, - channel: value.channel, - steps: value.steps, - } - } -} - -#[derive(Deserialize)] -pub struct PipelineConfig { - pipelines: Vec, -} - -#[derive(Eq, PartialEq, Hash, Clone)] -struct PipelineKey<'a> { - project: Uuid, - event: Cow<'a, str>, -} - -#[derive(Default)] -pub struct MemoryPipelineStorage(HashMap, Vec>>); - -impl MemoryPipelineStorage { - pub fn from_config(config: &PipelineConfig) -> Self { - let mut slf = Self::default(); - for pipeline_item in config.pipelines.iter() { - let pipeline = Arc::new(Pipeline::from(pipeline_item.clone())); - - for event in pipeline_item.events.iter() { - let pipeline = pipeline.clone(); - let key = PipelineKey { - project: Uuid::nil(), - event: Cow::Owned(event.clone()), - }; - - slf.0.entry(key).or_insert_with(Vec::new).push(pipeline) - } - } - slf - } -} - -#[async_trait] -impl PipelineStorage for MemoryPipelineStorage { - async fn get_pipelines_for_event( - &self, - project: Uuid, - event_name: &str, - ) -> Result, EngineError> { - if let Some(pipelines) = self.0.get(&PipelineKey { - project, - event: event_name.into(), - }) { - Ok(pipelines.iter().map(|p| p.as_ref()).cloned().collect()) - } else { - Ok(Vec::new()) - } - } -} diff --git a/notifico-core/src/pipeline/storage.rs b/notifico-core/src/pipeline/storage.rs index ffced38..5a7b312 100644 --- a/notifico-core/src/pipeline/storage.rs +++ b/notifico-core/src/pipeline/storage.rs @@ -1,8 +1,17 @@ use crate::error::EngineError; -use crate::pipeline::Pipeline; +use crate::http::admin::{ListQueryParams, PaginatedResult}; +use crate::pipeline::{Event, Pipeline}; use async_trait::async_trait; +use serde::Serialize; +use std::error::Error; use uuid::Uuid; +#[derive(Clone, Serialize)] +pub struct PipelineResult { + pub pipeline: Pipeline, + pub event_ids: Vec, +} + #[async_trait] pub trait PipelineStorage: Send + Sync { async fn get_pipelines_for_event( @@ -10,4 +19,25 @@ pub trait PipelineStorage: Send + Sync { project: Uuid, event_name: &str, ) -> Result, EngineError>; + async fn list_pipelines( + &self, + params: ListQueryParams, + ) -> Result, EngineError>; + async fn get_pipeline_by_id(&self, id: Uuid) -> Result, EngineError>; + async fn update_pipeline(&self, pipeline: Pipeline) -> Result<(), EngineError>; + async fn assign_events_to_pipeline( + &self, + pipeline_id: Uuid, + event_id: Vec, + ) -> Result<(), EngineError>; + + async fn list_events( + &self, + params: ListQueryParams, + ) -> Result, EngineError>; + + async fn get_event_by_id(&self, id: Uuid) -> Result, Box>; + async fn create_event(&self, project_id: Uuid, name: &str) -> Result>; + async fn update_event(&self, id: Uuid, name: &str) -> Result>; + async fn delete_event(&self, id: Uuid) -> Result<(), Box>; } diff --git a/notifico-dbpipeline/Cargo.toml b/notifico-dbpipeline/Cargo.toml new file mode 100644 index 0000000..ee0ffb8 --- /dev/null +++ b/notifico-dbpipeline/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "notifico-dbpipeline" +version = "0.1.0" +edition = "2021" + +[dependencies] +notifico-core = { path = "../notifico-core" } +notifico-dbpipeline-migration = { path = "migration" } + +sea-orm = { workspace = true } +async-trait = "0.1.83" +serde = "1.0.214" +serde_json = "1.0.132" +anyhow = "1.0.93" \ No newline at end of file diff --git a/notifico-dbpipeline/migration/Cargo.toml b/notifico-dbpipeline/migration/Cargo.toml new file mode 100644 index 0000000..0171847 --- /dev/null +++ b/notifico-dbpipeline/migration/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "notifico-dbpipeline-migration" +version = "0.1.0" +edition = "2021" +publish = false + +[lib] +name = "migration" +path = "src/lib.rs" + +[dependencies] +async-std = { version = "1", features = ["attributes", "tokio1"] } +sea-orm-migration = { workspace = true } diff --git a/notifico-dbpipeline/migration/README.md b/notifico-dbpipeline/migration/README.md new file mode 100644 index 0000000..3b438d8 --- /dev/null +++ b/notifico-dbpipeline/migration/README.md @@ -0,0 +1,41 @@ +# Running Migrator CLI + +- Generate a new migration file + ```sh + cargo run -- generate MIGRATION_NAME + ``` +- Apply all pending migrations + ```sh + cargo run + ``` + ```sh + cargo run -- up + ``` +- Apply first 10 pending migrations + ```sh + cargo run -- up -n 10 + ``` +- Rollback last applied migrations + ```sh + cargo run -- down + ``` +- Rollback last 10 applied migrations + ```sh + cargo run -- down -n 10 + ``` +- Drop all tables from the database, then reapply all migrations + ```sh + cargo run -- fresh + ``` +- Rollback all applied migrations, then reapply all migrations + ```sh + cargo run -- refresh + ``` +- Rollback all applied migrations + ```sh + cargo run -- reset + ``` +- Check the status of all migrations + ```sh + cargo run -- status + ``` diff --git a/notifico-dbpipeline/migration/src/lib.rs b/notifico-dbpipeline/migration/src/lib.rs new file mode 100644 index 0000000..30d3145 --- /dev/null +++ b/notifico-dbpipeline/migration/src/lib.rs @@ -0,0 +1,16 @@ +pub use sea_orm_migration::prelude::*; + +mod m20220101_000001_create_table; + +pub struct Migrator; + +#[async_trait::async_trait] +impl MigratorTrait for Migrator { + fn migrations() -> Vec> { + vec![Box::new(m20220101_000001_create_table::Migration)] + } + + fn migration_table_name() -> DynIden { + Alias::new("pipeline_migrations").into_iden() + } +} diff --git a/notifico-dbpipeline/migration/src/m20220101_000001_create_table.rs b/notifico-dbpipeline/migration/src/m20220101_000001_create_table.rs new file mode 100644 index 0000000..4486c7a --- /dev/null +++ b/notifico-dbpipeline/migration/src/m20220101_000001_create_table.rs @@ -0,0 +1,120 @@ +use sea_orm_migration::{prelude::*, schema::*}; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .create_table( + Table::create() + .table(Event::Table) + .if_not_exists() + .col(pk_uuid(Event::Id)) + .col(uuid(Event::ProjectId)) + .col(string(Event::Name)) + .to_owned(), + ) + .await?; + + manager + .create_index( + Index::create() + .unique() + .table(Event::Table) + .name("idx_u_event_name") + .if_not_exists() + .col(Event::ProjectId) + .col(Event::Name) + .to_owned(), + ) + .await?; + + manager + .create_table( + Table::create() + .table(Pipeline::Table) + .if_not_exists() + .col(pk_uuid(Pipeline::Id)) + .col(uuid(Pipeline::ProjectId)) + .col(string(Pipeline::Channel)) + .col(json_binary(Pipeline::Steps)) + .to_owned(), + ) + .await?; + + manager + .create_table( + Table::create() + .table(PipelineEventJ::Table) + .if_not_exists() + .col(uuid(PipelineEventJ::PipelineId)) + .col(uuid(PipelineEventJ::EventId)) + .foreign_key( + ForeignKey::create() + .from(PipelineEventJ::Table, PipelineEventJ::PipelineId) + .to(Pipeline::Table, Pipeline::Id) + .on_delete(ForeignKeyAction::Cascade) + .on_update(ForeignKeyAction::Restrict), + ) + .foreign_key( + ForeignKey::create() + .from(PipelineEventJ::Table, PipelineEventJ::EventId) + .to(Event::Table, Event::Id) + .on_delete(ForeignKeyAction::Cascade) + .on_update(ForeignKeyAction::Restrict), + ) + .index( + Index::create() + .primary() + .name("pk_pipeline_event_j") + .table(PipelineEventJ::Table) + .col(PipelineEventJ::PipelineId) + .col(PipelineEventJ::EventId), + ) + .to_owned(), + ) + .await?; + + Ok(()) + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .drop_table(Table::drop().table(PipelineEventJ::Table).to_owned()) + .await?; + manager + .drop_table(Table::drop().table(Pipeline::Table).to_owned()) + .await?; + manager + .drop_table(Table::drop().table(Event::Table).to_owned()) + .await?; + + Ok(()) + } +} + +#[derive(DeriveIden)] +enum Event { + Table, + Id, + ProjectId, + Name, +} + +#[derive(DeriveIden)] +enum Pipeline { + Table, + Id, + ProjectId, + Channel, + Steps, +} + +#[derive(DeriveIden)] +enum PipelineEventJ { + Table, + PipelineId, + EventId, +} diff --git a/notifico-dbpipeline/migration/src/main.rs b/notifico-dbpipeline/migration/src/main.rs new file mode 100644 index 0000000..c6b6e48 --- /dev/null +++ b/notifico-dbpipeline/migration/src/main.rs @@ -0,0 +1,6 @@ +use sea_orm_migration::prelude::*; + +#[async_std::main] +async fn main() { + cli::run_cli(migration::Migrator).await; +} diff --git a/notifico-dbpipeline/src/entity/event.rs b/notifico-dbpipeline/src/entity/event.rs new file mode 100644 index 0000000..b17b735 --- /dev/null +++ b/notifico-dbpipeline/src/entity/event.rs @@ -0,0 +1,35 @@ +//! `SeaORM` Entity, @generated by sea-orm-codegen 1.1.1 + +use sea_orm::entity::prelude::*; + +#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] +#[sea_orm(table_name = "event")] +pub struct Model { + #[sea_orm(primary_key, auto_increment = false)] + pub id: Uuid, + pub project_id: Uuid, + pub name: String, +} + +#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] +pub enum Relation { + #[sea_orm(has_many = "super::pipeline_event_j::Entity")] + PipelineEventJ, +} + +impl Related for Entity { + fn to() -> RelationDef { + Relation::PipelineEventJ.def() + } +} + +impl Related for Entity { + fn to() -> RelationDef { + super::pipeline_event_j::Relation::Pipeline.def() + } + fn via() -> Option { + Some(super::pipeline_event_j::Relation::Event.def().rev()) + } +} + +impl ActiveModelBehavior for ActiveModel {} diff --git a/notifico-dbpipeline/src/entity/mod.rs b/notifico-dbpipeline/src/entity/mod.rs new file mode 100644 index 0000000..65548a6 --- /dev/null +++ b/notifico-dbpipeline/src/entity/mod.rs @@ -0,0 +1,7 @@ +//! `SeaORM` Entity, @generated by sea-orm-codegen 1.1.1 + +pub mod prelude; + +pub mod event; +pub mod pipeline; +pub mod pipeline_event_j; diff --git a/notifico-dbpipeline/src/entity/pipeline.rs b/notifico-dbpipeline/src/entity/pipeline.rs new file mode 100644 index 0000000..e60671f --- /dev/null +++ b/notifico-dbpipeline/src/entity/pipeline.rs @@ -0,0 +1,37 @@ +//! `SeaORM` Entity, @generated by sea-orm-codegen 1.1.1 + +use sea_orm::entity::prelude::*; + +#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] +#[sea_orm(table_name = "pipeline")] +pub struct Model { + #[sea_orm(primary_key, auto_increment = false)] + pub id: Uuid, + pub project_id: Uuid, + pub channel: String, + #[sea_orm(column_type = "JsonBinary")] + pub steps: Json, +} + +#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] +pub enum Relation { + #[sea_orm(has_many = "super::pipeline_event_j::Entity")] + PipelineEventJ, +} + +impl Related for Entity { + fn to() -> RelationDef { + Relation::PipelineEventJ.def() + } +} + +impl Related for Entity { + fn to() -> RelationDef { + super::pipeline_event_j::Relation::Event.def() + } + fn via() -> Option { + Some(super::pipeline_event_j::Relation::Pipeline.def().rev()) + } +} + +impl ActiveModelBehavior for ActiveModel {} diff --git a/notifico-dbpipeline/src/entity/pipeline_event_j.rs b/notifico-dbpipeline/src/entity/pipeline_event_j.rs new file mode 100644 index 0000000..7951042 --- /dev/null +++ b/notifico-dbpipeline/src/entity/pipeline_event_j.rs @@ -0,0 +1,46 @@ +//! `SeaORM` Entity, @generated by sea-orm-codegen 1.1.1 + +use sea_orm::entity::prelude::*; + +#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] +#[sea_orm(table_name = "pipeline_event_j")] +pub struct Model { + #[sea_orm(primary_key, auto_increment = false)] + pub pipeline_id: Uuid, + #[sea_orm(primary_key, auto_increment = false)] + pub event_id: Uuid, +} + +#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] +pub enum Relation { + #[sea_orm( + belongs_to = "super::event::Entity", + from = "Column::EventId", + to = "super::event::Column::Id", + on_update = "Restrict", + on_delete = "Cascade" + )] + Event, + #[sea_orm( + belongs_to = "super::pipeline::Entity", + from = "Column::PipelineId", + to = "super::pipeline::Column::Id", + on_update = "Restrict", + on_delete = "Cascade" + )] + Pipeline, +} + +impl Related for Entity { + fn to() -> RelationDef { + Relation::Event.def() + } +} + +impl Related for Entity { + fn to() -> RelationDef { + Relation::Pipeline.def() + } +} + +impl ActiveModelBehavior for ActiveModel {} diff --git a/notifico-dbpipeline/src/entity/prelude.rs b/notifico-dbpipeline/src/entity/prelude.rs new file mode 100644 index 0000000..31bbf6e --- /dev/null +++ b/notifico-dbpipeline/src/entity/prelude.rs @@ -0,0 +1,5 @@ +//! `SeaORM` Entity, @generated by sea-orm-codegen 1.1.1 + +pub use super::event::Entity as Event; +pub use super::pipeline::Entity as Pipeline; +pub use super::pipeline_event_j::Entity as PipelineEventJ; diff --git a/notifico-dbpipeline/src/lib.rs b/notifico-dbpipeline/src/lib.rs new file mode 100644 index 0000000..f1210f4 --- /dev/null +++ b/notifico-dbpipeline/src/lib.rs @@ -0,0 +1,194 @@ +use async_trait::async_trait; +use migration::{Migrator, MigratorTrait}; +use notifico_core::error::EngineError; +use notifico_core::http::admin::{ListQueryParams, ListableTrait, PaginatedResult}; +use notifico_core::pipeline::storage::{PipelineResult, PipelineStorage}; +use notifico_core::pipeline::{Event, Pipeline}; +use sea_orm::prelude::Uuid; +use sea_orm::{ + ActiveModelTrait, ColumnTrait, DatabaseConnection, EntityTrait, LoaderTrait, PaginatorTrait, + QueryFilter, QuerySelect, Set, +}; +use serde::Deserialize; +use std::error::Error; + +mod entity; + +pub struct DbPipelineStorage { + db: DatabaseConnection, +} + +impl DbPipelineStorage { + pub fn new(db: DatabaseConnection) -> Self { + Self { db } + } + + pub async fn setup(&self) -> anyhow::Result<()> { + Ok(Migrator::up(&self.db, None).await?) + } +} + +#[async_trait] +impl PipelineStorage for DbPipelineStorage { + // For service API. Performance-critical + async fn get_pipelines_for_event( + &self, + project: Uuid, + event_name: &str, + ) -> Result, EngineError> { + let models = entity::pipeline::Entity::find() + .filter(entity::pipeline::Column::ProjectId.eq(project)) + .filter(entity::event::Column::Name.eq(event_name)) + .all(&self.db) + .await?; + + models.into_iter().map(|m| m.try_into()).collect() + } + + // For management API + async fn list_pipelines( + &self, + params: ListQueryParams, + ) -> Result, EngineError> { + let events = entity::pipeline::Entity::find() + .apply_params(¶ms) + .unwrap() + .find_with_related(entity::event::Entity) + .all(&self.db) + .await?; + + let results: Result, EngineError> = events + .into_iter() + .map(|(p, e)| { + Ok(PipelineResult { + pipeline: p.try_into()?, + event_ids: e.into_iter().map(|e| e.id).collect(), + }) + }) + .collect(); + let results = results?; + + Ok(PaginatedResult { + items: results, + total_count: entity::pipeline::Entity::find() + .apply_filter(¶ms) + .unwrap() + .count(&self.db) + .await?, + }) + } + + async fn get_pipeline_by_id(&self, id: Uuid) -> Result, EngineError> { + let events = entity::pipeline::Entity::find_by_id(id) + .find_with_related(entity::event::Entity) + .all(&self.db) + .await?; + + let results: Result, EngineError> = events + .into_iter() + .map(|(p, e)| { + Ok(PipelineResult { + pipeline: p.try_into()?, + event_ids: e.into_iter().map(|e| e.id).collect(), + }) + }) + .collect(); + let results = results?; + + Ok(results.first().cloned()) + } + + async fn list_events( + &self, + params: ListQueryParams, + ) -> Result, EngineError> { + Ok(PaginatedResult { + items: entity::event::Entity::find() + .apply_params(¶ms) + .unwrap() + .all(&self.db) + .await? + .into_iter() + .map(Event::from) + .collect(), + total_count: entity::event::Entity::find() + .apply_filter(¶ms) + .unwrap() + .count(&self.db) + .await?, + }) + } + + async fn get_event_by_id(&self, id: Uuid) -> Result, Box> { + let model = entity::event::Entity::find_by_id(id).one(&self.db).await?; + + Ok(model.map(Event::from)) + } + + async fn create_event( + &self, + project_id: Uuid, + name: &str, + ) -> Result> { + let id = Uuid::now_v7(); + + entity::event::ActiveModel { + id: Set(id), + project_id: Set(project_id), + name: Set(name.to_string()), + } + .insert(&self.db) + .await?; + + Ok(Event { + id, + project_id, + name: name.to_string(), + }) + } + + async fn update_event(&self, id: Uuid, name: &str) -> Result> { + entity::event::ActiveModel { + id: Set(id), + name: Set(name.to_string()), + ..Default::default() + } + .update(&self.db) + .await?; + + Ok(self.get_event_by_id(id).await?.unwrap()) + } + + async fn delete_event(&self, id: Uuid) -> Result<(), Box> { + entity::event::ActiveModel { + id: Set(id), + ..Default::default() + } + .delete(&self.db) + .await?; + Ok(()) + } +} + +impl TryFrom for Pipeline { + type Error = EngineError; + + fn try_from(value: entity::pipeline::Model) -> Result { + Ok(Self { + id: value.id, + project_id: value.project_id, + channel: value.channel, + steps: Vec::deserialize(value.steps).map_err(EngineError::InvalidStep)?, + }) + } +} + +impl From for Event { + fn from(value: entity::event::Model) -> Self { + Self { + id: value.id, + project_id: value.project_id, + name: value.name, + } + } +} diff --git a/notifico-project/Cargo.toml b/notifico-project/Cargo.toml new file mode 100644 index 0000000..2be021e --- /dev/null +++ b/notifico-project/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "notifico-project" +version = "0.1.0" +edition = "2021" + +[dependencies] +notifico-project-migration = { path = "migration" } +notifico-core = { path = "../notifico-core" } + +sea-orm = { workspace = true } +uuid = { version = "1.11.0", features = ["v7"] } +serde = { version = "1.0.214", features = ["derive"] } \ No newline at end of file diff --git a/notifico-project/migration/Cargo.toml b/notifico-project/migration/Cargo.toml new file mode 100644 index 0000000..cca48bc --- /dev/null +++ b/notifico-project/migration/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "notifico-project-migration" +version = "0.1.0" +edition = "2021" +publish = false + +[lib] +name = "migration" +path = "src/lib.rs" + +[dependencies] +async-std = { version = "1", features = ["attributes", "tokio1"] } +uuid = "1.11.0" +sea-orm = { workspace = true } +sea-orm-migration = { workspace = true } \ No newline at end of file diff --git a/notifico-project/migration/README.md b/notifico-project/migration/README.md new file mode 100644 index 0000000..3b438d8 --- /dev/null +++ b/notifico-project/migration/README.md @@ -0,0 +1,41 @@ +# Running Migrator CLI + +- Generate a new migration file + ```sh + cargo run -- generate MIGRATION_NAME + ``` +- Apply all pending migrations + ```sh + cargo run + ``` + ```sh + cargo run -- up + ``` +- Apply first 10 pending migrations + ```sh + cargo run -- up -n 10 + ``` +- Rollback last applied migrations + ```sh + cargo run -- down + ``` +- Rollback last 10 applied migrations + ```sh + cargo run -- down -n 10 + ``` +- Drop all tables from the database, then reapply all migrations + ```sh + cargo run -- fresh + ``` +- Rollback all applied migrations, then reapply all migrations + ```sh + cargo run -- refresh + ``` +- Rollback all applied migrations + ```sh + cargo run -- reset + ``` +- Check the status of all migrations + ```sh + cargo run -- status + ``` diff --git a/notifico-project/migration/src/entity/mod.rs b/notifico-project/migration/src/entity/mod.rs new file mode 100644 index 0000000..c3a50fb --- /dev/null +++ b/notifico-project/migration/src/entity/mod.rs @@ -0,0 +1,5 @@ +//! `SeaORM` Entity, @generated by sea-orm-codegen 1.1.1 + +pub mod prelude; + +pub mod project; diff --git a/notifico-project/migration/src/entity/prelude.rs b/notifico-project/migration/src/entity/prelude.rs new file mode 100644 index 0000000..5d59ffc --- /dev/null +++ b/notifico-project/migration/src/entity/prelude.rs @@ -0,0 +1,3 @@ +//! `SeaORM` Entity, @generated by sea-orm-codegen 1.1.1 + +pub use super::project::Entity as Project; diff --git a/notifico-project/migration/src/entity/project.rs b/notifico-project/migration/src/entity/project.rs new file mode 100644 index 0000000..0f0b976 --- /dev/null +++ b/notifico-project/migration/src/entity/project.rs @@ -0,0 +1,16 @@ +//! `SeaORM` Entity, @generated by sea-orm-codegen 1.1.1 + +use sea_orm::entity::prelude::*; + +#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] +#[sea_orm(table_name = "project")] +pub struct Model { + #[sea_orm(primary_key, auto_increment = false)] + pub id: Uuid, + pub name: String, +} + +#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] +pub enum Relation {} + +impl ActiveModelBehavior for ActiveModel {} diff --git a/notifico-project/migration/src/lib.rs b/notifico-project/migration/src/lib.rs new file mode 100644 index 0000000..b2d6736 --- /dev/null +++ b/notifico-project/migration/src/lib.rs @@ -0,0 +1,17 @@ +pub use sea_orm_migration::prelude::*; + +mod entity; +mod m20220101_000001_create_table; + +pub struct Migrator; + +#[async_trait::async_trait] +impl MigratorTrait for Migrator { + fn migrations() -> Vec> { + vec![Box::new(m20220101_000001_create_table::Migration)] + } + + fn migration_table_name() -> DynIden { + Alias::new("project_migrations").into_iden() + } +} diff --git a/notifico-project/migration/src/m20220101_000001_create_table.rs b/notifico-project/migration/src/m20220101_000001_create_table.rs new file mode 100644 index 0000000..4c93c80 --- /dev/null +++ b/notifico-project/migration/src/m20220101_000001_create_table.rs @@ -0,0 +1,48 @@ +use crate::entity::project; +use sea_orm::prelude::Uuid; +use sea_orm::{ActiveModelTrait, Set}; +use sea_orm_migration::{prelude::*, schema::*}; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .create_table( + Table::create() + .table(Project::Table) + .if_not_exists() + .col(pk_uuid(Project::Id)) + .col(string(Project::Name)) + .to_owned(), + ) + .await?; + + let db = manager.get_connection(); + + project::ActiveModel { + id: Set(Uuid::nil()), + name: Set("Default Project".to_string()), + ..Default::default() + } + .insert(db) + .await?; + + Ok(()) + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .drop_table(Table::drop().table(Project::Table).to_owned()) + .await + } +} + +#[derive(DeriveIden)] +enum Project { + Table, + Id, + Name, +} diff --git a/notifico-project/migration/src/main.rs b/notifico-project/migration/src/main.rs new file mode 100644 index 0000000..c6b6e48 --- /dev/null +++ b/notifico-project/migration/src/main.rs @@ -0,0 +1,6 @@ +use sea_orm_migration::prelude::*; + +#[async_std::main] +async fn main() { + cli::run_cli(migration::Migrator).await; +} diff --git a/notifico-project/src/entity/mod.rs b/notifico-project/src/entity/mod.rs new file mode 100644 index 0000000..c3a50fb --- /dev/null +++ b/notifico-project/src/entity/mod.rs @@ -0,0 +1,5 @@ +//! `SeaORM` Entity, @generated by sea-orm-codegen 1.1.1 + +pub mod prelude; + +pub mod project; diff --git a/notifico-project/src/entity/prelude.rs b/notifico-project/src/entity/prelude.rs new file mode 100644 index 0000000..5d59ffc --- /dev/null +++ b/notifico-project/src/entity/prelude.rs @@ -0,0 +1,3 @@ +//! `SeaORM` Entity, @generated by sea-orm-codegen 1.1.1 + +pub use super::project::Entity as Project; diff --git a/notifico-project/src/entity/project.rs b/notifico-project/src/entity/project.rs new file mode 100644 index 0000000..0f0b976 --- /dev/null +++ b/notifico-project/src/entity/project.rs @@ -0,0 +1,16 @@ +//! `SeaORM` Entity, @generated by sea-orm-codegen 1.1.1 + +use sea_orm::entity::prelude::*; + +#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] +#[sea_orm(table_name = "project")] +pub struct Model { + #[sea_orm(primary_key, auto_increment = false)] + pub id: Uuid, + pub name: String, +} + +#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] +pub enum Relation {} + +impl ActiveModelBehavior for ActiveModel {} diff --git a/notifico-project/src/lib.rs b/notifico-project/src/lib.rs new file mode 100644 index 0000000..f8485bd --- /dev/null +++ b/notifico-project/src/lib.rs @@ -0,0 +1,102 @@ +use migration::{Migrator, MigratorTrait}; +use notifico_core::http::admin::{ListQueryParams, ListableTrait, PaginatedResult}; +use sea_orm::{ActiveModelTrait, DatabaseConnection, EntityTrait, PaginatorTrait, Set}; +use serde::Serialize; +use std::error::Error; +use uuid::Uuid; + +mod entity; + +#[derive(Clone, Debug, Serialize)] +pub struct Project { + pub id: Uuid, + pub name: String, +} + +pub struct ProjectController { + db: DatabaseConnection, +} + +impl ProjectController { + pub fn new(db: DatabaseConnection) -> Self { + Self { db } + } + + pub async fn setup(&self) -> Result<(), Box> { + Ok(Migrator::up(&self.db, None).await?) + } + + pub async fn create(&self, name: &str) -> Result> { + let id = Uuid::now_v7(); + + entity::project::ActiveModel { + id: Set(id), + name: Set(name.to_string()), + } + .insert(&self.db) + .await?; + + Ok(Project { + id, + name: name.to_string(), + }) + } + + pub async fn get_by_id(&self, id: Uuid) -> Result, Box> { + let query = entity::project::Entity::find_by_id(id) + .one(&self.db) + .await?; + Ok(query.map(Project::from)) + } + + pub async fn list( + &self, + params: ListQueryParams, + ) -> Result, Box> { + let query = entity::project::Entity::find() + .apply_params(¶ms)? + .all(&self.db) + .await?; + + Ok(PaginatedResult { + items: query.into_iter().map(Project::from).collect(), + total_count: entity::project::Entity::find().count(&self.db).await?, + }) + } + + pub async fn update(&self, id: Uuid, name: &str) -> Result> { + entity::project::ActiveModel { + id: Set(id), + name: Set(name.to_string()), + } + .update(&self.db) + .await?; + Ok(Project { + id, + name: name.to_string(), + }) + } + + pub async fn delete(&self, id: Uuid) -> Result<(), Box> { + if id.is_nil() { + return Ok(()); + } + + entity::project::ActiveModel { + id: Set(id), + ..Default::default() + } + .delete(&self.db) + .await?; + Ok(()) + } +} + +impl From for Project { + fn from(value: entity::project::Model) -> Self { + Project { + id: value.id, + name: value.name, + } + } +} diff --git a/notifico-worker/Cargo.toml b/notifico-worker/Cargo.toml index 2488524..993810a 100644 --- a/notifico-worker/Cargo.toml +++ b/notifico-worker/Cargo.toml @@ -11,13 +11,14 @@ notifico-whatsapp = { path = "../transports/notifico-whatsapp" } notifico-smpp = { path = "../transports/notifico-smpp" } notifico-template = { path = "../notifico-template" } notifico-subscription = { path = "../notifico-subscription" } +notifico-dbpipeline = { path = "../notifico-dbpipeline" } anyhow = "1.0.91" async-trait = "0.1.83" clap = { workspace = true } dotenvy = "0.15.7" fe2o3-amqp = { version = "0.13.1", features = ["acceptor"] } -figment = { version = "0.10.19", features = ["toml", "yaml"] } +figment = { version = "0.10.19", features = ["env", "toml"] } futures = "0.3.31" sea-orm = { workspace = true } serde = { version = "1.0.210", features = ["derive"] } diff --git a/notifico-worker/src/amqp.rs b/notifico-worker/src/amqp.rs index 664a0b9..1329d04 100644 --- a/notifico-worker/src/amqp.rs +++ b/notifico-worker/src/amqp.rs @@ -80,6 +80,7 @@ async fn connect_to_broker( async fn process_link(mut receiver: Receiver, runner: Arc) -> anyhow::Result<()> { loop { let delivery = receiver.recv::().await?; + receiver.accept(&delivery).await?; let eventrequest = serde_json::from_str(delivery.body())?; runner.process_eventrequest(eventrequest).await; diff --git a/notifico-worker/src/main.rs b/notifico-worker/src/main.rs index e1ea8fe..654ec53 100644 --- a/notifico-worker/src/main.rs +++ b/notifico-worker/src/main.rs @@ -1,12 +1,15 @@ mod amqp; use clap::Parser; -use figment::providers::Yaml; -use figment::{providers::Format, providers::Toml, Figment}; +use figment::providers::Toml; +use figment::{ + providers::{Env, Format}, + Figment, +}; use notifico_core::config::credentials::MemoryCredentialStorage; -use notifico_core::config::pipelines::{MemoryPipelineStorage, PipelineConfig}; use notifico_core::engine::Engine; use notifico_core::pipeline::runner::PipelineRunner; +use notifico_dbpipeline::DbPipelineStorage; use notifico_smpp::SmppPlugin; use notifico_smtp::EmailPlugin; use notifico_subscription::SubscriptionManager; @@ -36,7 +39,7 @@ struct Args { env = "NOTIFICO_AMQP_WORKERS_ADDR", default_value = "notifico_workers" )] - amqp_workers_addr: String, + amqp_addr: String, #[clap( long, @@ -86,11 +89,7 @@ async fn main() { let credential_config: serde_json::Value = Figment::new() .merge(Toml::file(args.credentials_path.clone())) - .extract() - .unwrap(); - - let pipeline_config: PipelineConfig = Figment::new() - .merge(Yaml::file(args.pipelines_path.clone())) + .merge(Env::prefixed("NOTIFICO_CREDENTIAL_")) .extract() .unwrap(); @@ -98,7 +97,7 @@ async fn main() { let db_connection = Database::connect(db_conn_options).await.unwrap(); let credentials = Arc::new(MemoryCredentialStorage::from_config(credential_config).unwrap()); - let pipelines = Arc::new(MemoryPipelineStorage::from_config(&pipeline_config)); + let pipelines = Arc::new(DbPipelineStorage::new(db_connection.clone())); // Create Engine with plugins let mut engine = Engine::new(); @@ -122,11 +121,7 @@ async fn main() { // Create PipelineRunner, the core component of the Notifico system let runner = Arc::new(PipelineRunner::new(pipelines.clone(), engine)); - tokio::spawn(amqp::start( - runner.clone(), - args.amqp, - args.amqp_workers_addr, - )); + tokio::spawn(amqp::start(runner.clone(), args.amqp, args.amqp_addr)); tokio::signal::ctrl_c().await.unwrap(); } diff --git a/pipelines.yml b/pipelines.yml index 6001ac9..707779b 100644 --- a/pipelines.yml +++ b/pipelines.yml @@ -1,3 +1,5 @@ +events: + - name: send_notification pipelines: - channel: telegram events: diff --git a/regen_entities.sh b/regen_entities.sh index e677cbe..c9378f3 100755 --- a/regen_entities.sh +++ b/regen_entities.sh @@ -10,3 +10,17 @@ sea-orm-cli migrate -d migration up sea-orm-cli generate entity -o src/entity --ignore-tables subscription_migrations rm "$TEMPDB" popd + +pushd notifico-dbpipeline +touch "$TEMPDB" +sea-orm-cli migrate -d migration up +sea-orm-cli generate entity -o src/entity --ignore-tables pipeline_migrations +rm "$TEMPDB" +popd + +pushd notifico-project +touch "$TEMPDB" +sea-orm-cli migrate -d migration up +sea-orm-cli generate entity -o src/entity --ignore-tables project_migrations +rm "$TEMPDB" +popd