Skip to content

Commit

Permalink
notify: create telemetry spans when sending emails, pd, slack
Browse files Browse the repository at this point in the history
  • Loading branch information
adamdecaf committed Nov 9, 2023
1 parent 6dbedc8 commit 4bb9f1b
Show file tree
Hide file tree
Showing 12 changed files with 94 additions and 50 deletions.
20 changes: 15 additions & 5 deletions internal/notify/email.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@ import (

"github.com/moov-io/ach"
"github.com/moov-io/achgateway/internal/service"
"github.com/moov-io/base/telemetry"

gomail "github.com/ory/mail/v3"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)

type Email struct {
Expand Down Expand Up @@ -94,20 +97,20 @@ func setupGoMailClient(cfg *service.Email) (*gomail.Dialer, error) {
}, nil
}

func (mailer *Email) Info(msg *Message) error {
func (mailer *Email) Info(ctx context.Context, msg *Message) error {
contents, err := marshalEmail(mailer.cfg, msg)
if err != nil {
return err
}
return sendEmail(mailer.cfg, mailer.dialer, msg.Filename, contents)
return sendEmail(ctx, mailer.cfg, mailer.dialer, msg.Filename, contents)
}

func (mailer *Email) Critical(msg *Message) error {
func (mailer *Email) Critical(ctx context.Context, msg *Message) error {
contents, err := marshalEmail(mailer.cfg, msg)
if err != nil {
return err
}
return sendEmail(mailer.cfg, mailer.dialer, msg.Filename, contents)
return sendEmail(ctx, mailer.cfg, mailer.dialer, msg.Filename, contents)
}

func marshalEmail(cfg *service.Email, msg *Message) (string, error) {
Expand Down Expand Up @@ -136,7 +139,12 @@ func marshalEmail(cfg *service.Email, msg *Message) (string, error) {
return buf.String(), nil
}

func sendEmail(cfg *service.Email, dialer *gomail.Dialer, filename, body string) error {
func sendEmail(ctx context.Context, cfg *service.Email, dialer *gomail.Dialer, filename, body string) error {
_, span := telemetry.StartSpan(ctx, "notify-send-email", trace.WithAttributes(
attribute.String("achgateway.filename", filename),
))
defer span.End()

m := gomail.NewMessage()
m.SetHeader("From", cfg.From)
m.SetHeader("To", cfg.To...)
Expand All @@ -150,6 +158,8 @@ func sendEmail(cfg *service.Email, dialer *gomail.Dialer, filename, body string)

var outErr error
for tries := 1; tries <= maxRetries; tries++ {
span.AddEvent(fmt.Sprintf("attempt-%d", tries))

outErr = dialer.DialAndSend(context.Background(), m)
if outErr != nil {
if strings.Contains(outErr.Error(), "i/o timeout") {
Expand Down
4 changes: 3 additions & 1 deletion internal/notify/email_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package notify

import (
"context"
"fmt"
"path/filepath"
"strings"
Expand Down Expand Up @@ -45,7 +46,8 @@ func TestEmailSend(t *testing.T) {
body, err := marshalEmail(cfg, msg)
require.NoError(t, err)

if err := sendEmail(cfg, dialer, msg.Filename, body); err != nil {
ctx := context.Background()
if err := sendEmail(ctx, cfg, dialer, msg.Filename, body); err != nil {
t.Fatal(err)
}

Expand Down
12 changes: 6 additions & 6 deletions internal/notify/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,14 @@ type event struct {
UploadStatus uploadStatus `json:"uploadStatus"`
}

func (s *Kafka) Info(msg *Message) error {
func (s *Kafka) Info(ctx context.Context, msg *Message) error {
event := marshalKafkaMessage(success, msg)
return s.send(event)
return s.send(ctx, event)
}

func (s *Kafka) Critical(msg *Message) error {
func (s *Kafka) Critical(ctx context.Context, msg *Message) error {
event := marshalKafkaMessage(failed, msg)
return s.send(event)
return s.send(ctx, event)
}

func marshalKafkaMessage(status uploadStatus, msg *Message) event {
Expand All @@ -67,7 +67,7 @@ func marshalKafkaMessage(status uploadStatus, msg *Message) event {
}
}

func (s *Kafka) send(evt event) error {
func (s *Kafka) send(ctx context.Context, evt event) error {
bs, err := compliance.Protect(s.cfg.Transform, models.Event{
Type: "",
Event: evt,
Expand All @@ -76,7 +76,7 @@ func (s *Kafka) send(evt event) error {
return fmt.Errorf("unable to protect notifer kafka event: %v", err)
}

return s.publisher.Send(context.Background(), &pubsub.Message{
return s.publisher.Send(ctx, &pubsub.Message{
Body: bs,
})
}
6 changes: 4 additions & 2 deletions internal/notify/mock_sender.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,21 @@
package notify

import "context"

type MockSender struct {
infoCalled bool
criticalCalled bool
Err error
msg *Message
}

func (s *MockSender) Info(msg *Message) error {
func (s *MockSender) Info(_ context.Context, msg *Message) error {
s.infoCalled = true
s.msg = msg
return s.Err
}

func (s *MockSender) Critical(msg *Message) error {
func (s *MockSender) Critical(_ context.Context, msg *Message) error {
s.criticalCalled = true
s.msg = msg
return s.Err
Expand Down
15 changes: 7 additions & 8 deletions internal/notify/multi.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,11 @@ func (ms *MultiSender) senderTypes() string {
return strings.Join(out, ", ")
}

func (ms *MultiSender) Info(msg *Message) error {
func (ms *MultiSender) Info(ctx context.Context, msg *Message) error {
var firstError error
for i := range ms.senders {
err := ms.retry(func() error {
return ms.senders[i].Info(msg)
err := ms.retry(ctx, func() error {
return ms.senders[i].Info(ctx, msg)
})
if err != nil {
ms.logger.Logf("multi-sender: Info %T: %v", ms.senders[i], err)
Expand All @@ -99,11 +99,11 @@ func (ms *MultiSender) Info(msg *Message) error {
return firstError
}

func (ms *MultiSender) Critical(msg *Message) error {
func (ms *MultiSender) Critical(ctx context.Context, msg *Message) error {
var firstError error
for i := range ms.senders {
err := ms.retry(func() error {
return ms.senders[i].Critical(msg)
err := ms.retry(ctx, func() error {
return ms.senders[i].Critical(ctx, msg)
})
if err != nil {
ms.logger.Logf("multi-sender: Critical %T: %v", ms.senders[i], err)
Expand All @@ -118,13 +118,12 @@ func (ms *MultiSender) Critical(msg *Message) error {
return firstError
}

func (ms *MultiSender) retry(f func() error) error {
func (ms *MultiSender) retry(ctx context.Context, f func() error) error {
if ms.retryConfig != nil {
backoff, err := setupBackoff(ms.retryConfig)
if err != nil {
return fmt.Errorf("retry: %v", err)
}
ctx := context.Background()
return retry.Do(ctx, backoff, func(ctx context.Context) error {
return isRetryableError(f())
})
Expand Down
15 changes: 9 additions & 6 deletions internal/notify/multi_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package notify

import (
"context"
"errors"
"testing"
"time"
Expand All @@ -25,13 +26,14 @@ func TestMultiSender(t *testing.T) {

msg := &Message{Direction: Upload}

require.NoError(t, sender.Info(msg))
require.NoError(t, sender.Critical(msg))
ctx := context.Background()
require.NoError(t, sender.Info(ctx, msg))
require.NoError(t, sender.Critical(ctx, msg))

sender.senders = append(sender.senders, &MockSender{})

require.NoError(t, sender.Info(msg))
require.NoError(t, sender.Critical(msg))
require.NoError(t, sender.Info(ctx, msg))
require.NoError(t, sender.Critical(ctx, msg))
}

func TestMultiSenderErr(t *testing.T) {
Expand All @@ -44,10 +46,11 @@ func TestMultiSenderErr(t *testing.T) {
},
}

ctx := context.Background()
msg := &Message{Direction: Upload}

require.Equal(t, sender.Info(msg), sendErr)
require.Equal(t, sender.Critical(msg), sendErr)
require.Equal(t, sender.Info(ctx, msg), sendErr)
require.Equal(t, sender.Critical(ctx, msg), sendErr)
}

func TestMulti__Retry(t *testing.T) {
Expand Down
6 changes: 4 additions & 2 deletions internal/notify/notify.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
package notify

import (
"context"

"github.com/moov-io/ach"
)

Expand All @@ -26,6 +28,6 @@ type Message struct {
}

type Sender interface {
Info(msg *Message) error
Critical(msg *Message) error
Info(ctx context.Context, msg *Message) error
Critical(ctx context.Context, msg *Message) error
}
17 changes: 12 additions & 5 deletions internal/notify/pagerduty.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,11 @@ import (
"fmt"

"github.com/moov-io/achgateway/internal/service"
"github.com/moov-io/base/telemetry"

"github.com/PagerDuty/go-pagerduty"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)

type PagerDuty struct {
Expand Down Expand Up @@ -50,12 +53,12 @@ func (pd *PagerDuty) Ping() error {
return nil
}

func (pd *PagerDuty) Info(msg *Message) error {
func (pd *PagerDuty) Info(_ context.Context, msg *Message) error {
// Skip sending Info notifications, PagerDuty is setup for critical alerts
return nil
}

func (pd *PagerDuty) Critical(msg *Message) error {
func (pd *PagerDuty) Critical(ctx context.Context, msg *Message) error {
opts := &pagerduty.CreateIncidentOptions{
Type: "incident",
Title: fmt.Sprintf("ERROR during file %s", msg.Direction),
Expand All @@ -72,11 +75,15 @@ func (pd *PagerDuty) Critical(msg *Message) error {
// Downloads don't have to such a high priority
opts.Urgency = "low"
}
return pd.createIncident(opts)
return pd.createIncident(ctx, opts)
}

func (pd *PagerDuty) createIncident(opts *pagerduty.CreateIncidentOptions) error {
ctx := context.Background()
func (pd *PagerDuty) createIncident(ctx context.Context, opts *pagerduty.CreateIncidentOptions) error {
_, span := telemetry.StartSpan(ctx, "notify-trigger-pagerduty", trace.WithAttributes(
attribute.String("achgateway.error", opts.Title),
))
defer span.End()

_, err := pd.client.CreateIncidentWithContext(ctx, pd.from, opts)
return err
}
7 changes: 5 additions & 2 deletions internal/notify/pagerduty_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@
package notify

import (
"context"
"os"
"testing"

"github.com/moov-io/ach"
"github.com/moov-io/achgateway/internal/service"

"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -40,16 +42,17 @@ func TestPagerDuty(t *testing.T) {
}

file := ach.NewFile()
ctx := context.Background()

if err := pd.Info(&Message{
if err := pd.Info(ctx, &Message{
Direction: Download,
Filename: "20200529-140002-1.ach",
File: file,
}); err != nil {
t.Fatal(err)
}

if err := pd.Critical(&Message{
if err := pd.Critical(ctx, &Message{
Direction: Upload,
Filename: "20200529-140002-2.ach",
File: file,
Expand Down
22 changes: 16 additions & 6 deletions internal/notify/slack.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package notify

import (
"bytes"
"context"
"encoding/json"
"fmt"
"net/http"
Expand All @@ -14,6 +15,10 @@ import (

"github.com/moov-io/achgateway"
"github.com/moov-io/achgateway/internal/service"
"github.com/moov-io/base/telemetry"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)

type Slack struct {
Expand All @@ -40,14 +45,14 @@ const (
failed = uploadStatus("FAILED")
)

func (s *Slack) Info(msg *Message) error {
func (s *Slack) Info(ctx context.Context, msg *Message) error {
slackMsg := marshalSlackMessage(success, msg)
return s.send(slackMsg)
return s.send(ctx, slackMsg)
}

func (s *Slack) Critical(msg *Message) error {
func (s *Slack) Critical(ctx context.Context, msg *Message) error {
slackMsg := marshalSlackMessage(failed, msg)
return s.send(slackMsg)
return s.send(ctx, slackMsg)
}

func marshalSlackMessage(status uploadStatus, msg *Message) string {
Expand Down Expand Up @@ -77,7 +82,12 @@ type webhook struct {
Text string `json:"text"`
}

func (s *Slack) send(msg string) error {
func (s *Slack) send(ctx context.Context, msg string) error {
_, span := telemetry.StartSpan(ctx, "notify-send-slack", trace.WithAttributes(
attribute.String("achgateway.message", msg),
))
defer span.End()

var body bytes.Buffer
err := json.NewEncoder(&body).Encode(&webhook{
Text: msg,
Expand All @@ -86,7 +96,7 @@ func (s *Slack) send(msg string) error {
return err
}

req, err := http.NewRequest("POST", s.webhookURL, &body)
req, err := http.NewRequestWithContext(ctx, "POST", s.webhookURL, &body)
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit 4bb9f1b

Please sign in to comment.