Skip to content

Commit 858e51b

Browse files
hairyhendersonsiavashs
authored andcommitted
feat: add distributed tracing support
Add tracing support using otel: - api: extract trace and span IDs from request context and add to alerts - types: add trace and span IDs to alerts - dispatch: add distributed tracing support - notify: add distributed tracing support This change borrows part of the implementation from #3673 Fixes #3670 Signed-off-by: Dave Henderson <dhenderson@gmail.com> Signed-off-by: Siavash Safi <siavash@cloudflare.com>
1 parent 2e0970e commit 858e51b

File tree

29 files changed

+1079
-157
lines changed

29 files changed

+1079
-157
lines changed

api/api.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
package api
1515

1616
import (
17+
"context"
1718
"errors"
1819
"fmt"
1920
"log/slog"
@@ -27,6 +28,7 @@ import (
2728
"github.com/prometheus/common/model"
2829
"github.com/prometheus/common/promslog"
2930
"github.com/prometheus/common/route"
31+
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
3032

3133
apiv2 "github.com/prometheus/alertmanager/api/v2"
3234
"github.com/prometheus/alertmanager/cluster"
@@ -203,7 +205,7 @@ func (api *API) Register(r *route.Router, routePrefix string) *http.ServeMux {
203205

204206
// Update config and resolve timeout of each API. APIv2 also needs
205207
// setAlertStatus to be updated.
206-
func (api *API) Update(cfg *config.Config, setAlertStatus func(model.LabelSet)) {
208+
func (api *API) Update(cfg *config.Config, setAlertStatus func(ctx context.Context, labels model.LabelSet)) {
207209
api.v2.Update(cfg, setAlertStatus)
208210
}
209211

@@ -244,7 +246,7 @@ func (api *API) instrumentHandler(prefix string, h http.Handler) http.Handler {
244246
}
245247
promhttp.InstrumentHandlerDuration(
246248
api.requestDuration.MustCurryWith(prometheus.Labels{"handler": path}),
247-
h,
249+
otelhttp.WithRouteTag(path, h),
248250
).ServeHTTP(w, r)
249251
})
250252
}

api/v2/api.go

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
package v2
1515

1616
import (
17+
"context"
1718
"errors"
1819
"fmt"
1920
"log/slog"
@@ -31,6 +32,7 @@ import (
3132
prometheus_model "github.com/prometheus/common/model"
3233
"github.com/prometheus/common/version"
3334
"github.com/rs/cors"
35+
"go.opentelemetry.io/otel"
3436

3537
"github.com/prometheus/alertmanager/api/metrics"
3638
open_api_models "github.com/prometheus/alertmanager/api/v2/models"
@@ -52,6 +54,8 @@ import (
5254
"github.com/prometheus/alertmanager/types"
5355
)
5456

57+
var tracer = otel.Tracer("github.com/prometheus/alertmanager/api/v2")
58+
5559
// API represents an Alertmanager API v2.
5660
type API struct {
5761
peer cluster.ClusterPeer
@@ -80,7 +84,7 @@ type (
8084
groupsFn func(func(*dispatch.Route) bool, func(*types.Alert, time.Time) bool) (dispatch.AlertGroups, map[prometheus_model.Fingerprint][]string)
8185
groupMutedFunc func(routeID, groupKey string) ([]string, bool)
8286
getAlertStatusFn func(prometheus_model.Fingerprint) types.AlertStatus
83-
setAlertStatusFn func(prometheus_model.LabelSet)
87+
setAlertStatusFn func(ctx context.Context, labels prometheus_model.LabelSet)
8488
)
8589

8690
// NewAPI returns a new Alertmanager API v2.
@@ -272,14 +276,15 @@ func (api *API) getAlertsHandler(params alert_ops.GetAlertsParams) middleware.Re
272276

273277
api.mtx.RLock()
274278
for a := range alerts.Next() {
279+
alert := a.Data
275280
if err = alerts.Err(); err != nil {
276281
break
277282
}
278283
if err = ctx.Err(); err != nil {
279284
break
280285
}
281286

282-
routes := api.route.Match(a.Labels)
287+
routes := api.route.Match(alert.Labels)
283288
receivers := make([]string, 0, len(routes))
284289
for _, r := range routes {
285290
receivers = append(receivers, r.RouteOpts.Receiver)
@@ -289,13 +294,13 @@ func (api *API) getAlertsHandler(params alert_ops.GetAlertsParams) middleware.Re
289294
continue
290295
}
291296

292-
if !alertFilter(a, now) {
297+
if !alertFilter(alert, now) {
293298
continue
294299
}
295300

296-
alert := AlertToOpenAPIAlert(a, api.getAlertStatus(a.Fingerprint()), receivers, nil)
301+
openAlert := AlertToOpenAPIAlert(alert, api.getAlertStatus(alert.Fingerprint()), receivers, nil)
297302

298-
res = append(res, alert)
303+
res = append(res, openAlert)
299304
}
300305
api.mtx.RUnlock()
301306

@@ -313,7 +318,12 @@ func (api *API) getAlertsHandler(params alert_ops.GetAlertsParams) middleware.Re
313318
func (api *API) postAlertsHandler(params alert_ops.PostAlertsParams) middleware.Responder {
314319
logger := api.requestLogger(params.HTTPRequest)
315320

316-
alerts := OpenAPIAlertsToAlerts(params.Alerts)
321+
ctx := params.HTTPRequest.Context()
322+
ctx, span := tracer.Start(ctx, "api.postAlertsHandler")
323+
defer span.End()
324+
325+
alerts := OpenAPIAlertsToAlerts(ctx, params.Alerts)
326+
317327
now := time.Now()
318328

319329
api.mtx.RLock()
@@ -359,13 +369,15 @@ func (api *API) postAlertsHandler(params alert_ops.PostAlertsParams) middleware.
359369
}
360370
validAlerts = append(validAlerts, a)
361371
}
362-
if err := api.alerts.Put(validAlerts...); err != nil {
372+
if err := api.alerts.Put(ctx, validAlerts...); err != nil {
363373
logger.Error("Failed to create alerts", "err", err)
374+
span.RecordError(err)
364375
return alert_ops.NewPostAlertsInternalServerError().WithPayload(err.Error())
365376
}
366377

367378
if validationErrs.Len() > 0 {
368379
logger.Error("Failed to validate alerts", "err", validationErrs.Error())
380+
span.RecordError(validationErrs)
369381
return alert_ops.NewPostAlertsBadRequest().WithPayload(validationErrs.Error())
370382
}
371383

@@ -436,12 +448,15 @@ func (api *API) getAlertGroupsHandler(params alertgroup_ops.GetAlertGroupsParams
436448

437449
func (api *API) alertFilter(matchers []*labels.Matcher, silenced, inhibited, active bool) func(a *types.Alert, now time.Time) bool {
438450
return func(a *types.Alert, now time.Time) bool {
451+
ctx, span := tracer.Start(context.Background(), "alertFilter")
452+
defer span.End()
453+
439454
if !a.EndsAt.IsZero() && a.EndsAt.Before(now) {
440455
return false
441456
}
442457

443458
// Set alert's current status based on its label set.
444-
api.setAlertStatus(a.Labels)
459+
api.setAlertStatus(ctx, a.Labels)
445460

446461
// Get alert's current status after seeing if it is suppressed.
447462
status := api.getAlertStatus(a.Fingerprint())

api/v2/compat.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
package v2
1515

1616
import (
17+
"context"
1718
"fmt"
1819
"time"
1920

@@ -170,7 +171,10 @@ func AlertToOpenAPIAlert(alert *types.Alert, status types.AlertStatus, receivers
170171
}
171172

172173
// OpenAPIAlertsToAlerts converts open_api_models.PostableAlerts to []*types.Alert.
173-
func OpenAPIAlertsToAlerts(apiAlerts open_api_models.PostableAlerts) []*types.Alert {
174+
func OpenAPIAlertsToAlerts(ctx context.Context, apiAlerts open_api_models.PostableAlerts) []*types.Alert {
175+
_, span := tracer.Start(ctx, "OpenAPIAlertsToAlerts")
176+
defer span.End()
177+
174178
alerts := []*types.Alert{}
175179
for _, apiAlert := range apiAlerts {
176180
alert := types.Alert{

cmd/alertmanager/main.go

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ import (
5959
"github.com/prometheus/alertmanager/silence"
6060
"github.com/prometheus/alertmanager/template"
6161
"github.com/prometheus/alertmanager/timeinterval"
62+
"github.com/prometheus/alertmanager/tracing"
6263
"github.com/prometheus/alertmanager/types"
6364
"github.com/prometheus/alertmanager/ui"
6465
)
@@ -404,6 +405,8 @@ func run() int {
404405
return d + waitFunc()
405406
}
406407

408+
tracingManager := tracing.NewManager(logger.With("component", "tracing"))
409+
407410
var (
408411
inhibitor *inhibit.Inhibitor
409412
tmpl *template.Template
@@ -491,9 +494,9 @@ func run() int {
491494
configuredIntegrations.Set(float64(integrationsNum))
492495
configuredInhibitionRules.Set(float64(len(conf.InhibitRules)))
493496

494-
api.Update(conf, func(labels model.LabelSet) {
495-
inhibitor.Mutes(labels)
496-
silencer.Mutes(labels)
497+
api.Update(conf, func(ctx context.Context, labels model.LabelSet) {
498+
inhibitor.Mutes(ctx, labels)
499+
silencer.Mutes(ctx, labels)
497500
})
498501

499502
disp = dispatch.NewDispatcher(
@@ -536,6 +539,13 @@ func run() int {
536539
go disp.Run(startTime.Add(*DispatchStartDelay))
537540
go inhibitor.Run()
538541

542+
err = tracingManager.ApplyConfig(conf)
543+
if err != nil {
544+
return fmt.Errorf("failed to apply tracing config: %w", err)
545+
}
546+
547+
go tracingManager.Run()
548+
539549
return nil
540550
})
541551

@@ -564,7 +574,10 @@ func run() int {
564574

565575
mux := api.Register(router, *routePrefix)
566576

567-
srv := &http.Server{Handler: mux}
577+
srv := &http.Server{
578+
// instrument all handlers with tracing
579+
Handler: tracing.Middleware(mux),
580+
}
568581
srvc := make(chan struct{})
569582

570583
go func() {
@@ -595,6 +608,11 @@ func run() int {
595608
errc <- configCoordinator.Reload()
596609
case <-term:
597610
logger.Info("Received SIGTERM, exiting gracefully...")
611+
612+
// shut down the tracing manager to flush any remaining spans.
613+
// this blocks for up to 5s
614+
tracingManager.Stop()
615+
598616
return 0
599617
case <-srvc:
600618
return 1

config/config.go

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -351,6 +351,8 @@ type Config struct {
351351
MuteTimeIntervals []MuteTimeInterval `yaml:"mute_time_intervals,omitempty" json:"mute_time_intervals,omitempty"`
352352
TimeIntervals []TimeInterval `yaml:"time_intervals,omitempty" json:"time_intervals,omitempty"`
353353

354+
TracingConfig TracingConfig `yaml:"tracing,omitempty" json:"tracing,omitempty"`
355+
354356
// original is the input from which the config was parsed.
355357
original string
356358
}
@@ -1182,3 +1184,111 @@ func (m Matchers) MarshalJSON() ([]byte, error) {
11821184
}
11831185
return json.Marshal(result)
11841186
}
1187+
1188+
// TODO: probably move these into prometheus/common since they're copied from
1189+
// prometheus/prometheus?
1190+
1191+
type TracingClientType string
1192+
1193+
const (
1194+
TracingClientHTTP TracingClientType = "http"
1195+
TracingClientGRPC TracingClientType = "grpc"
1196+
1197+
GzipCompression = "gzip"
1198+
)
1199+
1200+
// UnmarshalYAML implements the yaml.Unmarshaler interface.
1201+
func (t *TracingClientType) UnmarshalYAML(unmarshal func(interface{}) error) error {
1202+
*t = TracingClientType("")
1203+
type plain TracingClientType
1204+
if err := unmarshal((*plain)(t)); err != nil {
1205+
return err
1206+
}
1207+
1208+
if *t != TracingClientHTTP && *t != TracingClientGRPC {
1209+
return fmt.Errorf("expected tracing client type to be to be %s or %s, but got %s",
1210+
TracingClientHTTP, TracingClientGRPC, *t,
1211+
)
1212+
}
1213+
1214+
return nil
1215+
}
1216+
1217+
// TracingConfig configures the tracing options.
1218+
type TracingConfig struct {
1219+
ClientType TracingClientType `yaml:"client_type,omitempty"`
1220+
Endpoint string `yaml:"endpoint,omitempty"`
1221+
SamplingFraction float64 `yaml:"sampling_fraction,omitempty"`
1222+
Insecure bool `yaml:"insecure,omitempty"`
1223+
TLSConfig commoncfg.TLSConfig `yaml:"tls_config,omitempty"`
1224+
Headers map[string]string `yaml:"headers,omitempty"`
1225+
Compression string `yaml:"compression,omitempty"`
1226+
Timeout model.Duration `yaml:"timeout,omitempty"`
1227+
}
1228+
1229+
// SetDirectory joins any relative file paths with dir.
1230+
func (t *TracingConfig) SetDirectory(dir string) {
1231+
t.TLSConfig.SetDirectory(dir)
1232+
}
1233+
1234+
// UnmarshalYAML implements the yaml.Unmarshaler interface.
1235+
func (t *TracingConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
1236+
*t = TracingConfig{
1237+
ClientType: TracingClientGRPC,
1238+
}
1239+
type plain TracingConfig
1240+
if err := unmarshal((*plain)(t)); err != nil {
1241+
return err
1242+
}
1243+
1244+
if err := validateHeadersForTracing(t.Headers); err != nil {
1245+
return err
1246+
}
1247+
1248+
if t.Endpoint == "" {
1249+
return errors.New("tracing endpoint must be set")
1250+
}
1251+
1252+
if t.Compression != "" && t.Compression != GzipCompression {
1253+
return fmt.Errorf("invalid compression type %s provided, valid options: %s",
1254+
t.Compression, GzipCompression)
1255+
}
1256+
1257+
return nil
1258+
}
1259+
1260+
var reservedHeaders = map[string]struct{}{
1261+
// NOTE: authorization is checked specially,
1262+
// see RemoteWriteConfig.UnmarshalYAML.
1263+
// "authorization": {},
1264+
"host": {},
1265+
"content-encoding": {},
1266+
"content-length": {},
1267+
"content-type": {},
1268+
"user-agent": {},
1269+
"connection": {},
1270+
"keep-alive": {},
1271+
"proxy-authenticate": {},
1272+
"proxy-authorization": {},
1273+
"www-authenticate": {},
1274+
"accept-encoding": {},
1275+
"x-prometheus-remote-write-version": {},
1276+
"x-prometheus-remote-read-version": {},
1277+
1278+
// Added by SigV4.
1279+
"x-amz-date": {},
1280+
"x-amz-security-token": {},
1281+
"x-amz-content-sha256": {},
1282+
}
1283+
1284+
func validateHeadersForTracing(headers map[string]string) error {
1285+
for header := range headers {
1286+
if strings.ToLower(header) == "authorization" {
1287+
return errors.New("custom authorization header configuration is not yet supported")
1288+
}
1289+
if _, ok := reservedHeaders[strings.ToLower(header)]; ok {
1290+
return fmt.Errorf("%s is a reserved header. It must not be changed", header)
1291+
}
1292+
}
1293+
return nil
1294+
}

0 commit comments

Comments
 (0)