Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/prometheus/common/model"
"github.com/prometheus/common/promslog"
"github.com/prometheus/common/route"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"

apiv2 "github.com/prometheus/alertmanager/api/v2"
"github.com/prometheus/alertmanager/cluster"
Expand Down Expand Up @@ -201,7 +202,7 @@ func (api *API) Register(r *route.Router, routePrefix string) *http.ServeMux {

// Update updates the config and resolve timeout of each API. APIv2 also needs
// setAlertStatus to be updated.
func (api *API) Update(cfg *config.Config, setAlertStatus func(model.LabelSet)) {
func (api *API) Update(cfg *config.Config, setAlertStatus func(ctx context.Context, labels model.LabelSet)) {
api.v2.Update(cfg, setAlertStatus)
}

Expand Down Expand Up @@ -242,7 +243,7 @@ func (api *API) instrumentHandler(prefix string, h http.Handler) http.Handler {
}
promhttp.InstrumentHandlerDuration(
api.requestDuration.MustCurryWith(prometheus.Labels{"handler": path}),
h,
otelhttp.WithRouteTag(path, h),
).ServeHTTP(w, r)
})
}
64 changes: 50 additions & 14 deletions api/v2/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
prometheus_model "github.com/prometheus/common/model"
"github.com/prometheus/common/version"
"github.com/rs/cors"
"go.opentelemetry.io/otel"

"github.com/prometheus/alertmanager/api/metrics"
open_api_models "github.com/prometheus/alertmanager/api/v2/models"
Expand All @@ -54,6 +55,8 @@ import (
"github.com/prometheus/alertmanager/types"
)

// tracer is the package-level OpenTelemetry tracer, obtained from the global
// otel API under this package's import path. The handlers in this package
// start their spans from it.
var tracer = otel.Tracer("github.com/prometheus/alertmanager/api/v2")

