Skip to content

Commit 95d2456

Browse files
authored
Merge pull request #45 from tidepool-org/mailer
Add cloud events mailer
2 parents 30b3c3a + 4757171 commit 95d2456

File tree

3 files changed

+87
-0
lines changed

3 files changed

+87
-0
lines changed

clients/mailer.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package clients
2+
3+
import (
4+
"context"
5+
"github.com/tidepool-org/go-common/events"
6+
)
7+
8+
const MailerTopic = "emails"
9+
10+
type MailerClient interface {
11+
SendEmailTemplate(context.Context, events.SendEmailTemplateEvent) error
12+
}
13+
14+
type mailerClient struct {
15+
producer *events.KafkaCloudEventsProducer
16+
}
17+
18+
func NewMailerClient(config *events.CloudEventsConfig) (MailerClient, error) {
19+
producer, err := events.NewKafkaCloudEventsProducer(config)
20+
if err != nil {
21+
return nil, err
22+
}
23+
24+
return &mailerClient{producer}, nil
25+
}
26+
27+
func (m *mailerClient) SendEmailTemplate(ctx context.Context, event events.SendEmailTemplateEvent) error {
28+
return m.producer.Send(ctx, event)
29+
}

events/mailer.go

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
package events
2+
3+
import cloudevents "github.com/cloudevents/sdk-go/v2"
4+
5+
const (
6+
SendEmailTemplateEventType = "email_template:send"
7+
)
8+
9+
type SendEmailTemplateEvent struct {
10+
Recipient string `json:"recipient"`
11+
Template string `json:"template"`
12+
Variables map[string]string `json:"variables"`
13+
}
14+
15+
func (s SendEmailTemplateEvent) GetEventType() string {
16+
return SendEmailTemplateEventType
17+
}
18+
19+
func (s SendEmailTemplateEvent) GetEventKey() string {
20+
return s.Recipient
21+
}
22+
23+
var _ Event = &SendEmailTemplateEvent{}
24+
25+
type EmailEventHandler interface {
26+
HandleSendEmailTemplate(payload SendEmailTemplateEvent) error
27+
}
28+
29+
type DelegatingEmailEventHandler struct {
30+
delegate EmailEventHandler
31+
}
32+
33+
func NewDelegatingEmailEventHandler(delegate EmailEventHandler) *DelegatingEmailEventHandler {
34+
return &DelegatingEmailEventHandler{delegate: delegate}
35+
}
36+
37+
func (d DelegatingEmailEventHandler) CanHandle(ce cloudevents.Event) bool {
38+
return ce.Type() == SendEmailTemplateEventType
39+
}
40+
41+
func (d DelegatingEmailEventHandler) Handle(ce cloudevents.Event) error {
42+
if ce.Type() != SendEmailTemplateEventType {
43+
// ignore invalid events
44+
return nil
45+
}
46+
47+
payload := SendEmailTemplateEvent{}
48+
if err := ce.DataAs(&payload); err != nil {
49+
return err
50+
}
51+
return d.delegate.HandleSendEmailTemplate(payload)
52+
}
53+
54+
var _ EventHandler = &DelegatingEmailEventHandler{}

events/sender.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,10 @@ func NewKafkaCloudEventsProducerForDeadLetters(config *CloudEventsConfig) (*Kafk
3737
}
3838

3939
func newKafkaCloudEventsProducerWithTopic(config *CloudEventsConfig, topic string) (*KafkaCloudEventsProducer, error) {
40+
// We are using a sync producer which requires setting the variables below
41+
config.SaramaConfig.Producer.Return.Errors = true
42+
config.SaramaConfig.Producer.Return.Successes = true
43+
4044
sender, err := kafka_sarama.NewSender(config.KafkaBrokers, config.SaramaConfig, topic)
4145
if err != nil {
4246
return nil, err

0 commit comments

Comments
 (0)