diff --git a/internal/notify/email.go b/internal/notify/email.go index 4606d73..e645eef 100644 --- a/internal/notify/email.go +++ b/internal/notify/email.go @@ -18,8 +18,11 @@ import ( "github.com/moov-io/ach" "github.com/moov-io/achgateway/internal/service" + "github.com/moov-io/base/telemetry" gomail "github.com/ory/mail/v3" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" ) type Email struct { @@ -94,20 +97,20 @@ func setupGoMailClient(cfg *service.Email) (*gomail.Dialer, error) { }, nil } -func (mailer *Email) Info(msg *Message) error { +func (mailer *Email) Info(ctx context.Context, msg *Message) error { contents, err := marshalEmail(mailer.cfg, msg) if err != nil { return err } - return sendEmail(mailer.cfg, mailer.dialer, msg.Filename, contents) + return sendEmail(ctx, mailer.cfg, mailer.dialer, msg.Filename, contents) } -func (mailer *Email) Critical(msg *Message) error { +func (mailer *Email) Critical(ctx context.Context, msg *Message) error { contents, err := marshalEmail(mailer.cfg, msg) if err != nil { return err } - return sendEmail(mailer.cfg, mailer.dialer, msg.Filename, contents) + return sendEmail(ctx, mailer.cfg, mailer.dialer, msg.Filename, contents) } func marshalEmail(cfg *service.Email, msg *Message) (string, error) { @@ -136,7 +139,12 @@ func marshalEmail(cfg *service.Email, msg *Message) (string, error) { return buf.String(), nil } -func sendEmail(cfg *service.Email, dialer *gomail.Dialer, filename, body string) error { +func sendEmail(ctx context.Context, cfg *service.Email, dialer *gomail.Dialer, filename, body string) error { + _, span := telemetry.StartSpan(ctx, "notify-send-email", trace.WithAttributes( + attribute.String("achgateway.filename", filename), + )) + defer span.End() + m := gomail.NewMessage() m.SetHeader("From", cfg.From) m.SetHeader("To", cfg.To...) @@ -150,6 +158,8 @@ func sendEmail(cfg *service.Email, dialer *gomail.Dialer, filename, body string) var outErr error for tries := 1; tries <= maxRetries; tries++ { + span.AddEvent(fmt.Sprintf("attempt-%d", tries)) + outErr = dialer.DialAndSend(context.Background(), m) if outErr != nil { if strings.Contains(outErr.Error(), "i/o timeout") { diff --git a/internal/notify/email_test.go b/internal/notify/email_test.go index bc647f5..9db808b 100644 --- a/internal/notify/email_test.go +++ b/internal/notify/email_test.go @@ -5,6 +5,7 @@ package notify import ( + "context" "fmt" "path/filepath" "strings" @@ -45,7 +46,8 @@ func TestEmailSend(t *testing.T) { body, err := marshalEmail(cfg, msg) require.NoError(t, err) - if err := sendEmail(cfg, dialer, msg.Filename, body); err != nil { + ctx := context.Background() + if err := sendEmail(ctx, cfg, dialer, msg.Filename, body); err != nil { t.Fatal(err) } diff --git a/internal/notify/kafka.go b/internal/notify/kafka.go index 7f80b97..3eac2f2 100644 --- a/internal/notify/kafka.go +++ b/internal/notify/kafka.go @@ -41,14 +41,14 @@ type event struct { UploadStatus uploadStatus `json:"uploadStatus"` } -func (s *Kafka) Info(msg *Message) error { +func (s *Kafka) Info(ctx context.Context, msg *Message) error { event := marshalKafkaMessage(success, msg) - return s.send(event) + return s.send(ctx, event) } -func (s *Kafka) Critical(msg *Message) error { +func (s *Kafka) Critical(ctx context.Context, msg *Message) error { event := marshalKafkaMessage(failed, msg) - return s.send(event) + return s.send(ctx, event) } func marshalKafkaMessage(status uploadStatus, msg *Message) event { @@ -67,7 +67,7 @@ func marshalKafkaMessage(status uploadStatus, msg *Message) event { } } -func (s *Kafka) send(evt event) error { +func (s *Kafka) send(ctx context.Context, evt event) error { bs, err := compliance.Protect(s.cfg.Transform, models.Event{ Type: "", Event: evt, @@ -76,7 +76,7 @@ func (s *Kafka) send(evt event) error { return fmt.Errorf("unable to protect notifer kafka event: %v", err) } - return s.publisher.Send(context.Background(), &pubsub.Message{ + return s.publisher.Send(ctx, &pubsub.Message{ Body: bs, }) } diff --git a/internal/notify/mock_sender.go b/internal/notify/mock_sender.go index 604a12b..2e355fd 100644 --- a/internal/notify/mock_sender.go +++ b/internal/notify/mock_sender.go @@ -1,5 +1,7 @@ package notify +import "context" + type MockSender struct { infoCalled bool criticalCalled bool @@ -7,13 +9,13 @@ type MockSender struct { msg *Message } -func (s *MockSender) Info(msg *Message) error { +func (s *MockSender) Info(_ context.Context, msg *Message) error { s.infoCalled = true s.msg = msg return s.Err } -func (s *MockSender) Critical(msg *Message) error { +func (s *MockSender) Critical(_ context.Context, msg *Message) error { s.criticalCalled = true s.msg = msg return s.Err diff --git a/internal/notify/multi.go b/internal/notify/multi.go index c144f0c..849d94f 100644 --- a/internal/notify/multi.go +++ b/internal/notify/multi.go @@ -80,11 +80,11 @@ func (ms *MultiSender) senderTypes() string { return strings.Join(out, ", ") } -func (ms *MultiSender) Info(msg *Message) error { +func (ms *MultiSender) Info(ctx context.Context, msg *Message) error { var firstError error for i := range ms.senders { - err := ms.retry(func() error { - return ms.senders[i].Info(msg) + err := ms.retry(ctx, func() error { + return ms.senders[i].Info(ctx, msg) }) if err != nil { ms.logger.Logf("multi-sender: Info %T: %v", ms.senders[i], err) @@ -99,11 +99,11 @@ func (ms *MultiSender) Info(msg *Message) error { return firstError } -func (ms *MultiSender) Critical(msg *Message) error { +func (ms *MultiSender) Critical(ctx context.Context, msg *Message) error { var firstError error for i := range ms.senders { - err := ms.retry(func() error { - return ms.senders[i].Critical(msg) + err := ms.retry(ctx, func() error { + return ms.senders[i].Critical(ctx, msg) }) if err != nil { ms.logger.Logf("multi-sender: Critical %T: %v", ms.senders[i], err) @@ -118,13 +118,12 @@ func (ms *MultiSender) Critical(msg *Message) error { return firstError } -func (ms *MultiSender) retry(f func() error) error { +func (ms *MultiSender) retry(ctx context.Context, f func() error) error { if ms.retryConfig != nil { backoff, err := setupBackoff(ms.retryConfig) if err != nil { return fmt.Errorf("retry: %v", err) } - ctx := context.Background() return retry.Do(ctx, backoff, func(ctx context.Context) error { return isRetryableError(f()) }) diff --git a/internal/notify/multi_test.go b/internal/notify/multi_test.go index 894b0fa..e5076d4 100644 --- a/internal/notify/multi_test.go +++ b/internal/notify/multi_test.go @@ -5,6 +5,7 @@ package notify import ( + "context" "errors" "testing" "time" @@ -25,13 +26,14 @@ func TestMultiSender(t *testing.T) { msg := &Message{Direction: Upload} - require.NoError(t, sender.Info(msg)) - require.NoError(t, sender.Critical(msg)) + ctx := context.Background() + require.NoError(t, sender.Info(ctx, msg)) + require.NoError(t, sender.Critical(ctx, msg)) sender.senders = append(sender.senders, &MockSender{}) - require.NoError(t, sender.Info(msg)) - require.NoError(t, sender.Critical(msg)) + require.NoError(t, sender.Info(ctx, msg)) + require.NoError(t, sender.Critical(ctx, msg)) } func TestMultiSenderErr(t *testing.T) { @@ -44,10 +46,11 @@ func TestMultiSenderErr(t *testing.T) { }, } + ctx := context.Background() msg := &Message{Direction: Upload} - require.Equal(t, sender.Info(msg), sendErr) - require.Equal(t, sender.Critical(msg), sendErr) + require.Equal(t, sender.Info(ctx, msg), sendErr) + require.Equal(t, sender.Critical(ctx, msg), sendErr) } func TestMulti__Retry(t *testing.T) { diff --git a/internal/notify/notify.go b/internal/notify/notify.go index 2584f9e..70b8673 100644 --- a/internal/notify/notify.go +++ b/internal/notify/notify.go @@ -5,6 +5,8 @@ package notify import ( + "context" + "github.com/moov-io/ach" ) @@ -26,6 +28,6 @@ type Message struct { } type Sender interface { - Info(msg *Message) error - Critical(msg *Message) error + Info(ctx context.Context, msg *Message) error + Critical(ctx context.Context, msg *Message) error } diff --git a/internal/notify/pagerduty.go b/internal/notify/pagerduty.go index 537f71e..10edde2 100644 --- a/internal/notify/pagerduty.go +++ b/internal/notify/pagerduty.go @@ -10,8 +10,11 @@ import ( "fmt" "github.com/moov-io/achgateway/internal/service" + "github.com/moov-io/base/telemetry" "github.com/PagerDuty/go-pagerduty" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" ) type PagerDuty struct { @@ -50,12 +53,12 @@ func (pd *PagerDuty) Ping() error { return nil } -func (pd *PagerDuty) Info(msg *Message) error { +func (pd *PagerDuty) Info(_ context.Context, msg *Message) error { // Skip sending Info notifications, PagerDuty is setup for critical alerts return nil } -func (pd *PagerDuty) Critical(msg *Message) error { +func (pd *PagerDuty) Critical(ctx context.Context, msg *Message) error { opts := &pagerduty.CreateIncidentOptions{ Type: "incident", Title: fmt.Sprintf("ERROR during file %s", msg.Direction), @@ -72,11 +75,15 @@ func (pd *PagerDuty) Critical(msg *Message) error { // Downloads don't have to such a high priority opts.Urgency = "low" } - return pd.createIncident(opts) + return pd.createIncident(ctx, opts) } -func (pd *PagerDuty) createIncident(opts *pagerduty.CreateIncidentOptions) error { - ctx := context.Background() +func (pd *PagerDuty) createIncident(ctx context.Context, opts *pagerduty.CreateIncidentOptions) error { + _, span := telemetry.StartSpan(ctx, "notify-trigger-pagerduty", trace.WithAttributes( + attribute.String("achgateway.error", opts.Title), + )) + defer span.End() + _, err := pd.client.CreateIncidentWithContext(ctx, pd.from, opts) return err } diff --git a/internal/notify/pagerduty_test.go b/internal/notify/pagerduty_test.go index cbd6b09..ce8bfe1 100644 --- a/internal/notify/pagerduty_test.go +++ b/internal/notify/pagerduty_test.go @@ -5,11 +5,13 @@ package notify import ( + "context" "os" "testing" "github.com/moov-io/ach" "github.com/moov-io/achgateway/internal/service" + "github.com/stretchr/testify/require" ) @@ -40,8 +42,9 @@ func TestPagerDuty(t *testing.T) { } file := ach.NewFile() + ctx := context.Background() - if err := pd.Info(&Message{ + if err := pd.Info(ctx, &Message{ Direction: Download, Filename: "20200529-140002-1.ach", File: file, @@ -49,7 +52,7 @@ func TestPagerDuty(t *testing.T) { t.Fatal(err) } - if err := pd.Critical(&Message{ + if err := pd.Critical(ctx, &Message{ Direction: Upload, Filename: "20200529-140002-2.ach", File: file, diff --git a/internal/notify/slack.go b/internal/notify/slack.go index 358a54e..4e4f3b3 100644 --- a/internal/notify/slack.go +++ b/internal/notify/slack.go @@ -6,6 +6,7 @@ package notify import ( "bytes" + "context" "encoding/json" "fmt" "net/http" @@ -14,6 +15,10 @@ import ( "github.com/moov-io/achgateway" "github.com/moov-io/achgateway/internal/service" + "github.com/moov-io/base/telemetry" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" ) type Slack struct { @@ -40,14 +45,14 @@ const ( failed = uploadStatus("FAILED") ) -func (s *Slack) Info(msg *Message) error { +func (s *Slack) Info(ctx context.Context, msg *Message) error { slackMsg := marshalSlackMessage(success, msg) - return s.send(slackMsg) + return s.send(ctx, slackMsg) } -func (s *Slack) Critical(msg *Message) error { +func (s *Slack) Critical(ctx context.Context, msg *Message) error { slackMsg := marshalSlackMessage(failed, msg) - return s.send(slackMsg) + return s.send(ctx, slackMsg) } func marshalSlackMessage(status uploadStatus, msg *Message) string { @@ -77,7 +82,12 @@ type webhook struct { Text string `json:"text"` } -func (s *Slack) send(msg string) error { +func (s *Slack) send(ctx context.Context, msg string) error { + _, span := telemetry.StartSpan(ctx, "notify-send-slack", trace.WithAttributes( + attribute.String("achgateway.message", msg), + )) + defer span.End() + var body bytes.Buffer err := json.NewEncoder(&body).Encode(&webhook{ Text: msg, @@ -86,7 +96,7 @@ func (s *Slack) send(msg string) error { return err } - req, err := http.NewRequest("POST", s.webhookURL, &body) + req, err := http.NewRequestWithContext(ctx, "POST", s.webhookURL, &body) if err != nil { return err } diff --git a/internal/notify/slack_test.go b/internal/notify/slack_test.go index 2164319..8a923aa 100644 --- a/internal/notify/slack_test.go +++ b/internal/notify/slack_test.go @@ -6,6 +6,7 @@ package notify import ( "bytes" + "context" "io" "net/http" "net/http/httptest" @@ -14,9 +15,8 @@ import ( "github.com/moov-io/ach" "github.com/moov-io/achgateway/internal/service" - "github.com/stretchr/testify/require" - "github.com/gorilla/mux" + "github.com/stretchr/testify/require" ) func TestSlack(t *testing.T) { @@ -44,12 +44,13 @@ func TestSlack(t *testing.T) { Filename: "20200529-152259.ach", File: ach.NewFile(), } + ctx := context.Background() - if err := slack.Info(msg); err != nil { + if err := slack.Info(ctx, msg); err != nil { t.Fatal(err) } - if err := slack.Critical(msg); err != nil { + if err := slack.Critical(ctx, msg); err != nil { t.Fatal(err) } } diff --git a/internal/pipeline/aggregate.go b/internal/pipeline/aggregate.go index c841511..362787a 100644 --- a/internal/pipeline/aggregate.go +++ b/internal/pipeline/aggregate.go @@ -350,11 +350,11 @@ func (xfagg *aggregator) notifyAfterUpload(ctx context.Context, filename string, } if uploadErr != nil { - if err := notifier.Critical(msg); err != nil { + if err := notifier.Critical(ctx, msg); err != nil { return fmt.Errorf("problem sending critical notification for file=%s: %v", filename, err) } } else { - if err := notifier.Info(msg); err != nil { + if err := notifier.Info(ctx, msg); err != nil { return fmt.Errorf("problem sending info notification for file=%s: %v", filename, err) } } @@ -378,6 +378,11 @@ func (xfagg *aggregator) notifyAboutHoliday(day *schedule.Day) { return } + ctx, span := telemetry.StartSpan(context.Background(), "notify-about-holiday", trace.WithAttributes( + attribute.String("achgateway.holiday", day.Holiday.Name), + )) + defer span.End() + if uploadAgent.Notifications != nil { slackConfigs := xfagg.shard.Notifications.FindSlacks(uploadAgent.Notifications.Slack) for i := range slackConfigs { @@ -387,7 +392,7 @@ func (xfagg *aggregator) notifyAboutHoliday(day *schedule.Day) { continue } - err = ss.Info(¬ify.Message{ + err = ss.Info(ctx, ¬ify.Message{ Contents: formatHolidayMessage(day), }) if err != nil {