Skip to content

Commit

Permalink
WIP: Per-route enrichment hooks.
Browse files Browse the repository at this point in the history
  • Loading branch information
stevesg committed Dec 3, 2024
1 parent b5e64e8 commit 3bcf275
Show file tree
Hide file tree
Showing 5 changed files with 221 additions and 0 deletions.
40 changes: 40 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,13 +297,43 @@ func (ti *TimeInterval) UnmarshalYAML(unmarshal func(interface{}) error) error {
return nil
}

// Enrichment configures a single enrichment: an HTTP endpoint that receives a
// POST request, the client settings used to reach it, and a time limit.
type Enrichment struct {
// HTTPConfig configures the HTTP client used for the request.
// It is optional and is left nil when http_config is omitted.
HTTPConfig *commoncfg.HTTPClientConfig `yaml:"http_config,omitempty" json:"http_config,omitempty"`

// URL to send POST request to. Declared as a SecretURL, so treat it as
// sensitive.
URL SecretURL `yaml:"url" json:"url"`

// Timeout is the maximum length of time an enrichment can take.
// UnmarshalYAML starts from DefaultEnrichment, so an omitted timeout keeps
// the default set there.
Timeout time.Duration `yaml:"timeout" json:"timeout"`
}

var (

Check failure on line 312 in config/config.go

View workflow job for this annotation

GitHub Actions / lint

File is not `gofumpt`-ed with `-extra` (gofumpt)
DefaultEnrichment = Enrichment{
Timeout: 15 * time.Second,
}
)

// UnmarshalYAML implements the yaml.Unmarshaler interface.
func (c *Enrichment) UnmarshalYAML(unmarshal func(interface{}) error) error {
*c = DefaultEnrichment
type plain Enrichment
if err := unmarshal((*plain)(c)); err != nil {

Check warning on line 322 in config/config.go

View workflow job for this annotation

GitHub Actions / lint

if-return: redundant if ...; err != nil check, just return error instead. (revive)
return err
}

return nil
}

