Skip to content

Commit

Permalink
Revert "Use local everything"
Browse files Browse the repository at this point in the history
This reverts commit 8303dee.
  • Loading branch information
GamePad64 committed Nov 12, 2024
1 parent 8303dee commit c6d937c
Show file tree
Hide file tree
Showing 41 changed files with 1,172 additions and 101 deletions.
4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ members = [
"notifico-apiserver",
"notifico-subscription",
"notifico-subscription/migration",
"notifico-dbpipeline",
"notifico-dbpipeline/migration",
"notifico-project",
"notifico-project/migration",
]

[workspace.dependencies]
Expand Down
2 changes: 2 additions & 0 deletions notifico-apiserver/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ edition = "2021"
[dependencies]
notifico-core = { path = "../notifico-core" }
notifico-subscription = { path = "../notifico-subscription" }
notifico-dbpipeline = { path = "../notifico-dbpipeline" }
notifico-project = { path = "../notifico-project" }

anyhow = "1.0.93"
async-trait = "0.1.83"
Expand Down
87 changes: 87 additions & 0 deletions notifico-apiserver/src/http/admin/event.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
use axum::extract::{Path, Query};
use axum::http::header::CONTENT_RANGE;
use axum::http::{HeaderMap, StatusCode};
use axum::{Extension, Json};
use notifico_core::http::admin::{ListQueryParams, PaginatedResult};
use notifico_core::pipeline::storage::PipelineStorage;
use notifico_core::pipeline::Event;
use serde::Deserialize;
use serde_json::{json, Value};
use std::sync::Arc;
use uuid::Uuid;

pub async fn list_events(
Query(params): Query<ListQueryParams>,
Extension(pipeline_storage): Extension<Arc<dyn PipelineStorage>>,
) -> (HeaderMap, Json<Vec<Event>>) {
let PaginatedResult { items, total_count } =
pipeline_storage.list_events(params).await.unwrap();

let mut headers = HeaderMap::new();
headers.insert(CONTENT_RANGE, total_count.into());

(headers, Json(items))
}

pub async fn get_event(
Path((id,)): Path<(Uuid,)>,
Extension(pipeline_storage): Extension<Arc<dyn PipelineStorage>>,
) -> (StatusCode, Json<Option<Event>>) {
let result = pipeline_storage.get_event_by_id(id).await.unwrap();

let Some(result) = result else {
return (StatusCode::NOT_FOUND, Json(None));
};
(StatusCode::OK, Json(Some(result)))
}

#[derive(Deserialize)]
pub struct EventCreate {
project_id: Uuid,
name: String,
}

pub async fn create_event(
Extension(pipeline_storage): Extension<Arc<dyn PipelineStorage>>,
Json(create): Json<EventCreate>,
) -> (StatusCode, Json<Value>) {
let result = pipeline_storage
.create_event(create.project_id, &create.name)
.await
.unwrap();

(
StatusCode::CREATED,
Json(serde_json::to_value(result).unwrap()),
)
}

#[derive(Deserialize)]
pub struct EventUpdate {
name: String,
}

pub async fn update_event(
Extension(pipeline_storage): Extension<Arc<dyn PipelineStorage>>,
Path((id,)): Path<(Uuid,)>,
Json(update): Json<EventUpdate>,
) -> (StatusCode, Json<Value>) {
let result = pipeline_storage
.update_event(id, &update.name)
.await
.unwrap();

(
StatusCode::CREATED,
Json(serde_json::to_value(result).unwrap()),
)
}

pub async fn delete_event(
Extension(pipeline_storage): Extension<Arc<dyn PipelineStorage>>,
Path((id,)): Path<(Uuid,)>,
) -> (StatusCode, Json<Value>) {
pipeline_storage.delete_event(id).await.unwrap();

(StatusCode::NO_CONTENT, Json(json!({})))
}
31 changes: 30 additions & 1 deletion notifico-apiserver/src/http/admin/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ use crate::http::HttpExtensions;
use axum::routing::{get, put};
use axum::{Extension, Router};
use tower_http::cors::CorsLayer;

mod event;
mod pipeline;
mod project;
pub mod subscription;

