Skip to content

Commit

Permalink
Add SMPP support for sending SMS
Browse files Browse the repository at this point in the history
  • Loading branch information
GamePad64 committed Oct 9, 2024
1 parent ffe827e commit d6fa20d
Show file tree
Hide file tree
Showing 8 changed files with 259 additions and 38 deletions.
4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ notifico-subscription = { path = "notifico-subscription" }
notifico-whatsapp = { path = "notifico-whatsapp" }
notifico-templater = { path = "notifico-templater" }
notifico-ncenter = { path = "notifico-ncenter" }
notifico-smpp = { path = "notifico-smpp" }

[profile.release]
opt-level = 3
Expand All @@ -50,7 +51,8 @@ members = [
"notifico-whatsapp",
"notifico-templater",
"notifico-ncenter",
"notifico-ncenter/migration"
"notifico-ncenter/migration",
"notifico-smpp"
]

[workspace.dependencies]
Expand Down
6 changes: 6 additions & 0 deletions notifico-core/src/recipient.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,12 @@ impl TypedContact for MobilePhoneContact {
const CONTACT_TYPE: &'static str = "mobile_phone";
}

impl MobilePhoneContact {
pub fn msisdn(&self) -> &str {
self.number.strip_prefix("+").unwrap_or(&self.number)
}
}

#[async_trait]
pub trait RecipientDirectory {
async fn get_recipient(&self, id: Uuid) -> Option<Recipient>;
Expand Down
16 changes: 16 additions & 0 deletions notifico-smpp/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
[package]
name = "notifico-smpp"
version = "0.1.0"
edition = "2021"

[dependencies]
rusmpp = { version = "0.1.3", features = ["tokio-codec", "tracing"] }
tokio = { version = "1.40.0", features = ["net"] }
tokio-util = { version = "0.7.12", features = ["full"] }
tracing = "0.1.40"

notifico-core = { path = "../notifico-core" }
serde = { version = "1.0.210", features = ["derive"] }
serde_json = "1.0.128"
async-trait = "0.1.83"
futures-util = "0.3.31"
14 changes: 14 additions & 0 deletions notifico-smpp/src/credentials.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
use notifico_core::credentials::TypedCredential;
use serde::{Deserialize, Serialize};

#[derive(Debug, Serialize, Deserialize)]
pub struct SmppServerCredentials {
pub host: String,
pub port: u16,
pub username: String,
pub password: String,
}

impl TypedCredential for SmppServerCredentials {
const CREDENTIAL_TYPE: &'static str = "smpp_server";
}
208 changes: 208 additions & 0 deletions notifico-smpp/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
mod credentials;
mod step;

use crate::credentials::SmppServerCredentials;
use crate::step::{Step, STEPS};
use async_trait::async_trait;
use futures_util::sink::SinkExt;
use futures_util::StreamExt;
use notifico_core::credentials::{get_typed_credential, Credentials};
use notifico_core::engine::{EnginePlugin, PipelineContext, StepOutput};
use notifico_core::error::EngineError;
use notifico_core::pipeline::SerializedStep;
use notifico_core::recipient::MobilePhoneContact;
use notifico_core::templater::RenderResponse;
use rusmpp::commands::tlvs::tlv::message_submission_request::MessageSubmissionRequestTLVValue;
use rusmpp::commands::types::{
DataCoding, EsmClass, InterfaceVersion, Npi, RegisteredDelivery, ServiceType, Ton,
};
use rusmpp::pdu::{Bind, SubmitSm};
use rusmpp::types::{AnyOctetString, COctetString};
use rusmpp::{
codec::command_codec::CommandCodec,
commands::{
command::Command,
pdu::Pdu,
types::{command_id::CommandId, command_status::CommandStatus},
},
TLVTag,
};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::borrow::Cow;
use std::str::FromStr;
use std::sync::Arc;
use tokio::net::TcpStream;
use tokio_util::codec::{FramedRead, FramedWrite};
use tracing::debug;

pub struct SmppPlugin {
credentials: Arc<dyn Credentials>,
}

impl SmppPlugin {
pub fn new(credentials: Arc<dyn Credentials>) -> Self {
Self { credentials }
}
}

#[async_trait]
impl EnginePlugin for SmppPlugin {
async fn execute_step(
&self,
context: &mut PipelineContext,
step: &SerializedStep,
) -> Result<StepOutput, EngineError> {
let step: Step = step.clone().convert_step()?;

match step {
Step::Send { credential } => {
let Some(recipient) = context.recipient.clone() else {
return Err(EngineError::RecipientNotSet);
};

let credential: SmppServerCredentials = get_typed_credential(
self.credentials.as_ref(),
context.project_id,
&credential,
)
.await?;

let stream = TcpStream::connect((credential.host.clone(), credential.port))
.await
.unwrap();

let (reader, writer) = stream.into_split();
let mut framed_read = FramedRead::new(reader, CommandCodec {});
let mut framed_write = FramedWrite::new(writer, CommandCodec {});

// Build commands. Omitted values will be set to default.
let bind_transceiver_command = Command::new(
CommandStatus::EsmeRok,
1,
Bind::builder()
.system_id(COctetString::from_str(&credential.username).unwrap())
.password(COctetString::from_str(&credential.password).unwrap())
.system_type(COctetString::empty())
.interface_version(InterfaceVersion::Smpp5_0)
.addr_ton(Ton::Unknown)
.addr_npi(Npi::Unknown)
.address_range(COctetString::empty())
.build()
.into_bind_transceiver(),
);

// Send commands.
framed_write.send(&bind_transceiver_command).await.unwrap();

// Wait for responses.
while let Some(Ok(command)) = framed_read.next().await {
if let Some(Pdu::BindTransceiverResp(_)) = command.pdu() {
debug!("BindTransceiverResp received.");

if let CommandStatus::EsmeRok = command.command_status {
debug!("Successful bind.");
break;
}
}
}

let contact: MobilePhoneContact = recipient.get_primary_contact()?;

for message in context.messages.iter().cloned() {
let rendered: SmsContent = message.try_into().unwrap();

let payload: Vec<u8> = rendered
.body
.encode_utf16()
.map(|c| c.to_be_bytes())
.flatten()
.collect();

let submit_sm_command = Command::new(
CommandStatus::EsmeRok,
2,
SubmitSm::builder()
.serivce_type(ServiceType::default())
.source_addr_ton(Ton::Unknown)
.source_addr_npi(Npi::Unknown)
.source_addr(COctetString::from_str(&rendered.source_address).unwrap())
.destination_addr(COctetString::from_str(contact.msisdn()).unwrap())
.esm_class(EsmClass::default())
.registered_delivery(RegisteredDelivery::request_all())
.data_coding(DataCoding::Ucs2)
.push_tlv(
MessageSubmissionRequestTLVValue::MessagePayload(
AnyOctetString::new(&payload),
)
.into(),
)
.build()
.into_submit_sm(),
);

framed_write.send(&submit_sm_command).await.unwrap();

'outer: while let Some(Ok(command)) = framed_read.next().await {
match command.pdu() {
Some(Pdu::SubmitSmResp(_)) => {
debug!("SubmitSmResp received.");

if let CommandStatus::EsmeRok = command.command_status {
debug!("Successful submit.");
}
}
Some(Pdu::DeliverSm(deliver_sm)) => {
debug!("DeliverSm received.");

for tlv in deliver_sm.tlvs().iter() {
if let TLVTag::ReceiptedMessageId = tlv.tag() {
debug!("Delivery receipt received.");

break 'outer;
}
}
}
_ => {}
}
}
}

let unbind_command = Command::new(CommandStatus::EsmeRok, 3, Pdu::Unbind);

framed_write.send(&unbind_command).await.unwrap();

while let Some(Ok(command)) = framed_read.next().await {
if let CommandId::UnbindResp = command.command_id() {
debug!("UnbindResp received.");

if let CommandStatus::EsmeRok = command.command_status {
debug!("Successful unbind.");
break;
}
}
}
}
}

Ok(StepOutput::Continue)
}

fn steps(&self) -> Vec<Cow<'static, str>> {
STEPS.iter().map(|&s| Cow::from(s)).collect()
}
}