// Config is the top-level configuration for Alertmanager's config files.
type Config struct {
Global *GlobalConfig `yaml:"global,omitempty" json:"global,omitempty"`
Route *Route `yaml:"route,omitempty" json:"route,omitempty"`
InhibitRules []InhibitRule `yaml:"inhibit_rules,omitempty" json:"inhibit_rules,omitempty"`
Receivers []Receiver `yaml:"receivers,omitempty" json:"receivers,omitempty"`
Templates []string `yaml:"templates" json:"templates"`

// Deprecated. Remove before v1.0 release.
MuteTimeIntervals []MuteTimeInterval `yaml:"mute_time_intervals,omitempty" json:"mute_time_intervals,omitempty"`
TimeIntervals []TimeInterval `yaml:"time_intervals,omitempty" json:"time_intervals,omitempty"`
Expand Down Expand Up @@ -568,6 +598,13 @@ func (c *Config) UnmarshalYAML(unmarshal func(interface{}) error) error {
return err
}

// TODO: Propagate Global.HTTPConfig to enrichments.
//for _, enr := range r.Enrichments {
// if enr.HTTPConfig == nil {
// enr.HTTPConfig = c.Global.HTTPConfig
// }
//}

tiNames := make(map[string]struct{})

// read mute time intervals until deprecated
Expand Down Expand Up @@ -794,6 +831,9 @@ type Route struct {
GroupWait *model.Duration `yaml:"group_wait,omitempty" json:"group_wait,omitempty"`
GroupInterval *model.Duration `yaml:"group_interval,omitempty" json:"group_interval,omitempty"`
RepeatInterval *model.Duration `yaml:"repeat_interval,omitempty" json:"repeat_interval,omitempty"`

// Experimental.
Enrichments []Enrichment `yaml:"enrichments,omitempty" json:"enrichments,omitempty"`
}

// UnmarshalYAML implements the yaml.Unmarshaler interface for Route.
Expand Down
1 change: 1 addition & 0 deletions dispatch/dispatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,7 @@ func (ag *aggrGroup) run(nf notifyFunc) {
ctx = notify.WithRepeatInterval(ctx, ag.opts.RepeatInterval)
ctx = notify.WithMuteTimeIntervals(ctx, ag.opts.MuteTimeIntervals)
ctx = notify.WithActiveTimeIntervals(ctx, ag.opts.ActiveTimeIntervals)
ctx = notify.WithEnrichments(ctx, ag.opts.Enrichments)

// Wait the configured interval before calling flush again.
ag.mtx.Lock()
Expand Down
12 changes: 12 additions & 0 deletions dispatch/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/prometheus/common/model"

"github.com/prometheus/alertmanager/config"
"github.com/prometheus/alertmanager/enrichment"
"github.com/prometheus/alertmanager/pkg/labels"
)

Expand Down Expand Up @@ -121,6 +122,14 @@ func NewRoute(cr *config.Route, parent *Route) *Route {
opts.MuteTimeIntervals = cr.MuteTimeIntervals
opts.ActiveTimeIntervals = cr.ActiveTimeIntervals

// Build enrichments.
enrichments, err := enrichment.NewEnrichments(cr.Enrichments)
if err != nil {
// This error must not happen because the config already validates the yaml.
panic(err)
}
opts.Enrichments = enrichments

route := &Route{
parent: parent,
RouteOpts: opts,
Expand Down Expand Up @@ -236,6 +245,9 @@ type RouteOpts struct {

// A list of time intervals for which the route is active.
ActiveTimeIntervals []string

// Enrichments to apply to alerts before sending notifications.
Enrichments *enrichment.Enrichments
}

func (ro *RouteOpts) String() string {
Expand Down
130 changes: 130 additions & 0 deletions enrichment/enrichment.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
// Copyright 2024 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package enrichment

import (
"bytes"
"context"
"encoding/json"
"fmt"
"net/http"

"github.com/go-kit/log"
"github.com/go-kit/log/level"

commoncfg "github.com/prometheus/common/config"

"github.com/prometheus/alertmanager/config"
"github.com/prometheus/alertmanager/notify"
"github.com/prometheus/alertmanager/template"
"github.com/prometheus/alertmanager/types"
)

// Enrichments is a collection of Enrichment values that are applied together
// to a set of alerts.
type Enrichments struct {
// enrichments holds the individual enrichments, in the order in which they
// were passed to NewEnrichments.
enrichments []*Enrichment
}

// NewEnrichments builds one Enrichment per entry of enrs, preserving their
// order. It stops at the first entry that cannot be built and returns that
// error with a nil result.
func NewEnrichments(enrs []config.Enrichment) (*Enrichments, error) {
	built := make([]*Enrichment, len(enrs))

	for i := range enrs {
		one, err := NewEnrichment(enrs[i])
		if err != nil {
			return nil, err
		}
		built[i] = one
	}

	return &Enrichments{enrichments: built}, nil
}

// Apply runs every configured enrichment against alerts, in order.
//
// A failing enrichment is logged and counted but does not stop the loop, so
// one failure does not prevent the remaining enrichments from running. A
// summary with the success and failure counts is logged at debug level.
//
// A nil receiver is a no-op. *Enrichments is handed around behind the
// notify.Enricher interface, and a nil pointer stored in an interface is
// still a non-nil interface value, so Apply can be reached with e == nil.
func (e *Enrichments) Apply(ctx context.Context, l log.Logger, alerts ...*types.Alert) {
	if e == nil {
		return
	}

	var success, failed int

	// TODO: These could/should be done async. Need to decide if to allow dependent enrichments.
	for i, enr := range e.enrichments {
		if err := enr.Apply(ctx, l, alerts...); err != nil {
			// Attempt to apply all enrichments, one doesn't need to affect the others.
			level.Error(l).Log("msg", "Enrichment failed", "i", i, "err", err)
			failed++
			continue
		}
		success++
	}

	level.Debug(l).Log("msg", "Enrichments applied", "success", success, "failed", failed)
}

// Enrichment is a single enrichment hook: its configuration paired with the
// HTTP client used to call it.
type Enrichment struct {
// conf is the configuration this enrichment was built from.
conf config.Enrichment
// client is the HTTP client used to send the enrichment request.
client *http.Client
}

// NewEnrichment builds an Enrichment from conf, creating the HTTP client that
// will be used to call the enrichment endpoint.
//
// conf.HTTPConfig is optional in the configuration, so a nil value falls back
// to commoncfg.DefaultHTTPClientConfig instead of being dereferenced.
// An error is returned if the HTTP client cannot be created.
func NewEnrichment(conf config.Enrichment) (*Enrichment, error) {
	httpConf := conf.HTTPConfig
	if httpConf == nil {
		httpConf = &commoncfg.DefaultHTTPClientConfig
	}

	client, err := commoncfg.NewClientFromConfig(*httpConf, "enrichment")
	if err != nil {
		return nil, fmt.Errorf("creating enrichment HTTP client: %w", err)
	}

	return &Enrichment{
		conf:   conf,
		client: client,
	}, nil
}

func (e *Enrichment) Apply(ctx context.Context, l log.Logger, alerts ...*types.Alert) error {
// TODO: Template isn't needed by this function but we need to pass something.
data := notify.GetTemplateData(ctx, &template.Template{}, alerts, l)

var buf bytes.Buffer
if err := json.NewEncoder(&buf).Encode(data); err != nil {
return err
}

url := e.conf.URL.String()

if e.conf.Timeout > 0 {
postCtx, cancel := context.WithTimeoutCause(ctx, e.conf.Timeout, fmt.Errorf("configured enrichment timeout reached (%s)", e.conf.Timeout))
defer cancel()
ctx = postCtx
}

resp, err := notify.PostJSON(ctx, e.client, url, &buf)
if err != nil {
if ctx.Err() != nil {
err = fmt.Errorf("%w: %w", err, context.Cause(ctx))
}
return notify.RedactURL(err)
}
defer resp.Body.Close()

var result template.Data
err = json.NewDecoder(resp.Body).Decode(&result)

Check failure on line 119 in enrichment/enrichment.go

View workflow job for this annotation

GitHub Actions / lint

ineffectual assignment to err (ineffassign)

// TODO: Do something with the result.
// TODO: Don't log the URL unredacted.
level.Info(l).Log("msg", "Enrichment result",
"url", url,
"groupLabels", result.GroupLabels,
"commonLabels", result.CommonLabels,
"commonAnnotations", result.CommonLabels)

return nil
}
38 changes: 38 additions & 0 deletions notify/notify.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ const (
keyNow
keyMuteTimeIntervals
keyActiveTimeIntervals
keyEnrichments
)

// WithReceiverName populates a context with a receiver name.
Expand Down Expand Up @@ -178,6 +179,15 @@ func WithActiveTimeIntervals(ctx context.Context, at []string) context.Context {
return context.WithValue(ctx, keyActiveTimeIntervals, at)
}

// Enricher applies enrichments to a set of alerts.
//
// Apply has no return value, so an implementation has to deal with failures
// itself rather than report them to the caller.
type Enricher interface {
Apply(ctx context.Context, l log.Logger, alerts ...*types.Alert)
}

// WithEnrichments returns a context derived from ctx that carries e as the
// enricher to run when notifying.
func WithEnrichments(ctx context.Context, e Enricher) context.Context {
	withEnricher := context.WithValue(ctx, keyEnrichments, e)
	return withEnricher
}

// RepeatInterval extracts a repeat interval from the context. Iff none exists, the
// second argument is false.
func RepeatInterval(ctx context.Context) (time.Duration, bool) {
Expand Down Expand Up @@ -241,6 +251,13 @@ func ActiveTimeIntervalNames(ctx context.Context) ([]string, bool) {
return v, ok
}

// Enrichments looks up the enricher stored in ctx.
// The boolean result is false iff the context carries no enricher, in which
// case the returned Enricher is nil.
func Enrichments(ctx context.Context) (Enricher, bool) {
	if enricher, found := ctx.Value(keyEnrichments).(Enricher); found {
		return enricher, true
	}
	return nil, false
}

// A Stage processes alerts under the constraints of the given context.
type Stage interface {
Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error)
Expand Down Expand Up @@ -435,6 +452,7 @@ func createReceiverStage(
var s MultiStage
s = append(s, NewWaitStage(wait))
s = append(s, NewDedupStage(integrations[i], notificationLog, recv))
s = append(s, NewEnrichmentStage())
s = append(s, NewRetryStage(integrations[i], name, metrics))
s = append(s, NewSetNotifiesStage(notificationLog, recv))

Expand Down Expand Up @@ -1015,3 +1033,23 @@ func (tas TimeActiveStage) Exec(ctx context.Context, l log.Logger, alerts ...*ty

return ctx, alerts, nil
}

// EnrichmentStage is a pipeline stage that runs the enricher carried in the
// context against the alerts passing through it. It holds no state of its own.
type EnrichmentStage struct{}

// NewEnrichmentStage returns a new, stateless EnrichmentStage.
func NewEnrichmentStage() *EnrichmentStage {
	return new(EnrichmentStage)
}

// Exec implements the Stage interface.
//
// If the context carries an enricher it is applied to alerts; otherwise the
// stage does nothing. The context and alerts are always passed on with a nil
// error, so a failed enrichment never prevents a notification from being sent.
func (es EnrichmentStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
	if enricher, found := Enrichments(ctx); found {
		enricher.Apply(ctx, l, alerts...)
	}

	return ctx, alerts, nil
}

0 comments on commit 3bcf275

Please sign in to comment.