// API represents an Alertmanager API v2.
type API struct {
peer cluster.ClusterPeer
Expand Down Expand Up @@ -82,7 +85,7 @@ type (
groupsFn func(context.Context, func(*dispatch.Route) bool, func(*types.Alert, time.Time) bool) (dispatch.AlertGroups, map[prometheus_model.Fingerprint][]string, error)
groupMutedFunc func(routeID, groupKey string) ([]string, bool)
getAlertStatusFn func(prometheus_model.Fingerprint) types.AlertStatus
setAlertStatusFn func(prometheus_model.LabelSet)
setAlertStatusFn func(ctx context.Context, labels prometheus_model.LabelSet)
)

// NewAPI returns a new Alertmanager API v2.
Expand Down Expand Up @@ -173,6 +176,9 @@ func (api *API) getStatusHandler(params general_ops.GetStatusParams) middleware.
api.mtx.RLock()
defer api.mtx.RUnlock()

_, span := tracer.Start(params.HTTPRequest.Context(), "api.getStatusHandler")
defer span.End()

original := api.alertmanagerConfig.String()
uptime := strfmt.DateTime(api.uptime)

Expand Down Expand Up @@ -229,6 +235,9 @@ func (api *API) getReceiversHandler(params receiver_ops.GetReceiversParams) midd
api.mtx.RLock()
defer api.mtx.RUnlock()

_, span := tracer.Start(params.HTTPRequest.Context(), "api.getReceiversHandler")
defer span.End()

receivers := make([]*open_api_models.Receiver, 0, len(api.alertmanagerConfig.Receivers))
for i := range api.alertmanagerConfig.Receivers {
receivers = append(receivers, &open_api_models.Receiver{Name: &api.alertmanagerConfig.Receivers[i].Name})
Expand All @@ -243,11 +252,13 @@ func (api *API) getAlertsHandler(params alert_ops.GetAlertsParams) middleware.Re
// Initialize result slice to prevent api returning `null` when there
// are no alerts present
res = open_api_models.GettableAlerts{}
ctx = params.HTTPRequest.Context()

logger = api.requestLogger(params.HTTPRequest)
)

ctx, span := tracer.Start(params.HTTPRequest.Context(), "api.getAlertsHandler")
defer span.End()

matchers, err := parseFilter(params.Filter)
if err != nil {
logger.Debug("Failed to parse matchers", "err", err)
Expand All @@ -274,14 +285,15 @@ func (api *API) getAlertsHandler(params alert_ops.GetAlertsParams) middleware.Re

api.mtx.RLock()
for a := range alerts.Next() {
alert := a.Data
if err = alerts.Err(); err != nil {
break
}
if err = ctx.Err(); err != nil {
break
}

routes := api.route.Match(a.Labels)
routes := api.route.Match(alert.Labels)
receivers := make([]string, 0, len(routes))
for _, r := range routes {
receivers = append(receivers, r.RouteOpts.Receiver)
Expand All @@ -291,13 +303,13 @@ func (api *API) getAlertsHandler(params alert_ops.GetAlertsParams) middleware.Re
continue
}

if !alertFilter(a, now) {
if !alertFilter(alert, now) {
continue
}

alert := AlertToOpenAPIAlert(a, api.getAlertStatus(a.Fingerprint()), receivers, nil)
openAlert := AlertToOpenAPIAlert(alert, api.getAlertStatus(alert.Fingerprint()), receivers, nil)

res = append(res, alert)
res = append(res, openAlert)
}
api.mtx.RUnlock()

Expand All @@ -315,7 +327,11 @@ func (api *API) getAlertsHandler(params alert_ops.GetAlertsParams) middleware.Re
func (api *API) postAlertsHandler(params alert_ops.PostAlertsParams) middleware.Responder {
logger := api.requestLogger(params.HTTPRequest)

alerts := OpenAPIAlertsToAlerts(params.Alerts)
ctx, span := tracer.Start(params.HTTPRequest.Context(), "api.postAlertsHandler")
defer span.End()

alerts := OpenAPIAlertsToAlerts(ctx, params.Alerts)

now := time.Now()

api.mtx.RLock()
Expand Down Expand Up @@ -361,13 +377,15 @@ func (api *API) postAlertsHandler(params alert_ops.PostAlertsParams) middleware.
}
validAlerts = append(validAlerts, a)
}
if err := api.alerts.Put(validAlerts...); err != nil {
if err := api.alerts.Put(ctx, validAlerts...); err != nil {
logger.Error("Failed to create alerts", "err", err)
span.RecordError(err)
return alert_ops.NewPostAlertsInternalServerError().WithPayload(err.Error())
}

if validationErrs.Len() > 0 {
logger.Error("Failed to validate alerts", "err", validationErrs.Error())
span.RecordError(validationErrs)
return alert_ops.NewPostAlertsBadRequest().WithPayload(validationErrs.Error())
}

Expand All @@ -377,6 +395,9 @@ func (api *API) postAlertsHandler(params alert_ops.PostAlertsParams) middleware.
func (api *API) getAlertGroupsHandler(params alertgroup_ops.GetAlertGroupsParams) middleware.Responder {
logger := api.requestLogger(params.HTTPRequest)

ctx, span := tracer.Start(params.HTTPRequest.Context(), "api.getAlertGroupsHandler")
defer span.End()

matchers, err := parseFilter(params.Filter)
if err != nil {
logger.Debug("Failed to parse matchers", "err", err)
Expand Down Expand Up @@ -407,7 +428,7 @@ func (api *API) getAlertGroupsHandler(params alertgroup_ops.GetAlertGroupsParams
}(receiverFilter)

af := api.alertFilter(matchers, *params.Silenced, *params.Inhibited, *params.Active)
alertGroups, allReceivers, err := api.alertGroups(params.HTTPRequest.Context(), rf, af)
alertGroups, allReceivers, err := api.alertGroups(ctx, rf, af)
if err != nil {
return alertgroup_ops.NewGetAlertGroupsInternalServerError()
}
Expand Down Expand Up @@ -441,12 +462,15 @@ func (api *API) getAlertGroupsHandler(params alertgroup_ops.GetAlertGroupsParams

func (api *API) alertFilter(matchers []*labels.Matcher, silenced, inhibited, active bool) func(a *types.Alert, now time.Time) bool {
return func(a *types.Alert, now time.Time) bool {
ctx, span := tracer.Start(context.Background(), "alertFilter")
defer span.End()

if !a.EndsAt.IsZero() && a.EndsAt.Before(now) {
return false
}

// Set alert's current status based on its label set.
api.setAlertStatus(a.Labels)
api.setAlertStatus(ctx, a.Labels)

// Get alert's current status after seeing if it is suppressed.
status := api.getAlertStatus(a.Fingerprint())
Expand Down Expand Up @@ -510,13 +534,16 @@ func matchFilterLabels(matchers []*labels.Matcher, sms map[string]string) bool {
func (api *API) getSilencesHandler(params silence_ops.GetSilencesParams) middleware.Responder {
logger := api.requestLogger(params.HTTPRequest)

ctx, span := tracer.Start(params.HTTPRequest.Context(), "api.getSilencesHandler")
defer span.End()

matchers, err := parseFilter(params.Filter)
if err != nil {
logger.Debug("Failed to parse matchers", "err", err)
return silence_ops.NewGetSilencesBadRequest().WithPayload(err.Error())
}

psils, _, err := api.silences.Query()
psils, _, err := api.silences.Query(ctx)
if err != nil {
logger.Error("Failed to get silences", "err", err)
return silence_ops.NewGetSilencesInternalServerError().WithPayload(err.Error())
Expand Down Expand Up @@ -606,7 +633,10 @@ func CheckSilenceMatchesFilterLabels(s *silencepb.Silence, matchers []*labels.Ma
func (api *API) getSilenceHandler(params silence_ops.GetSilenceParams) middleware.Responder {
logger := api.requestLogger(params.HTTPRequest)

sils, _, err := api.silences.Query(silence.QIDs(params.SilenceID.String()))
ctx, span := tracer.Start(params.HTTPRequest.Context(), "api.getSilenceHandler")
defer span.End()

sils, _, err := api.silences.Query(ctx, silence.QIDs(params.SilenceID.String()))
if err != nil {
logger.Error("Failed to get silence by id", "err", err, "id", params.SilenceID.String())
return silence_ops.NewGetSilenceInternalServerError().WithPayload(err.Error())
Expand All @@ -629,8 +659,11 @@ func (api *API) getSilenceHandler(params silence_ops.GetSilenceParams) middlewar
func (api *API) deleteSilenceHandler(params silence_ops.DeleteSilenceParams) middleware.Responder {
logger := api.requestLogger(params.HTTPRequest)

ctx, span := tracer.Start(params.HTTPRequest.Context(), "api.deleteSilenceHandler")
defer span.End()

sid := params.SilenceID.String()
if err := api.silences.Expire(sid); err != nil {
if err := api.silences.Expire(ctx, sid); err != nil {
logger.Error("Failed to expire silence", "err", err)
if errors.Is(err, silence.ErrNotFound) {
return silence_ops.NewDeleteSilenceNotFound()
Expand All @@ -643,6 +676,9 @@ func (api *API) deleteSilenceHandler(params silence_ops.DeleteSilenceParams) mid
func (api *API) postSilencesHandler(params silence_ops.PostSilencesParams) middleware.Responder {
logger := api.requestLogger(params.HTTPRequest)

ctx, span := tracer.Start(params.HTTPRequest.Context(), "api.postSilencesHandler")
defer span.End()

sil, err := PostableSilenceToProto(params.Silence)
if err != nil {
logger.Error("Failed to marshal silence to proto", "err", err)
Expand All @@ -663,7 +699,7 @@ func (api *API) postSilencesHandler(params silence_ops.PostSilencesParams) middl
return silence_ops.NewPostSilencesBadRequest().WithPayload(msg)
}

if err = api.silences.Set(sil); err != nil {
if err = api.silences.Set(ctx, sil); err != nil {
logger.Error("Failed to create silence", "err", err)
if errors.Is(err, silence.ErrNotFound) {
return silence_ops.NewPostSilencesNotFound().WithPayload(err.Error())
Expand Down
12 changes: 6 additions & 6 deletions api/v2/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,16 +160,16 @@ func TestDeleteSilenceHandler(t *testing.T) {
EndsAt: now.Add(time.Hour),
UpdatedAt: now,
}
require.NoError(t, silences.Set(unexpiredSil))
require.NoError(t, silences.Set(t.Context(), unexpiredSil))

expiredSil := &silencepb.Silence{
Matchers: []*silencepb.Matcher{m},
StartsAt: now.Add(-time.Hour),
EndsAt: now.Add(time.Hour),
UpdatedAt: now,
}
require.NoError(t, silences.Set(expiredSil))
require.NoError(t, silences.Expire(expiredSil.Id))
require.NoError(t, silences.Set(t.Context(), expiredSil))
require.NoError(t, silences.Expire(t.Context(), expiredSil.Id))

for i, tc := range []struct {
sid string
Expand Down Expand Up @@ -222,16 +222,16 @@ func TestPostSilencesHandler(t *testing.T) {
EndsAt: now.Add(time.Hour),
UpdatedAt: now,
}
require.NoError(t, silences.Set(unexpiredSil))
require.NoError(t, silences.Set(t.Context(), unexpiredSil))

expiredSil := &silencepb.Silence{
Matchers: []*silencepb.Matcher{m},
StartsAt: now.Add(-time.Hour),
EndsAt: now.Add(time.Hour),
UpdatedAt: now,
}
require.NoError(t, silences.Set(expiredSil))
require.NoError(t, silences.Expire(expiredSil.Id))
require.NoError(t, silences.Set(t.Context(), expiredSil))
require.NoError(t, silences.Expire(t.Context(), expiredSil.Id))

t.Run("Silences CRUD", func(t *testing.T) {
for i, tc := range []struct {
Expand Down
6 changes: 5 additions & 1 deletion api/v2/compat.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package v2

import (
"context"
"fmt"
"time"

Expand Down Expand Up @@ -170,7 +171,10 @@ func AlertToOpenAPIAlert(alert *types.Alert, status types.AlertStatus, receivers
}

// OpenAPIAlertsToAlerts converts open_api_models.PostableAlerts to []*types.Alert.
func OpenAPIAlertsToAlerts(apiAlerts open_api_models.PostableAlerts) []*types.Alert {
func OpenAPIAlertsToAlerts(ctx context.Context, apiAlerts open_api_models.PostableAlerts) []*types.Alert {
_, span := tracer.Start(ctx, "OpenAPIAlertsToAlerts")
defer span.End()

alerts := []*types.Alert{}
for _, apiAlert := range apiAlerts {
alert := types.Alert{
Expand Down
26 changes: 22 additions & 4 deletions cmd/alertmanager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ import (
"github.com/prometheus/alertmanager/silence"
"github.com/prometheus/alertmanager/template"
"github.com/prometheus/alertmanager/timeinterval"
"github.com/prometheus/alertmanager/tracing"
"github.com/prometheus/alertmanager/types"
"github.com/prometheus/alertmanager/ui"
)
Expand Down Expand Up @@ -404,6 +405,8 @@ func run() int {
return d + waitFunc()
}

tracingManager := tracing.NewManager(logger.With("component", "tracing"))

var (
inhibitor *inhibit.Inhibitor
tmpl *template.Template
Expand Down Expand Up @@ -491,9 +494,9 @@ func run() int {
configuredIntegrations.Set(float64(integrationsNum))
configuredInhibitionRules.Set(float64(len(conf.InhibitRules)))

api.Update(conf, func(labels model.LabelSet) {
inhibitor.Mutes(labels)
silencer.Mutes(labels)
api.Update(conf, func(ctx context.Context, labels model.LabelSet) {
inhibitor.Mutes(ctx, labels)
silencer.Mutes(ctx, labels)
})

newDisp := dispatch.NewDispatcher(
Expand Down Expand Up @@ -546,6 +549,13 @@ func run() int {
newDisp.WaitForLoading()
disp = newDisp

err = tracingManager.ApplyConfig(conf)
if err != nil {
return fmt.Errorf("failed to apply tracing config: %w", err)
}

go tracingManager.Run()

return nil
})

Expand Down Expand Up @@ -574,7 +584,10 @@ func run() int {

mux := api.Register(router, *routePrefix)

srv := &http.Server{Handler: mux}
srv := &http.Server{
// instrument all handlers with tracing
Handler: tracing.Middleware(mux),
}
srvc := make(chan struct{})

go func() {
Expand Down Expand Up @@ -605,6 +618,11 @@ func run() int {
errc <- configCoordinator.Reload()
case <-term:
logger.Info("Received SIGTERM, exiting gracefully...")

// shut down the tracing manager to flush any remaining spans.
// this blocks for up to 5s
tracingManager.Stop()

return 0
case <-srvc:
return 1
Expand Down
4 changes: 3 additions & 1 deletion config/config.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2015 Prometheus Team
// Copyright The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down Expand Up @@ -351,6 +351,8 @@ type Config struct {
MuteTimeIntervals []MuteTimeInterval `yaml:"mute_time_intervals,omitempty" json:"mute_time_intervals,omitempty"`
TimeIntervals []TimeInterval `yaml:"time_intervals,omitempty" json:"time_intervals,omitempty"`

TracingConfig TracingConfig `yaml:"tracing,omitempty" json:"tracing,omitempty"`

// original is the input from which the config was parsed.
original string
}
Expand Down
Loading
Loading