#[derive(Serialize, Deserialize, Clone)]
pub struct SmsContent {
pub body: String,
pub source_address: String,
}

impl TryFrom<RenderResponse> for SmsContent {
type Error = ();

fn try_from(value: RenderResponse) -> Result<Self, Self::Error> {
serde_json::from_value(Value::from_iter(value.0)).map_err(|_| ())
}
}
10 changes: 10 additions & 0 deletions notifico-smpp/src/step.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize)]
#[serde(tag = "step")]
pub enum Step {
#[serde(rename = "smpp.send")]
Send { credential: String },
}

pub(crate) const STEPS: &[&str] = &["smpp.send"];
37 changes: 0 additions & 37 deletions notifico.yml

This file was deleted.

2 changes: 2 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use figment::{
use hmac::{Hmac, Mac};
use notifico_core::engine::Engine;
use notifico_ncenter::NCenterPlugin;
use notifico_smpp::SmppPlugin;
use notifico_smtp::EmailPlugin;
use notifico_subscription::SubscriptionManager;
use notifico_telegram::TelegramPlugin;
Expand Down Expand Up @@ -85,6 +86,7 @@ async fn main() {
engine.add_plugin(sub_manager.clone());
engine.add_plugin(Arc::new(WaBusinessPlugin::new(credentials.clone())));
engine.add_plugin(ncenter.clone());
engine.add_plugin(Arc::new(SmppPlugin::new(credentials.clone())));

let event_handler = EventHandler {
pipeline_storage: pipelines.clone(),
Expand Down

0 comments on commit d6fa20d

Please sign in to comment.