
Commit 22bfc0c

hairyhenderson authored and siavashs committed
feat: add distributed tracing support
Add tracing support using otel:
- api: extract trace and span IDs from request context and add to alerts
- types: add trace and span IDs to alerts
- dispatch: add distributed tracing support
- notify: add distributed tracing support

This change borrows part of the implementation from #3673

Fixes #3670

Signed-off-by: Dave Henderson <dhenderson@gmail.com>
Signed-off-by: Siavash Safi <siavash@cloudflare.com>
1 parent 616f034 commit 22bfc0c

14 files changed: +747 −21 lines

api/v2/api.go

Lines changed: 10 additions & 1 deletion
@@ -50,6 +50,7 @@ import (
 	"github.com/prometheus/alertmanager/silence"
 	"github.com/prometheus/alertmanager/silence/silencepb"
 	"github.com/prometheus/alertmanager/types"
+	"go.opentelemetry.io/otel/trace"
 )
 
 // API represents an Alertmanager API v2.
@@ -313,7 +314,15 @@ func (api *API) getAlertsHandler(params alert_ops.GetAlertsParams) middleware.Responder {
 func (api *API) postAlertsHandler(params alert_ops.PostAlertsParams) middleware.Responder {
 	logger := api.requestLogger(params.HTTPRequest)
 
-	alerts := OpenAPIAlertsToAlerts(params.Alerts)
+	// Extract trace ID and span ID from the request context
+	traceID := ""
+	spanID := ""
+	if spanCtx := trace.SpanContextFromContext(params.HTTPRequest.Context()); spanCtx.IsValid() {
+		traceID = spanCtx.TraceID().String()
+		spanID = spanCtx.SpanID().String()
+	}
+
+	alerts := OpenAPIAlertsToAlerts(params.Alerts, traceID, spanID)
 	now := time.Now()
 
 	api.mtx.RLock()
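For the extracted IDs to be non-empty, whoever posts the alerts has to send a span context along with the request; on the server side the traceparent header is pulled into the request context by the tracing middleware wired up in cmd/alertmanager/main.go below. A minimal client-side sketch, not part of this commit — the Alertmanager URL, span name, and alert payload are placeholders, and it assumes the stock otelhttp transport with the W3C propagator registered:

package main

import (
	"bytes"
	"context"
	"log"
	"net/http"

	"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/propagation"
	sdktrace "go.opentelemetry.io/otel/sdk/trace"
)

func main() {
	// A real setup would register an exporter; a bare SDK provider is enough
	// to produce valid, sampled span contexts for this illustration.
	otel.SetTracerProvider(sdktrace.NewTracerProvider())
	otel.SetTextMapPropagator(propagation.TraceContext{})

	// The otelhttp transport injects the W3C traceparent header, which the
	// Alertmanager tracing middleware then extracts into the request context.
	client := &http.Client{Transport: otelhttp.NewTransport(http.DefaultTransport)}

	ctx, span := otel.Tracer("alert-sender").Start(context.Background(), "post-alerts")
	defer span.End()

	// Hypothetical Alertmanager address and alert payload.
	body := bytes.NewBufferString(`[{"labels":{"alertname":"HighLatency"}}]`)
	req, err := http.NewRequestWithContext(ctx, http.MethodPost, "http://localhost:9093/api/v2/alerts", body)
	if err != nil {
		log.Fatal(err)
	}
	req.Header.Set("Content-Type", "application/json")

	resp, err := client.Do(req)
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()
	log.Println("status:", resp.Status)
}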

api/v2/compat.go

Lines changed: 3 additions & 1 deletion
@@ -170,7 +170,7 @@ func AlertToOpenAPIAlert(alert *types.Alert, status types.AlertStatus, receivers
 }
 
 // OpenAPIAlertsToAlerts converts open_api_models.PostableAlerts to []*types.Alert.
-func OpenAPIAlertsToAlerts(apiAlerts open_api_models.PostableAlerts) []*types.Alert {
+func OpenAPIAlertsToAlerts(apiAlerts open_api_models.PostableAlerts, traceID, spanID string) []*types.Alert {
 	alerts := []*types.Alert{}
 	for _, apiAlert := range apiAlerts {
 		alert := types.Alert{
@@ -181,6 +181,8 @@ func OpenAPIAlertsToAlerts(apiAlerts open_api_models.PostableAlerts) []*types.Alert {
 				EndsAt:       time.Time(apiAlert.EndsAt),
 				GeneratorURL: string(apiAlert.GeneratorURL),
 			},
+			TraceID: traceID,
+			SpanID:  spanID,
 		}
 		alerts = append(alerts, &alert)
 	}
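The TraceID and SpanID fields populated here are additions to types.Alert; the types change is one of the 14 files in this commit but is not part of this excerpt. Going by how compat.go fills the fields and how dispatch later calls alert.TraceSpanContext(), a plausible sketch of the types side looks roughly like this — only the field and method names used above are taken from the diff, everything else is assumed:

package types

import (
	"github.com/prometheus/common/model"
	"go.opentelemetry.io/otel/trace"
)

// Alert sketch: only the tracing-related additions are shown; the real struct
// in types/types.go carries more fields.
type Alert struct {
	model.Alert

	TraceID string
	SpanID  string
}

// TraceSpanContext rebuilds a span context from the stored hex-encoded IDs so
// that dispatch can link its producer span back to the originating request.
// An empty or malformed pair yields a zero SpanContext, which IsValid() rejects.
func (a *Alert) TraceSpanContext() trace.SpanContext {
	traceID, err := trace.TraceIDFromHex(a.TraceID)
	if err != nil {
		return trace.SpanContext{}
	}
	spanID, err := trace.SpanIDFromHex(a.SpanID)
	if err != nil {
		return trace.SpanContext{}
	}
	return trace.NewSpanContext(trace.SpanContextConfig{
		TraceID: traceID,
		SpanID:  spanID,
		Remote:  true,
	})
}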

cmd/alertmanager/main.go

Lines changed: 19 additions & 1 deletion
@@ -59,6 +59,7 @@ import (
 	"github.com/prometheus/alertmanager/silence"
 	"github.com/prometheus/alertmanager/template"
 	"github.com/prometheus/alertmanager/timeinterval"
+	"github.com/prometheus/alertmanager/tracing"
 	"github.com/prometheus/alertmanager/types"
 	"github.com/prometheus/alertmanager/ui"
 )
@@ -401,6 +402,8 @@ func run() int {
 		return d + waitFunc()
 	}
 
+	tracingManager := tracing.NewManager(logger.With("component", "tracing"))
+
 	var (
 		inhibitor *inhibit.Inhibitor
 		tmpl      *template.Template
@@ -523,6 +526,13 @@ func run() int {
 		go disp.Run()
 		go inhibitor.Run()
 
+		err = tracingManager.ApplyConfig(conf)
+		if err != nil {
+			return fmt.Errorf("failed to apply tracing config: %w", err)
+		}
+
+		go tracingManager.Run()
+
 		return nil
 	})
 
@@ -551,7 +561,10 @@ func run() int {
 
 	mux := api.Register(router, *routePrefix)
 
-	srv := &http.Server{Handler: mux}
+	srv := &http.Server{
+		// instrument all handlers with tracing
+		Handler: tracing.Middleware(mux),
+	}
 	srvc := make(chan struct{})
 
 	go func() {
@@ -582,6 +595,11 @@ func run() int {
 			errc <- configCoordinator.Reload()
 		case <-term:
 			logger.Info("Received SIGTERM, exiting gracefully...")
+
+			// shut down the tracing manager to flush any remaining spans.
+			// this blocks for up to 5s
+			tracingManager.Stop()
+
 			return 0
 		case <-srvc:
 			return 1
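tracing.NewManager, ApplyConfig, Run, Stop, and Middleware come from the new tracing package, which is also part of this commit but not shown in this excerpt. As a rough idea of what the request middleware could boil down to, here is a sketch built on otelhttp rather than the commit's actual implementation:

package tracing

import (
	"net/http"

	"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
)

// Middleware wraps every handler in a server span and extracts any incoming
// W3C traceparent header into the request context, which is what
// api/v2/api.go later reads via trace.SpanContextFromContext.
func Middleware(next http.Handler) http.Handler {
	return otelhttp.NewHandler(next, "alertmanager",
		// Name spans after the request instead of one fixed operation name
		// (the formatter here is illustrative only).
		otelhttp.WithSpanNameFormatter(func(_ string, r *http.Request) string {
			return r.Method + " " + r.URL.Path
		}),
	)
}

Whatever the real implementation looks like, the property the API handler relies on is that the extracted span context ends up in params.HTTPRequest.Context().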

config/config.go

Lines changed: 110 additions & 0 deletions
@@ -351,6 +351,8 @@ type Config struct {
 	MuteTimeIntervals []MuteTimeInterval `yaml:"mute_time_intervals,omitempty" json:"mute_time_intervals,omitempty"`
 	TimeIntervals     []TimeInterval     `yaml:"time_intervals,omitempty" json:"time_intervals,omitempty"`
 
+	TracingConfig TracingConfig `yaml:"tracing,omitempty" json:"tracing,omitempty"`
+
 	// original is the input from which the config was parsed.
 	original string
 }
@@ -1182,3 +1184,111 @@ func (m Matchers) MarshalJSON() ([]byte, error) {
 	}
 	return json.Marshal(result)
 }
+
+// TODO: probably move these into prometheus/common since they're copied from
+// prometheus/prometheus?
+
+type TracingClientType string
+
+const (
+	TracingClientHTTP TracingClientType = "http"
+	TracingClientGRPC TracingClientType = "grpc"
+
+	GzipCompression = "gzip"
+)
+
+// UnmarshalYAML implements the yaml.Unmarshaler interface.
+func (t *TracingClientType) UnmarshalYAML(unmarshal func(interface{}) error) error {
+	*t = TracingClientType("")
+	type plain TracingClientType
+	if err := unmarshal((*plain)(t)); err != nil {
+		return err
+	}
+
+	if *t != TracingClientHTTP && *t != TracingClientGRPC {
+		return fmt.Errorf("expected tracing client type to be %s or %s, but got %s",
+			TracingClientHTTP, TracingClientGRPC, *t,
+		)
+	}
+
+	return nil
+}
+
+// TracingConfig configures the tracing options.
+type TracingConfig struct {
+	ClientType       TracingClientType   `yaml:"client_type,omitempty"`
+	Endpoint         string              `yaml:"endpoint,omitempty"`
+	SamplingFraction float64             `yaml:"sampling_fraction,omitempty"`
+	Insecure         bool                `yaml:"insecure,omitempty"`
+	TLSConfig        commoncfg.TLSConfig `yaml:"tls_config,omitempty"`
+	Headers          map[string]string   `yaml:"headers,omitempty"`
+	Compression      string              `yaml:"compression,omitempty"`
+	Timeout          model.Duration      `yaml:"timeout,omitempty"`
+}
+
+// SetDirectory joins any relative file paths with dir.
+func (t *TracingConfig) SetDirectory(dir string) {
+	t.TLSConfig.SetDirectory(dir)
+}
+
+// UnmarshalYAML implements the yaml.Unmarshaler interface.
+func (t *TracingConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
+	*t = TracingConfig{
+		ClientType: TracingClientGRPC,
+	}
+	type plain TracingConfig
+	if err := unmarshal((*plain)(t)); err != nil {
+		return err
+	}
+
+	if err := validateHeadersForTracing(t.Headers); err != nil {
+		return err
+	}
+
+	if t.Endpoint == "" {
+		return errors.New("tracing endpoint must be set")
+	}
+
+	if t.Compression != "" && t.Compression != GzipCompression {
+		return fmt.Errorf("invalid compression type %s provided, valid options: %s",
+			t.Compression, GzipCompression)
+	}
+
+	return nil
+}
+
+var reservedHeaders = map[string]struct{}{
+	// NOTE: authorization is checked specially,
+	// see RemoteWriteConfig.UnmarshalYAML.
+	// "authorization": {},
+	"host":                              {},
+	"content-encoding":                  {},
+	"content-length":                    {},
+	"content-type":                      {},
+	"user-agent":                        {},
+	"connection":                        {},
+	"keep-alive":                        {},
+	"proxy-authenticate":                {},
+	"proxy-authorization":               {},
+	"www-authenticate":                  {},
+	"accept-encoding":                   {},
+	"x-prometheus-remote-write-version": {},
+	"x-prometheus-remote-read-version":  {},
+
+	// Added by SigV4.
+	"x-amz-date":           {},
+	"x-amz-security-token": {},
+	"x-amz-content-sha256": {},
+}
+
+func validateHeadersForTracing(headers map[string]string) error {
+	for header := range headers {
+		if strings.ToLower(header) == "authorization" {
+			return errors.New("custom authorization header configuration is not yet supported")
+		}
+		if _, ok := reservedHeaders[strings.ToLower(header)]; ok {
+			return fmt.Errorf("%s is a reserved header. It must not be changed", header)
+		}
+	}
+	return nil
+}
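Mapped through the yaml tags above, a tracing section in the Alertmanager configuration might look like the example below; the endpoint and header values are placeholders, client_type defaults to grpc per UnmarshalYAML, endpoint is mandatory, and gzip is the only accepted compression value:

tracing:
  endpoint: "collector.example.internal:4317"
  client_type: "grpc"          # default; "http" is the other accepted value
  sampling_fraction: 0.1
  insecure: false
  compression: "gzip"
  timeout: 10s
  headers:
    x-scope-orgid: "alertmanager"   # placeholder tenant header
  tls_config:
    insecure_skip_verify: false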

dispatch/dispatch.go

Lines changed: 54 additions & 10 deletions
@@ -25,13 +25,19 @@ import (
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/promauto"
 	"github.com/prometheus/common/model"
+	"go.opentelemetry.io/otel"
+	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/codes"
+	"go.opentelemetry.io/otel/trace"
 
 	"github.com/prometheus/alertmanager/notify"
 	"github.com/prometheus/alertmanager/provider"
 	"github.com/prometheus/alertmanager/store"
 	"github.com/prometheus/alertmanager/types"
 )
 
+var tracer = otel.Tracer("github.com/prometheus/alertmanager/dispatch")
+
 // DispatcherMetrics represents metrics associated to a dispatcher.
 type DispatcherMetrics struct {
 	aggrGroups prometheus.Gauge
@@ -162,20 +168,12 @@ func (d *Dispatcher) run(it provider.AlertIterator) {
 				return
 			}
 
-			d.logger.Debug("Received alert", "alert", alert)
-
 			// Log errors but keep trying.
 			if err := it.Err(); err != nil {
 				d.logger.Error("Error on alert update", "err", err)
 				continue
 			}
-
-			now := time.Now()
-			for _, r := range d.route.Match(alert.Labels) {
-				d.processAlert(alert, r)
-			}
-			d.metrics.processingDuration.Observe(time.Since(now).Seconds())
-
+			d.dispatch(alert)
 		case <-maintenance.C:
 			d.doMaintenance()
 		case <-d.ctx.Done():
@@ -184,6 +182,41 @@ func (d *Dispatcher) run(it provider.AlertIterator) {
 	}
 }
 
+func (d *Dispatcher) dispatch(alert *types.Alert) {
+	d.logger.Debug("Received alert", "alert", alert)
+
+	attrs := []attribute.KeyValue{
+		attribute.String("alert.name", alert.Name()),
+		attribute.String("alert.fingerprint", alert.Fingerprint().String()),
+	}
+
+	// Build span start options
+	spanOpts := []trace.SpanStartOption{
+		trace.WithAttributes(attrs...),
+		// we'll use producer here since the alert is not processed
+		// synchronously
+		trace.WithSpanKind(trace.SpanKindProducer),
+	}
+
+	// Check if the alert has a valid trace context and link to it
+	alertSpanCtx := alert.TraceSpanContext()
+	if alertSpanCtx.IsValid() {
+		spanOpts = append(spanOpts, trace.WithLinks(trace.Link{
+			SpanContext: alertSpanCtx,
+		}))
+	}
+
+	traceCtx, span := tracer.Start(d.ctx, "dispatch.Dispatcher.dispatch", spanOpts...)
+	defer span.End()
+
+	now := time.Now()
+	for _, r := range d.route.Match(alert.Labels) {
+		d.processAlert(trace.LinkFromContext(traceCtx), alert, r)
+		span.SetAttributes(attribute.String("receiver", r.RouteOpts.Receiver))
+	}
+	d.metrics.processingDuration.Observe(time.Since(now).Seconds())
+}
+
 func (d *Dispatcher) doMaintenance() {
 	d.mtx.Lock()
 	defer d.mtx.Unlock()
@@ -310,7 +343,7 @@ type notifyFunc func(context.Context, ...*types.Alert) bool
 
 // processAlert determines in which aggregation group the alert falls
 // and inserts it.
-func (d *Dispatcher) processAlert(alert *types.Alert, route *Route) {
+func (d *Dispatcher) processAlert(dispatchLink trace.Link, alert *types.Alert, route *Route) {
 	groupLabels := getGroupLabels(alert, route)
 
 	fp := groupLabels.Fingerprint()
@@ -348,15 +381,26 @@ func (d *Dispatcher) processAlert(alert *types.Alert, route *Route) {
 	ag.insert(alert)
 
 	go ag.run(func(ctx context.Context, alerts ...*types.Alert) bool {
+		ctx, span := tracer.Start(ctx, "dispatch.Dispatch.notify",
+			trace.WithAttributes(attribute.Int("alerts.count", len(alerts))),
+			trace.WithLinks(dispatchLink),
+			trace.WithSpanKind(trace.SpanKindConsumer),
+		)
+		defer span.End()
+
 		_, _, err := d.stage.Exec(ctx, d.logger, alerts...)
 		if err != nil {
 			logger := d.logger.With("aggrGroup", ag.GroupKey(), "num_alerts", len(alerts), "err", err)
 			if errors.Is(ctx.Err(), context.Canceled) {
 				// It is expected for the context to be canceled on
 				// configuration reload or shutdown. In this case, the
 				// message should only be logged at the debug level.
+				span.RecordError(fmt.Errorf("notify for alerts failed: %w", err))
+				span.SetStatus(codes.Error, err.Error())
 				logger.Debug("Notify for alerts failed")
 			} else {
+				span.RecordError(fmt.Errorf("notify for alerts failed: %w", err))
+				span.SetStatus(codes.Error, err.Error())
 				logger.Error("Notify for alerts failed")
 			}
 		}
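Because the aggregation group notifies asynchronously and may batch several alerts, the notify span is started with a link to the dispatch span rather than as its child, mirroring the producer/consumer span kinds above. A small self-contained sketch of that shape using the OpenTelemetry SDK's in-memory recorder; the setup and names here are illustrative, not taken from the commit:

package main

import (
	"context"
	"fmt"

	sdktrace "go.opentelemetry.io/otel/sdk/trace"
	"go.opentelemetry.io/otel/sdk/trace/tracetest"
	"go.opentelemetry.io/otel/trace"
)

func main() {
	recorder := tracetest.NewSpanRecorder()
	tracer := sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(recorder)).Tracer("example")

	// Producer span: one per received alert, as in Dispatcher.dispatch.
	dispatchCtx, dispatchSpan := tracer.Start(context.Background(), "dispatch.Dispatcher.dispatch",
		trace.WithSpanKind(trace.SpanKindProducer))
	dispatchSpan.End()

	// Consumer span: created later when the aggregation group notifies, in a
	// fresh context, linked back to the producer via LinkFromContext.
	_, notifySpan := tracer.Start(context.Background(), "dispatch.Dispatch.notify",
		trace.WithLinks(trace.LinkFromContext(dispatchCtx)),
		trace.WithSpanKind(trace.SpanKindConsumer))
	notifySpan.End()

	for _, s := range recorder.Ended() {
		fmt.Printf("%s kind=%s links=%d\n", s.Name(), s.SpanKind(), len(s.Links()))
	}
}

Running it prints two spans, with the consumer span carrying a single link back to the producer.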

examples/webhook/echo.go

Lines changed: 6 additions & 1 deletion
@@ -22,7 +22,12 @@ import (
 )
 
 func main() {
-	log.Fatal(http.ListenAndServe(":5001", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+	log.Fatal(http.ListenAndServe("127.0.0.1:5001", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		log.Printf("headers:")
+		for k, v := range r.Header {
+			log.Printf(" %s: %s", k, v)
+		}
+
 		b, err := io.ReadAll(r.Body)
 		if err != nil {
 			panic(err)
0 commit comments

Comments
 (0)