pub(crate) fn get_router(ext: HttpExtensions) -> Router {
Expand All @@ -14,7 +16,34 @@ pub(crate) fn get_router(ext: HttpExtensions) -> Router {
"/v1/subscriptions/:id",
put(subscription::update_subscription),
)
// Pipelines
.route("/v1/pipelines", get(pipeline::list_pipelines))
.route("/v1/pipelines/:id", get(pipeline::get_pipeline))
// Events
.route(
"/v1/events",
get(event::list_events).post(event::create_event),
)
.route(
"/v1/events/:id",
get(event::get_event)
.put(event::update_event)
.delete(event::delete_event),
)
// Projects
.route(
"/v1/projects",
get(project::list_projects).post(project::create_project),
)
.route(
"/v1/projects/:id",
get(project::get_project)
.put(project::update_project)
.delete(project::delete_project),
)
// Layers
.layer(Extension(ext.subman))
.layer(Extension(ext.pipeline_storage))
.layer(Extension(ext.projects_controller))
.layer(CorsLayer::permissive())
}
78 changes: 78 additions & 0 deletions notifico-apiserver/src/http/admin/pipeline.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
use crate::http::admin::project::ProjectUpdate;
use axum::extract::{Path, Query};
use axum::http::header::CONTENT_RANGE;
use axum::http::{HeaderMap, StatusCode};
use axum::{Extension, Json};
use notifico_core::http::admin::{ListQueryParams, PaginatedResult};
use notifico_core::pipeline::storage::{PipelineResult, PipelineStorage};
use notifico_core::step::SerializedStep;
use serde::Serialize;
use serde_json::Value;
use std::sync::Arc;
use uuid::Uuid;

#[derive(Clone, Serialize)]
pub struct PipelineItem {
pub id: Uuid,
pub project_id: Uuid,
pub event_ids: Vec<Uuid>,
pub steps: Vec<SerializedStep>,
pub channel: String,
}

impl From<PipelineResult> for PipelineItem {
fn from(value: PipelineResult) -> Self {
Self {
id: value.pipeline.id,
project_id: value.pipeline.project_id,
steps: value.pipeline.steps.clone(),
channel: value.pipeline.channel,

event_ids: value.event_ids,
}
}
}

pub async fn list_pipelines(
Query(params): Query<ListQueryParams>,
Extension(pipeline_storage): Extension<Arc<dyn PipelineStorage>>,
) -> (HeaderMap, Json<Vec<PipelineItem>>) {
let PaginatedResult { items, total_count } =
pipeline_storage.list_pipelines(params).await.unwrap();

let pipelines = items.into_iter().map(PipelineItem::from).collect();

let mut headers = HeaderMap::new();
headers.insert(CONTENT_RANGE, total_count.into());

(headers, Json(pipelines))
}

pub async fn get_pipeline(
Path((id,)): Path<(Uuid,)>,
Extension(pipeline_storage): Extension<Arc<dyn PipelineStorage>>,
) -> (StatusCode, Json<Option<PipelineItem>>) {
let result = pipeline_storage
.get_pipeline_by_id(id)
.await
.unwrap()
.map(PipelineItem::from);

let Some(result) = result else {
return (StatusCode::NOT_FOUND, Json(None));
};
(StatusCode::OK, Json(Some(result)))
}

pub async fn update_pipeline(
Extension(pipeline_storage): Extension<Arc<dyn PipelineStorage>>,
Path((id,)): Path<(Uuid,)>,
Json(update): Json<ProjectUpdate>,
) -> (StatusCode, Json<Value>) {
let result = pipeline_storage.update(id, &update.name).await.unwrap();

(
StatusCode::ACCEPTED,
Json(serde_json::to_value(result).unwrap()),
)
}
73 changes: 73 additions & 0 deletions notifico-apiserver/src/http/admin/project.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
use axum::extract::{Path, Query};
use axum::http::header::CONTENT_RANGE;
use axum::http::{HeaderMap, StatusCode};
use axum::{Extension, Json};
use notifico_core::http::admin::{ListQueryParams, PaginatedResult};
use notifico_project::{Project, ProjectController};
use serde::Deserialize;
use serde_json::{json, Value};
use std::sync::Arc;
use uuid::Uuid;

