Skip to content

Commit

Permalink
Merge pull request #1 from joeirimpan/aws-ses
Browse files Browse the repository at this point in the history
feat: AWS SES messenger
  • Loading branch information
joeirimpan authored Jun 27, 2022
2 parents 5aadefd + b6f054d commit 4d865a1
Show file tree
Hide file tree
Showing 7 changed files with 347 additions and 21 deletions.
11 changes: 11 additions & 0 deletions config.sample.toml
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
log_level="info"

[server]
address = ":8082"
read_timeout = "5s"
Expand All @@ -13,4 +15,13 @@ config = '''
"message_type": "",
"sender_id": ""
}
'''

[messenger.ses]
config = '''
{
"access_key": "",
"secret_key": "",
"region": "",
}
'''
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,16 @@ go 1.17

require (
github.com/aws/aws-sdk-go v1.42.26
github.com/francoispqt/onelog v0.0.0-20190306043706-8c2bb31b10a4
github.com/go-chi/chi v1.5.4
github.com/knadh/koanf v1.4.0
github.com/knadh/listmonk v1.1.0
github.com/knadh/smtppool v0.4.0
github.com/spf13/pflag v1.0.5
)

require (
github.com/francoispqt/gojay v1.2.13 // indirect
github.com/fsnotify/fsnotify v1.5.1 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/jmoiron/sqlx v1.2.0 // indirect
Expand Down
129 changes: 129 additions & 0 deletions go.sum

Large diffs are not rendered by default.

63 changes: 47 additions & 16 deletions handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,34 +3,44 @@ package main
import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"net/textproto"

"github.com/go-chi/chi"
"github.com/joeirimpan/listmonk-messenger/messenger"
"github.com/knadh/listmonk/models"
)

type postback struct {
Subject string `json:"subject"`
ContentType string `json:"content_type"`
Body string `json:"body"`
Recipients []recipient `json:"recipients"`
Campaign *campaign `json:"campaign"`
Subject string `json:"subject"`
ContentType string `json:"content_type"`
Body string `json:"body"`
Recipients []recipient `json:"recipients"`
Campaign *campaign `json:"campaign"`
Attachments []attachment `json:"attachments"`
}

type campaign struct {
UUID string `db:"uuid" json:"uuid"`
Name string `db:"name" json:"name"`
Tags []string `db:"tags" json:"tags"`
FromEmail string `json:"from_email"`
UUID string `json:"uuid"`
Name string `json:"name"`
Tags []string `json:"tags"`
}

type recipient struct {
UUID string `db:"uuid" json:"uuid"`
Email string `db:"email" json:"email"`
Name string `db:"name" json:"name"`
Attribs models.SubscriberAttribs `db:"attribs" json:"attribs"`
Status string `db:"status" json:"status"`
UUID string `json:"uuid"`
Email string `json:"email"`
Name string `json:"name"`
Attribs models.SubscriberAttribs `json:"attribs"`
Status string `json:"status"`
}

type attachment struct {
Name string `json:"name"`
Header textproto.MIMEHeader `json:"header"`
Content []byte `json:"content"`
}

