From b0a0b55b56f12f60006cdeadad14586665479221 Mon Sep 17 00:00:00 2001 From: Alexander Shishenko Date: Mon, 16 Sep 2024 22:30:18 +0300 Subject: [PATCH] Split monolithic code into several crates: core, telegram. Remove http interface, for now. --- Cargo.toml | 15 +- notifico-core/Cargo.toml | 12 ++ notifico-core/src/credentials.rs | 13 ++ notifico-core/src/engine.rs | 38 +++++ notifico-core/src/lib.rs | 5 + {src/engine => notifico-core/src}/pipeline.rs | 13 +- notifico-core/src/recipient.rs | 35 +++++ notifico-core/src/templater.rs | 36 +++++ notifico-telegram/Cargo.toml | 14 ++ notifico-telegram/src/contact.rs | 23 ++++ notifico-telegram/src/lib.rs | 130 ++++++++++++++++++ notifico-telegram/src/step.rs | 30 ++++ notifico.yml | 37 +++++ pipeline.yml | 24 ---- src/config.rs | 34 +++++ src/engine/mod.rs | 71 +++------- src/engine/settings.rs | 30 ---- src/engine/step.rs | 32 ----- src/engine/telegram.rs | 112 --------------- src/main.rs | 121 ++++++---------- src/recipients.rs | 25 ++++ src/templater/mod.rs | 1 + .../templater.rs => templater/service.rs} | 33 +---- 23 files changed, 512 insertions(+), 372 deletions(-) create mode 100644 notifico-core/Cargo.toml create mode 100644 notifico-core/src/credentials.rs create mode 100644 notifico-core/src/engine.rs create mode 100644 notifico-core/src/lib.rs rename {src/engine => notifico-core/src}/pipeline.rs (55%) create mode 100644 notifico-core/src/recipient.rs create mode 100644 notifico-core/src/templater.rs create mode 100644 notifico-telegram/Cargo.toml create mode 100644 notifico-telegram/src/contact.rs create mode 100644 notifico-telegram/src/lib.rs create mode 100644 notifico-telegram/src/step.rs create mode 100644 notifico.yml delete mode 100644 pipeline.yml create mode 100644 src/config.rs delete mode 100644 src/engine/settings.rs delete mode 100644 src/engine/step.rs delete mode 100644 src/engine/telegram.rs create mode 100644 src/recipients.rs create mode 100644 src/templater/mod.rs rename src/{engine/templater.rs => templater/service.rs} (61%) diff --git a/Cargo.toml b/Cargo.toml index 1bcc269..88f9690 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,19 +9,18 @@ tokio = { version = "1.40", features = ["full"] } log = "0.4.22" futures = "0.3.30" tracing = "0.1.40" -tracing-subscriber = "0.3.18" -axum = { version = "0.7.5", features = ["macros"] } -teloxide = "0.13.0" +tracing-subscriber = { version = "0.3.18", features = ["env-filter", "fmt"] } serde = { version = "1.0.210", features = ["derive"] } -utoipa = { version = "5.0.0-beta.0", features = ["axum_extras"] } -utoipa-redoc = { version = "4.0.1-beta.0", features = ["axum"] } -sea-orm = { version = "1.1.0-rc.1", features = ["sqlx-sqlite", "runtime-tokio-rustls", "macros"] } +#sea-orm = { version = "1.1.0-rc.1", features = ["sqlx-sqlite", "runtime-tokio-rustls", "macros"] } uuid = { version = "1.10.0", features = ["serde"] } serde_json = "1.0.128" reqwest = { version = "0.12.7", features = ["json"] } async-trait = "0.1.82" url = "2.5.2" -erased-serde = "0.4.5" +figment = { version = "0.10.19", features = ["yaml", "env"] } +clap = { version = "4.5.17", features = ["derive", "color", "usage"] } +notifico-core = { path = "notifico-core" } +notifico-telegram = { path = "notifico-telegram" } [workspace] -members = ["."] +members = [".", "notifico-core", "notifico-telegram"] diff --git a/notifico-core/Cargo.toml b/notifico-core/Cargo.toml new file mode 100644 index 0000000..7b8b0fa --- /dev/null +++ b/notifico-core/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "notifico-core" +version = "0.1.0" +edition = "2021" + +[dependencies] +async-trait = "0.1.82" +serde = { version = "1.0.210", features = ["derive"] } +serde_json = "1.0.128" +uuid = { version = "1.10.0", features = ["v4", "serde"] } +reqwest = "0.12.7" +url = "2.5.2" diff --git a/notifico-core/src/credentials.rs b/notifico-core/src/credentials.rs new file mode 100644 index 0000000..2710832 --- /dev/null +++ b/notifico-core/src/credentials.rs @@ -0,0 +1,13 @@ +use serde::{Deserialize, Serialize}; +use serde_json::Value; + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct Credential { + pub r#type: String, + pub name: String, + pub value: Value, +} + +pub trait Credentials: Send + Sync { + fn get_credential(&self, r#type: &str, name: &str) -> Option; +} diff --git a/notifico-core/src/engine.rs b/notifico-core/src/engine.rs new file mode 100644 index 0000000..95a8d12 --- /dev/null +++ b/notifico-core/src/engine.rs @@ -0,0 +1,38 @@ +use crate::pipeline::SerializedStep; +use crate::recipient::Recipient; +use crate::templater::TemplaterError; +use async_trait::async_trait; +use serde::{Deserialize, Serialize}; +use serde_json::{Map, Value}; +use std::any::Any; +use std::borrow::Cow; +use std::collections::HashMap; + +#[derive(Debug, Default, Serialize, Deserialize)] +#[serde(transparent)] +pub struct EventContext(pub Map); + +#[derive(Default, Debug)] +pub struct PipelineContext { + pub recipient: Option, + pub event_context: EventContext, + pub plugin_contexts: HashMap, Value>, +} + +#[derive(Debug)] +pub enum EngineError { + TemplaterError(TemplaterError), + PluginNotFound(SerializedStep), + PipelineInterrupted, +} + +#[async_trait] +pub trait EnginePlugin: Send + Sync + Any { + async fn execute_step( + &self, + context: &mut PipelineContext, + step: &SerializedStep, + ) -> Result<(), EngineError>; + + fn step_type(&self) -> Cow<'static, str>; +} diff --git a/notifico-core/src/lib.rs b/notifico-core/src/lib.rs new file mode 100644 index 0000000..a44d7d0 --- /dev/null +++ b/notifico-core/src/lib.rs @@ -0,0 +1,5 @@ +pub mod credentials; +pub mod engine; +pub mod pipeline; +pub mod recipient; +pub mod templater; diff --git a/src/engine/pipeline.rs b/notifico-core/src/pipeline.rs similarity index 55% rename from src/engine/pipeline.rs rename to notifico-core/src/pipeline.rs index bea6153..8f01bd5 100644 --- a/src/engine/pipeline.rs +++ b/notifico-core/src/pipeline.rs @@ -3,15 +3,24 @@ use serde_json::Value; #[derive(Serialize, Deserialize, Default, Clone, Debug)] pub struct Pipeline { - pub channel: String, + pub events: Vec, pub steps: Vec, } #[derive(Serialize, Deserialize, Default, Clone, Debug)] +#[serde(transparent)] pub struct SerializedStep(pub serde_json::Map); impl SerializedStep { pub fn get_type(&self) -> &str { - &self.0["type"].as_str().expect("Step type must be a string") + self.0["step"].as_str().expect("Step type must be a string") + } + + pub fn into_inner(self) -> serde_json::Map { + self.0 + } + + pub fn into_value(self) -> Value { + Value::Object(self.into_inner()) } } diff --git a/notifico-core/src/recipient.rs b/notifico-core/src/recipient.rs new file mode 100644 index 0000000..f97f4f2 --- /dev/null +++ b/notifico-core/src/recipient.rs @@ -0,0 +1,35 @@ +use serde::Deserialize; +use serde_json::Value; +use uuid::Uuid; + +#[derive(Debug, Clone, Deserialize)] +pub struct Recipient { + pub id: Uuid, + pub contacts: Vec, +} + +impl Recipient { + pub fn get_primary_contact(&self, r#type: &str) -> Option<&Contact> { + for contact in &self.contacts { + if contact.r#type() == r#type { + return Some(&contact); + } + } + None + } +} + +#[derive(Clone, Debug, Deserialize)] +pub struct Contact(Value); + +impl Contact { + pub fn r#type(&self) -> &str { + self.0["type"] + .as_str() + .expect("Contact type must be a string") + } + + pub fn into_json(self) -> Value { + self.0 + } +} diff --git a/notifico-core/src/templater.rs b/notifico-core/src/templater.rs new file mode 100644 index 0000000..b94549f --- /dev/null +++ b/notifico-core/src/templater.rs @@ -0,0 +1,36 @@ +use async_trait::async_trait; +use serde::{Deserialize, Serialize}; +use serde_json::{Map, Value}; +use uuid::Uuid; + +#[derive(Debug)] +pub enum TemplaterError { + RequestError(reqwest::Error), + UrlError(url::ParseError), +} + +impl From for TemplaterError { + fn from(err: reqwest::Error) -> Self { + TemplaterError::RequestError(err) + } +} + +impl From for TemplaterError { + fn from(err: url::ParseError) -> Self { + TemplaterError::UrlError(err) + } +} + +#[derive(Debug, Serialize, Deserialize)] +#[serde(transparent)] +pub struct RenderResponse(pub Map); + +#[async_trait] +pub trait Templater: Send + Sync { + async fn render( + &self, + template_type: &str, + template_id: Uuid, + context: Map, + ) -> Result; +} diff --git a/notifico-telegram/Cargo.toml b/notifico-telegram/Cargo.toml new file mode 100644 index 0000000..4107c0c --- /dev/null +++ b/notifico-telegram/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "notifico-telegram" +version = "0.1.0" +edition = "2021" + +[dependencies] +async-trait = "0.1.82" +serde = { version = "1.0.210", features = ["derive"] } +serde_json = "1.0.128" +tracing = "0.1.40" +uuid = { version = "1.10.0", features = ["v4"] } +notifico-core = { path = "../notifico-core" } +teloxide = "0.13.0" + diff --git a/notifico-telegram/src/contact.rs b/notifico-telegram/src/contact.rs new file mode 100644 index 0000000..032ad54 --- /dev/null +++ b/notifico-telegram/src/contact.rs @@ -0,0 +1,23 @@ +use notifico_core::recipient::Contact; +use serde::Deserialize; +use teloxide::prelude::ChatId; +use teloxide::types::Recipient; + +#[derive(Debug, Deserialize)] +pub struct TelegramContact { + chat_id: ChatId, +} + +impl TelegramContact { + pub(crate) fn into_recipient(self) -> Recipient { + Recipient::Id(self.chat_id) + } +} + +impl TryFrom for TelegramContact { + type Error = (); + + fn try_from(value: Contact) -> Result { + serde_json::from_value(value.into_json()).map_err(|_| ()) + } +} diff --git a/notifico-telegram/src/lib.rs b/notifico-telegram/src/lib.rs new file mode 100644 index 0000000..0a4d9f4 --- /dev/null +++ b/notifico-telegram/src/lib.rs @@ -0,0 +1,130 @@ +use async_trait::async_trait; +use contact::TelegramContact; +use notifico_core::credentials::Credentials; +use notifico_core::engine::{EngineError, EnginePlugin, PipelineContext}; +use notifico_core::pipeline::SerializedStep; +use notifico_core::templater::{RenderResponse, Templater}; +use serde::{Deserialize, Serialize}; +use serde_json::Value; +use std::borrow::Cow; +use std::sync::Arc; +use step::{CredentialSelector, TelegramStep}; +use teloxide::prelude::Requester; +use teloxide::Bot; +use tracing::debug; +use uuid::Uuid; + +mod contact; +mod step; + +#[derive(Debug, Serialize, Deserialize)] +pub struct TelegramBotCredentials { + token: String, +} + +pub struct TelegramPlugin { + templater: Arc, + credentials: Arc, +} + +impl TelegramPlugin { + pub fn new(templater: Arc, credentials: Arc) -> Self { + Self { + templater, + credentials, + } + } +} + +#[derive(Default, Serialize, Deserialize)] +struct TelegramContext { + template_id: Option, +} + +#[async_trait] +impl EnginePlugin for TelegramPlugin { + async fn execute_step( + &self, + context: &mut PipelineContext, + step: &SerializedStep, + ) -> Result<(), EngineError> { + let telegram_context = context + .plugin_contexts + .entry("telegram".into()) + .or_insert(Value::Object(Default::default())); + + debug!("Plugin context: {:?}", telegram_context); + + let mut plugin_context: TelegramContext = + serde_json::from_value(telegram_context.clone()).unwrap(); + let telegram_step: TelegramStep = step.clone().try_into().unwrap(); + + match telegram_step { + TelegramStep::LoadTemplate { template_id } => { + plugin_context.template_id = Some(template_id); + context.plugin_contexts.insert( + "telegram".into(), + serde_json::to_value(plugin_context).unwrap(), + ); + } + TelegramStep::Send(cred_selector) => { + let Some(template_id) = plugin_context.template_id else { + return Err(EngineError::PipelineInterrupted); + }; + + let bot_token = match cred_selector { + CredentialSelector::BotName { bot_name } => self + .credentials + .get_credential("telegram_token", &bot_name) + .unwrap(), + }; + + let tgcred: TelegramBotCredentials = serde_json::from_value(bot_token).unwrap(); + + let bot = Bot::new(tgcred.token); + + let rendered_template = self + .templater + .render("telegram", template_id, context.event_context.0.clone()) + .await + .unwrap(); + + let rendered_template: TelegramBody = rendered_template.try_into().unwrap(); + + let contact = TelegramContact::try_from( + context + .recipient + .clone() + .unwrap() + .get_primary_contact("telegram") + .ok_or(EngineError::PipelineInterrupted) + .cloned()?, + ) + .unwrap(); + + bot.send_message(contact.into_recipient(), rendered_template.body) + .await + .unwrap(); + } + } + + Ok(()) + } + + fn step_type(&self) -> Cow<'static, str> { + "telegram".into() + } +} + +#[derive(Deserialize, Clone)] +pub struct TelegramBody { + pub body: String, +} + +impl TryFrom for TelegramBody { + type Error = (); + + fn try_from(value: RenderResponse) -> Result { + serde_json::from_value(Value::from_iter(value.0)).map_err(|_| ()) + } +} diff --git a/notifico-telegram/src/step.rs b/notifico-telegram/src/step.rs new file mode 100644 index 0000000..6bb7fdc --- /dev/null +++ b/notifico-telegram/src/step.rs @@ -0,0 +1,30 @@ +use notifico_core::pipeline::SerializedStep; +use serde::{Deserialize, Serialize}; +use uuid::Uuid; + +#[derive(Serialize, Deserialize)] +#[serde(untagged)] +pub enum CredentialSelector { + BotName { bot_name: String }, +} + +#[derive(Serialize, Deserialize)] +#[serde(tag = "step")] +pub enum TelegramStep { + #[serde(rename = "telegram.load_template")] + LoadTemplate { template_id: Uuid }, + // #[serde(rename = "telegram.set_recipients")] + // SetRecipients { telegram_id: Vec }, + #[serde(rename = "telegram.send")] + Send(CredentialSelector), +} + +impl TryFrom for TelegramStep { + type Error = (); + + fn try_from(value: SerializedStep) -> Result { + let s = serde_json::to_string(&value.into_value()).unwrap(); + + Ok(serde_json::from_str(&s).unwrap()) + } +} diff --git a/notifico.yml b/notifico.yml new file mode 100644 index 0000000..35633ad --- /dev/null +++ b/notifico.yml @@ -0,0 +1,37 @@ +project: "magicup" + +pipelines: + - events: + - send_notification + steps: + # - step: common.subscription_check + - step: telegram.load_template + template_id: "0191d395-f806-7b54-b4db-feffbbe5d2d4" + - step: telegram.send + bot_name: telegram_bot_1 +# - steps: +# - step: common.subscription_check +# - step: email.load_template +# template_id: "275c4ac9-014f-41b8-bf93-047e74421b34" +# - step: email.send +# smtp: +# name: mailman_1 +credentials: + - type: telegram_token + name: telegram_bot_1 + value: + token: "7488126039:AAG9HCfywfyZHkYwB_bWuE6jeeDFTHuvFpM" + - type: smtp_server + name: mailman_1 + value: + host: "your-smtp-host" + port: "your-smtp-port" + username: "your-smtp-username" + password: "your-smtp-password" +recipients: + - id: 3766b9e9-a700-4c75-a9c9-88117af11767 + contacts: + - type: telegram + chat_id: 111579711 + - type: mobile_phone + number: "+1234567890" diff --git a/pipeline.yml b/pipeline.yml deleted file mode 100644 index 57f3ded..0000000 --- a/pipeline.yml +++ /dev/null @@ -1,24 +0,0 @@ -project: "magicup" -events: - - name: registration - pipelines: - - steps: - - step: telegram.load_template - template_id: "0191d395-f806-7b54-b4db-feffbbe5d2d4" - - step: telegram.send - bot_token: "" -# - type: "email" -# steps: -# - step: "load-template" -# template_id: "email-template-id" -# - step: "send" -# server_id: "smtp-server-id" -# - type: "email" -# steps: -# - step: "hold" -# timer_name: "registration_idle" -# timeout: "2d" -# - step: "load-template" -# template_id: "email-template-id" -# - step: "send" -# server_id: "smtp-server-id" diff --git a/src/config.rs b/src/config.rs new file mode 100644 index 0000000..0ba364c --- /dev/null +++ b/src/config.rs @@ -0,0 +1,34 @@ +use notifico_core::credentials::{Credential, Credentials}; +use notifico_core::pipeline::Pipeline; +use notifico_core::recipient::Recipient; +use serde::Deserialize; +use serde_json::Value; + +#[derive(Debug, Deserialize)] +pub(crate) struct Config { + pub project: String, + pub pipelines: Vec, + pub credentials: Vec, + pub recipients: Vec, +} + +pub struct SimpleCredentials { + creds: Vec, +} + +impl SimpleCredentials { + pub fn new(creds: Vec) -> Self { + SimpleCredentials { creds } + } +} + +impl Credentials for SimpleCredentials { + fn get_credential(&self, r#type: &str, name: &str) -> Option { + for cred in self.creds.iter() { + if cred.r#type == r#type && cred.name == name { + return Some(cred.value.clone()); + } + } + None + } +} diff --git a/src/engine/mod.rs b/src/engine/mod.rs index 6f6442f..094e917 100644 --- a/src/engine/mod.rs +++ b/src/engine/mod.rs @@ -1,16 +1,16 @@ -use crate::engine::settings::Settings; use async_trait::async_trait; +use notifico_core::engine::{EngineError, EnginePlugin, PipelineContext}; +use notifico_core::pipeline::{Pipeline, SerializedStep}; +use notifico_core::recipient::Recipient; +use notifico_core::templater::TemplaterError; use serde::{Deserialize, Serialize}; use serde_json::{Map, Value}; use std::any::Any; use std::borrow::Cow; use std::collections::HashMap; -use std::fmt::Debug; +use std::fmt::{Debug, Formatter}; use std::sync::Arc; - -pub mod settings; -pub mod telegram; -pub mod templater; +use tracing::instrument; #[derive(Serialize, Deserialize)] pub struct Event { @@ -18,35 +18,17 @@ pub struct Event { pub pipelines: Vec, } -#[derive(Serialize, Deserialize)] -pub struct Pipeline { - pub steps: Vec, -} - -#[derive(Serialize, Deserialize, Clone, Debug)] -pub struct Step { - pub(crate) r#type: String, -} - -pub struct Recipient { - pub(crate) telegram_id: i64, -} - -#[derive(Debug, Default, Serialize, Deserialize)] -#[serde(transparent)] -pub struct EventContext(Map); - -#[derive(Default)] -pub struct PipelineContext { - pub recipients: Vec, - pub event_context: EventContext, - pub plugin_contexts: HashMap, Value>, -} - pub struct Engine { plugins: HashMap, Arc>, } +impl Debug for Engine { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + let plugins = self.plugins.keys().cloned().collect::>(); + f.write_fmt(format_args!("Engine {{ plugins: [{:?}] }}", plugins)) + } +} + impl Engine { pub fn new() -> Self { Self { @@ -58,35 +40,20 @@ impl Engine { self.plugins.insert(plugin.step_type(), Arc::new(plugin)); } + #[instrument] pub(crate) async fn execute_step( &mut self, context: &mut PipelineContext, - step_type: &str, - step: Value, + step: &SerializedStep, ) -> Result<(), EngineError> { + let step_type = step.get_type(); + for (plugin_type, plugin) in self.plugins.iter() { - if step_type.starts_with(plugin_type.as_ref()) { + if step_type.starts_with(&**plugin_type) { plugin.execute_step(context, step).await?; return Ok(()); } } - Err(EngineError::PluginNotFound(step)) + Err(EngineError::PluginNotFound(step.clone())) } } - -#[derive(Debug)] -pub enum EngineError { - TemplaterError(templater::TemplaterError), - PluginNotFound(Value), -} - -#[async_trait] -pub trait EnginePlugin: Send + Sync + Any { - async fn execute_step( - &self, - context: &mut PipelineContext, - step: Value, - ) -> Result<(), EngineError>; - - fn step_type(&self) -> Cow<'static, str>; -} diff --git a/src/engine/settings.rs b/src/engine/settings.rs deleted file mode 100644 index 3354f8b..0000000 --- a/src/engine/settings.rs +++ /dev/null @@ -1,30 +0,0 @@ -use serde_json::Value; -use std::collections::HashMap; -use uuid::Uuid; - -pub trait Settings { - fn get_credentials(&self, plugin: &str, id: Uuid) -> Option; -} - -#[derive(Default)] -pub struct HashMapSettings { - settings: HashMap<(String, Uuid), Value>, -} - -impl HashMapSettings { - pub fn new() -> Self { - Default::default() - } - - pub fn set_credentials(&mut self, plugin: &str, id: Uuid, credentials: Value) { - self.settings.insert((plugin.into(), id), credentials); - } -} - -impl Settings for HashMapSettings { - fn get_credentials(&self, plugin: &str, id: Uuid) -> Option { - self.settings.get(&(plugin.into(), id)).cloned() - } -} - -// Some(json!({ "token": "7488126039:AAG9HCfywfyZHkYwB_bWuE6jeeDFTHuvFpM" })) diff --git a/src/engine/step.rs b/src/engine/step.rs deleted file mode 100644 index 5db3bae..0000000 --- a/src/engine/step.rs +++ /dev/null @@ -1,32 +0,0 @@ -use serde::{Deserialize, Serialize}; -use uuid::Uuid; - -#[derive(Serialize, Deserialize)] -#[serde(tag = "type")] -pub enum Step { - #[serde(rename = "telegram.load_template")] - TgLoadTemplate { template_id: Uuid }, - #[serde(rename = "telegram.send")] - TgSend { bot_token: String }, - - #[serde(rename = "email.load_template")] - EmailLoadTemplate { template_id: Uuid }, - #[serde(rename = "email.premailer")] - EmailPremailer, - #[serde(rename = "email.send")] - EmailSend { bot_token: String }, - - #[serde(rename = "slack.load_template")] - SlackLoadTemplate { template_id: Uuid }, - #[serde(rename = "slack.send")] - SlackSend { bot_token: String }, -} - -#[derive(Serialize, Deserialize)] -#[serde(tag = "type")] -pub enum TelegramStep { - #[serde(rename = "telegram.load_template")] - TgLoadTemplate { template_id: Uuid }, - #[serde(rename = "telegram.send")] - TgSend { bot_token: String }, -} diff --git a/src/engine/telegram.rs b/src/engine/telegram.rs deleted file mode 100644 index 134ceb8..0000000 --- a/src/engine/telegram.rs +++ /dev/null @@ -1,112 +0,0 @@ -use crate::engine::templater::{RenderResponse, Templater}; -use crate::engine::{Engine, EngineError, EnginePlugin, PipelineContext}; -use async_trait::async_trait; -use serde::{Deserialize, Serialize}; -use serde_json::Value; -use std::borrow::Cow; -use std::sync::Arc; -use teloxide::prelude::{ChatId, Requester}; -use teloxide::types::Recipient; -use teloxide::Bot; -use uuid::Uuid; - -pub struct TelegramBotCredentials { - token: String, -} - -pub struct TelegramPlugin { - templater: Arc, -} - -impl TelegramPlugin { - pub fn new(templater: Arc) -> Self { - Self { templater } - } -} - -#[derive(Serialize, Deserialize)] -#[serde(tag = "type")] -pub enum TelegramStep { - #[serde(rename = "telegram.load_template")] - LoadTemplate { template_id: Uuid }, - // #[serde(rename = "telegram.set_recipients")] - // SetRecipients { telegram_id: Vec }, - #[serde(rename = "telegram.send")] - Send { bot_token: String }, -} - -#[derive(Default, Serialize, Deserialize)] -struct TelegramContext { - template_id: Uuid, -} - -#[async_trait] -impl EnginePlugin for TelegramPlugin { - async fn execute_step( - &self, - context: &mut PipelineContext, - step: Value, - ) -> Result<(), EngineError> { - let telegram_context = context - .plugin_contexts - .entry("telegram".into()) - .or_insert(Value::Object(Default::default())); - - let mut plugin_context: TelegramContext = - serde_json::from_value(telegram_context.clone()).unwrap(); - let telegram_step: TelegramStep = serde_json::from_value(step).unwrap(); - - match telegram_step { - TelegramStep::LoadTemplate { template_id } => { - plugin_context.template_id = template_id; - context.plugin_contexts.insert( - "telegram".into(), - serde_json::to_value(plugin_context).unwrap(), - ); - } - TelegramStep::Send { bot_token } => { - let bot = Bot::new(bot_token); - - for recipient in context.recipients.iter() { - let rendered_template = self - .templater - .render( - "telegram", - plugin_context.template_id, - context.event_context.0.clone(), - ) - .await - .unwrap(); - - let rendered_template: TelegramTemplate = rendered_template.try_into().unwrap(); - - bot.send_message( - Recipient::Id(ChatId(recipient.telegram_id)), - rendered_template.clone().body, - ) - .await - .unwrap(); - } - } - } - - Ok(()) - } - - fn step_type(&self) -> Cow<'static, str> { - "telegram".into() - } -} - -#[derive(Deserialize, Clone)] -pub struct TelegramTemplate { - pub body: String, -} - -impl TryFrom for TelegramTemplate { - type Error = (); - - fn try_from(value: RenderResponse) -> Result { - serde_json::from_value(Value::from_iter(value.0)).map_err(|_| ()) - } -} diff --git a/src/main.rs b/src/main.rs index 2d48574..95ba780 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,18 +1,21 @@ +mod config; pub mod engine; - -use crate::engine::templater::TemplaterService; -use crate::engine::{Engine, EventContext, PipelineContext, Recipient, Step}; -use axum::http::StatusCode; -use axum::routing::{get, post}; -use axum::{Json, Router}; -use engine::telegram::TelegramPlugin; -use serde::{Deserialize, Serialize}; -use serde_json::json; +pub mod recipients; +pub mod templater; + +use crate::config::{Config, SimpleCredentials}; +use crate::engine::Engine; +use figment::{ + providers::{Env, Format, Yaml}, + Figment, +}; +use notifico_core::engine::{EventContext, PipelineContext}; +use notifico_telegram::TelegramPlugin; +use std::env::args; use std::sync::Arc; -use tracing::info; -use utoipa::OpenApi; -use utoipa_redoc::{Redoc, Servable}; -use uuid::Uuid; +use templater::service::TemplaterService; +use tracing::{debug, info}; +use tracing_subscriber::{fmt, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter}; #[tokio::main] async fn main() { @@ -20,85 +23,43 @@ async fn main() { std::env::set_var("RUST_LOG", "info"); } - tracing_subscriber::fmt::init(); - - #[derive(OpenApi)] - #[openapi()] - struct ApiDoc; + tracing_subscriber::registry() + .with(fmt::layer()) + .with(EnvFilter::from_default_env()) + .init(); - // build our application with a route - let app = Router::new() - .merge(Redoc::with_url("/redoc", ApiDoc::openapi())) - // `GET /` goes to `root` - .route("/", get(root)) - // `POST /users` goes to `create_user` - .route("/trigger", post(trigger)); + let config: Config = Figment::new() + .merge(Yaml::file("notifico.yml")) + .merge(Env::prefixed("NOTIFICO_")) + .extract() + .unwrap(); - // run our app with hyper, listening globally on port 3000 - let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap(); - axum::serve(listener, app).await.unwrap(); + debug!("Config: {:?}", config); - futures::future::pending::<()>().await; -} + let event_context = args() + .nth(1) + .expect("Please provide an event context as the first argument"); + let event_context: EventContext = + serde_json::from_str(&event_context).expect("Failed to parse event context"); -async fn root() -> &'static str { - "Hello, World!" -} + info!("Received context: {:?}", event_context); -// the input to our `create_user` handler -#[derive(Deserialize)] -struct TriggerSchema { - id: Uuid, - event_id: Uuid, - context: EventContext, -} + let templater = Arc::new(TemplaterService::new("http://127.0.0.1:8000")); + let credentials = Arc::new(SimpleCredentials::new(config.credentials.clone())); -// the output to our `create_user` handler -#[derive(Serialize)] -struct TriggerResult { - id: Uuid, -} + let mut engine = Engine::new(); -async fn trigger(Json(payload): Json) -> (StatusCode, Json) { - // // // // // // // // // // // // // // // // // // // // // // // // // // // // - info!("Received context: {:?}", payload.context); - - let pipeline = json!({ - "steps": [ - { - "type": "telegram.load_template", - "template_id": "0191d395-f806-7b54-b4db-feffbbe5d2d4" - }, - { - "type": "telegram.send", - "bot_token": "" - } - ] - }); + let tg_plugin = TelegramPlugin::new(templater, credentials); + engine.add_plugin(tg_plugin); // Pipeline; { - let templater = Arc::new(TemplaterService::new("http://127.0.0.1:8000")); - - let mut engine = Engine::new(); - - let tg_plugin = TelegramPlugin::new(templater); - engine.add_plugin(tg_plugin); - let mut context = PipelineContext::default(); - context.recipients = vec![Recipient { - telegram_id: 111579711i64, - }]; - context.event_context = payload.context; + context.recipient = config.recipients.get(0).cloned(); + context.event_context = event_context; - for step in pipeline["steps"].as_array().unwrap().iter() { - let step_parsed = serde_json::from_value::(step.clone()).unwrap().r#type; - engine - .execute_step(&mut context, &step_parsed, step.clone()) - .await - .unwrap() + for step in config.pipelines[0].steps.iter() { + engine.execute_step(&mut context, step).await.unwrap() } } - - (StatusCode::CREATED, Json(TriggerResult { id: payload.id })) } diff --git a/src/recipients.rs b/src/recipients.rs new file mode 100644 index 0000000..a0fc7ab --- /dev/null +++ b/src/recipients.rs @@ -0,0 +1,25 @@ +use notifico_core::recipient::Recipient; +use std::collections::HashMap; +use uuid::Uuid; + +pub trait RecipientDirectory { + fn get_recipient(&self, id: Uuid) -> Option; +} + +pub struct MemoryRecipientDirectory { + directory: HashMap, +} + +impl MemoryRecipientDirectory { + pub fn new(recipients: Vec) -> Self { + MemoryRecipientDirectory { + directory: HashMap::from_iter(recipients.into_iter().map(|r| (r.id, r))), + } + } +} + +impl RecipientDirectory for MemoryRecipientDirectory { + fn get_recipient(&self, id: Uuid) -> Option { + self.directory.get(&id).cloned() + } +} diff --git a/src/templater/mod.rs b/src/templater/mod.rs new file mode 100644 index 0000000..60f9784 --- /dev/null +++ b/src/templater/mod.rs @@ -0,0 +1 @@ +pub(crate) mod service; diff --git a/src/engine/templater.rs b/src/templater/service.rs similarity index 61% rename from src/engine/templater.rs rename to src/templater/service.rs index ca82758..0c79698 100644 --- a/src/engine/templater.rs +++ b/src/templater/service.rs @@ -1,48 +1,17 @@ use async_trait::async_trait; +use notifico_core::templater::{RenderResponse, Templater, TemplaterError}; use reqwest::Client; use serde::{Deserialize, Serialize}; use serde_json::{Map, Value}; use url::Url; use uuid::Uuid; -#[derive(Debug)] -pub enum TemplaterError { - RequestError(reqwest::Error), - UrlError(url::ParseError), -} - -impl From for TemplaterError { - fn from(err: reqwest::Error) -> Self { - TemplaterError::RequestError(err) - } -} - -impl From for TemplaterError { - fn from(err: url::ParseError) -> Self { - TemplaterError::UrlError(err) - } -} - -#[derive(Debug, Serialize, Deserialize)] -#[serde(transparent)] -pub struct RenderResponse(pub Map); - #[derive(Debug, Serialize, Deserialize)] struct RenderRequest { template_id: Uuid, context: Map, } -#[async_trait] -pub trait Templater: Send + Sync { - async fn render( - &self, - template_type: &str, - template_id: Uuid, - context: Map, - ) -> Result; -} - pub struct TemplaterService { client: Client, templater_baseurl: Url,