pub async fn list_projects(
Query(params): Query<ListQueryParams>,
Extension(controller): Extension<Arc<ProjectController>>,
) -> (HeaderMap, Json<Vec<Project>>) {
let PaginatedResult { items, total_count } = controller.list(params).await.unwrap();

let mut headers = HeaderMap::new();
headers.insert(CONTENT_RANGE, total_count.into());

(headers, Json(items))
}

pub async fn get_project(
Path((id,)): Path<(Uuid,)>,
Extension(controller): Extension<Arc<ProjectController>>,
) -> (StatusCode, Json<Option<Project>>) {
let result = controller.get_by_id(id).await.unwrap();

let Some(result) = result else {
return (StatusCode::NOT_FOUND, Json(None));
};
(StatusCode::OK, Json(Some(result)))
}

#[derive(Deserialize)]
pub struct ProjectUpdate {
name: String,
}

pub async fn create_project(
Extension(controller): Extension<Arc<ProjectController>>,
Json(update): Json<ProjectUpdate>,
) -> (StatusCode, Json<Value>) {
let result = controller.create(&update.name).await.unwrap();

(
StatusCode::CREATED,
Json(serde_json::to_value(result).unwrap()),
)
}

pub async fn update_project(
Extension(controller): Extension<Arc<ProjectController>>,
Path((id,)): Path<(Uuid,)>,
Json(update): Json<ProjectUpdate>,
) -> (StatusCode, Json<Value>) {
let result = controller.update(id, &update.name).await.unwrap();

(
StatusCode::ACCEPTED,
Json(serde_json::to_value(result).unwrap()),
)
}

pub async fn delete_project(
Extension(controller): Extension<Arc<ProjectController>>,
Path((id,)): Path<(Uuid,)>,
) -> (StatusCode, Json<Value>) {
controller.delete(id).await.unwrap();

(StatusCode::NO_CONTENT, Json(json!({})))
}
5 changes: 5 additions & 0 deletions notifico-apiserver/src/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@ mod admin;
mod ingest;
mod recipient;

use axum::response::IntoResponse;
use axum::{Extension, Router};
use notifico_core::http::SecretKey;
use notifico_core::pipeline::runner::ProcessEventRequest;
use notifico_core::pipeline::storage::PipelineStorage;
use notifico_project::ProjectController;
use notifico_subscription::SubscriptionManager;
use std::net::SocketAddr;
use std::sync::Arc;
Expand All @@ -20,6 +23,8 @@ pub(crate) struct HttpExtensions {
pub sender: Sender<ProcessEventRequest>,
pub subman: Arc<SubscriptionManager>,
pub secret_key: Arc<SecretKey>,
pub pipeline_storage: Arc<dyn PipelineStorage>,
pub projects_controller: Arc<ProjectController>,
}

#[derive(OpenApi)]
Expand Down
10 changes: 10 additions & 0 deletions notifico-apiserver/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ mod http;
use crate::http::HttpExtensions;
use clap::Parser;
use notifico_core::http::SecretKey;
use notifico_dbpipeline::DbPipelineStorage;
use notifico_project::ProjectController;
use notifico_subscription::SubscriptionManager;
use sea_orm::{ConnectOptions, Database};
use std::net::SocketAddr;
Expand Down Expand Up @@ -62,12 +64,20 @@ async fn main() {
));
subman.setup().await.unwrap();

let pipeline_storage = Arc::new(DbPipelineStorage::new(db_connection.clone()));
pipeline_storage.setup().await.unwrap();

let projects = Arc::new(ProjectController::new(db_connection.clone()));
projects.setup().await.unwrap();

let (request_tx, request_rx) = tokio::sync::mpsc::channel(1);

let ext = HttpExtensions {
projects_controller: projects,
sender: request_tx,
subman,
secret_key: Arc::new(SecretKey(args.secret_key.as_bytes().to_vec())),
pipeline_storage,
};

tokio::spawn(http::start(
Expand Down
1 change: 0 additions & 1 deletion notifico-core/src/config/mod.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1 @@
pub mod credentials;
pub mod pipelines;
Loading

0 comments on commit c6d937c

Please sign in to comment.