type httpResp struct {
Expand All @@ -49,13 +59,15 @@ func handlePostback(w http.ResponseWriter, r *http.Request) {
// Decode body
body, err := ioutil.ReadAll(r.Body)
if err != nil {
app.logger.ErrorWith("error reading request body").Err("err", err).Write()
sendErrorResponse(w, "invalid body", http.StatusBadRequest, nil)
return
}
defer r.Body.Close()

data := &postback{}
if err := json.Unmarshal(body, &data); err != nil {
app.logger.ErrorWith("error unmarshalling request body").Err("err", err).Write()
sendErrorResponse(w, "invalid body", http.StatusBadRequest, nil)
return
}
Expand Down Expand Up @@ -88,19 +100,38 @@ func handlePostback(w http.ResponseWriter, r *http.Request) {

if data.Campaign != nil {
message.Campaign = &models.Campaign{
UUID: data.Campaign.UUID,
Name: data.Campaign.Name,
Tags: data.Campaign.Tags,
FromEmail: data.Campaign.FromEmail,
UUID: data.Campaign.UUID,
Name: data.Campaign.Name,
Tags: data.Campaign.Tags,
}
}

if len(data.Attachments) > 0 {
a := make([]messenger.Attachment, 0, len(data.Attachments))
for i := 0; i < len(data.Attachments); i++ {
a[i] = messenger.Attachment{
Name: data.Attachments[i].Name,
Header: data.Attachments[i].Header,
Content: make([]byte, len(data.Attachments[i].Content)),
}
copy(a[i].Content, data.Attachments[i].Content)
}

message.Attachments = a
}

app.logger.DebugWith("sending message").String("provider", provider).String("message", fmt.Sprintf("%#+v", message)).Write()

// Send message.
if err := p.Push(message); err != nil {
app.logger.ErrorWith("error sending message").Err("err", err).Write()
sendErrorResponse(w, "error sending message", http.StatusInternalServerError, nil)
return
}

sendResponse(w, "OK")
return
}

// wrap is a middleware that wraps HTTP handlers and injects the "app" context.
Expand Down
22 changes: 20 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ import (
"log"
"net/http"
"os"
"time"

"github.com/francoispqt/onelog"
"github.com/go-chi/chi"
"github.com/joeirimpan/listmonk-messenger/messenger"
"github.com/knadh/koanf"
Expand All @@ -28,6 +30,8 @@ type MessengerCfg struct {
}

type App struct {
logger *onelog.Logger

messengers map[string]messenger.Messenger
}

Expand Down Expand Up @@ -82,7 +86,9 @@ func loadMessengers(msgrs []string, app *App) {
)
switch m {
case "pinpoint":
msgr, err = messenger.NewPinpoint([]byte(cfg.Config))
msgr, err = messenger.NewPinpoint([]byte(cfg.Config), app.logger)
case "ses":
msgr, err = messenger.NewAWSSES([]byte(cfg.Config), app.logger)
default:
log.Fatalf("invalid provider: %s", m)
}
Expand All @@ -97,8 +103,20 @@ func loadMessengers(msgrs []string, app *App) {
}

func main() {
logLevels := onelog.INFO | onelog.WARN | onelog.ERROR | onelog.FATAL
if ko.String("log_level") == "debug" {
logLevels |= onelog.DEBUG
}

// setup logger
l := onelog.NewContext(os.Stderr, logLevels, "p")
l.Hook(func(e onelog.Entry) {
e.String("ts", time.Now().Format(time.RFC3339Nano))
e.String("line", l.Caller(5))
})

// load messengers
app := &App{}
app := &App{logger: l}

loadMessengers(ko.Strings("msgr"), app)

Expand Down
9 changes: 6 additions & 3 deletions messenger/pinpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@ package messenger
import (
"encoding/json"
"fmt"
"log"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/pinpoint"
"github.com/francoispqt/onelog"
)

var (
Expand All @@ -28,6 +28,8 @@ type pinpointCfg struct {
type pinpointMessenger struct {
cfg pinpointCfg
client *pinpoint.Pinpoint

logger *onelog.Logger
}

func (p pinpointMessenger) Name() string {
Expand Down Expand Up @@ -67,7 +69,7 @@ func (p pinpointMessenger) Push(msg Message) error {

if p.cfg.Log {
for phone, result := range out.MessageResponse.Result {
log.Printf("successfully sent sms to %s: %#+v", phone, result)
p.logger.InfoWith("successfully sent sms").String("phone", phone).String("result", fmt.Sprintf("%#+v", result)).Write()
}
}

Expand All @@ -83,7 +85,7 @@ func (p pinpointMessenger) Close() error {
}

// NewPinpoint creates new instance of pinpoint
func NewPinpoint(cfg []byte) (Messenger, error) {
func NewPinpoint(cfg []byte, l *onelog.Logger) (Messenger, error) {
var c pinpointCfg
if err := json.Unmarshal(cfg, &c); err != nil {
return nil, err
Expand Down Expand Up @@ -112,5 +114,6 @@ func NewPinpoint(cfg []byte) (Messenger, error) {
return pinpointMessenger{
client: svc,
cfg: c,
logger: l,
}, nil
}
131 changes: 131 additions & 0 deletions messenger/ses.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
package messenger

import (
"encoding/json"
"fmt"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/ses"
"github.com/francoispqt/onelog"
"github.com/knadh/smtppool"
)

const (
ContentTypeHTML = "html"
ContentTypePlain = "plain"
)

type sesCfg struct {
AccessKey string `json:"access_key"`
SecretKey string `json:"secret_key"`
Region string `json:"region"`
Log bool `json:"log"`
}

type sesMessenger struct {
cfg sesCfg
client *ses.SES

logger *onelog.Logger
}

func (s sesMessenger) Name() string {
return "ses"
}

// Push sends the sms through pinpoint API.
func (s sesMessenger) Push(msg Message) error {
// convert attachments to smtppool.Attachments
var files []smtppool.Attachment
if msg.Attachments != nil {
files = make([]smtppool.Attachment, 0, len(msg.Attachments))
for i := 0; i < len(msg.Attachments); i++ {
files[i] = smtppool.Attachment{
Filename: msg.Attachments[i].Name,
Header: msg.Attachments[i].Header,
Content: make([]byte, len(msg.Attachments[i].Content)),
}
copy(files[i].Content, msg.Attachments[i].Content)
}
}

email := smtppool.Email{
From: msg.Campaign.FromEmail,
Subject: msg.Subject,
Sender: msg.From,
Headers: msg.Headers,
Attachments: files,
}

switch {
case msg.ContentType == ContentTypePlain:
email.Text = msg.Body
default:
email.HTML = msg.Body
}

emailB, err := email.Bytes()
if err != nil {
return err
}

input := &ses.SendRawEmailInput{
Source: &email.From,
Destinations: []*string{&msg.Subscriber.Email},
RawMessage: &ses.RawMessage{
Data: emailB,
},
}

out, err := s.client.SendRawEmail(input)
if err != nil {
return err
}

if s.cfg.Log {
s.logger.InfoWith("successfully sent email").String("email", msg.Subscriber.Email).String("results", fmt.Sprintf("%#+v", out)).Write()
}

return nil
}

func (s sesMessenger) Flush() error {
return nil
}

func (s sesMessenger) Close() error {
return nil
}

// NewAWSSES creates new instance of pinpoint
func NewAWSSES(cfg []byte, l *onelog.Logger) (Messenger, error) {
var c sesCfg
if err := json.Unmarshal(cfg, &c); err != nil {
return nil, err
}

if c.Region == "" {
return nil, fmt.Errorf("invalid region")
}
if c.AccessKey == "" {
return nil, fmt.Errorf("invalid access_key")
}
if c.SecretKey == "" {
return nil, fmt.Errorf("invalid secret_key")
}

sess := session.Must(session.NewSession())
svc := ses.New(sess,
aws.NewConfig().
WithCredentials(credentials.NewStaticCredentials(c.AccessKey, c.SecretKey, "")).
WithRegion(c.Region),
)

return sesMessenger{
client: svc,
cfg: c,
logger: l,
}, nil
}

0 comments on commit 4d865a1

Please sign in to comment.