Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions api/v2/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import (
"github.com/prometheus/alertmanager/cluster"
"github.com/prometheus/alertmanager/config"
"github.com/prometheus/alertmanager/dispatch"
"github.com/prometheus/alertmanager/eventrecorder"
"github.com/prometheus/alertmanager/matcher/compat"
"github.com/prometheus/alertmanager/pkg/labels"
"github.com/prometheus/alertmanager/provider"
Expand Down Expand Up @@ -328,7 +329,7 @@ func (api *API) getAlertsHandler(params alert_ops.GetAlertsParams) middleware.Re
func (api *API) postAlertsHandler(params alert_ops.PostAlertsParams) middleware.Responder {
logger := api.requestLogger(params.HTTPRequest)

ctx, span := tracer.Start(params.HTTPRequest.Context(), "api.postAlertsHandler")
ctx, span := tracer.Start(eventrecorder.WithEventRecording(params.HTTPRequest.Context()), "api.postAlertsHandler")
defer span.End()
Comment thread
coderabbitai[bot] marked this conversation as resolved.

alerts := OpenAPIAlertsToAlerts(ctx, params.Alerts)
Expand Down Expand Up @@ -678,7 +679,7 @@ func (api *API) getSilenceHandler(params silence_ops.GetSilenceParams) middlewar
func (api *API) deleteSilenceHandler(params silence_ops.DeleteSilenceParams) middleware.Responder {
logger := api.requestLogger(params.HTTPRequest)

ctx, span := tracer.Start(params.HTTPRequest.Context(), "api.deleteSilenceHandler")
ctx, span := tracer.Start(eventrecorder.WithEventRecording(params.HTTPRequest.Context()), "api.deleteSilenceHandler")
defer span.End()

sid := params.SilenceID.String()
Expand All @@ -695,7 +696,7 @@ func (api *API) deleteSilenceHandler(params silence_ops.DeleteSilenceParams) mid
func (api *API) postSilencesHandler(params silence_ops.PostSilencesParams) middleware.Responder {
logger := api.requestLogger(params.HTTPRequest)

ctx, span := tracer.Start(params.HTTPRequest.Context(), "api.postSilencesHandler")
ctx, span := tracer.Start(eventrecorder.WithEventRecording(params.HTTPRequest.Context()), "api.postSilencesHandler")
defer span.End()

sil, err := PostableSilenceToProto(params.Silence)
Expand Down
2 changes: 2 additions & 0 deletions buf.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,5 @@ modules:
name: prometheus/alertmanager/cluster
- path: silence/silencepb
name: prometheus/alertmanager/silence
- path: eventrecorder/eventrecorderpb
name: prometheus/alertmanager/eventrecorder
58 changes: 51 additions & 7 deletions cmd/alertmanager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ import (
"github.com/prometheus/alertmanager/config"
"github.com/prometheus/alertmanager/config/receiver"
"github.com/prometheus/alertmanager/dispatch"
"github.com/prometheus/alertmanager/eventrecorder"
"github.com/prometheus/alertmanager/eventrecorder/eventrecorderpb"
"github.com/prometheus/alertmanager/featurecontrol"
"github.com/prometheus/alertmanager/httpserver"
"github.com/prometheus/alertmanager/inhibit"
Expand Down Expand Up @@ -266,6 +268,39 @@ func run() int {
stopc := make(chan struct{})
var wg sync.WaitGroup

// Load config once for both event recorder initialization and the first
// coordinator apply. Subsequent reloads (SIGHUP, /-/reload) go
// through configCoordinator.Reload() which reads the file again.
initialConf, err := config.LoadFile(*configFile)
if err != nil {
logger.Error("error loading configuration file", "err", err)
return 1
}

hostname, _ := os.Hostname()
var eventRec eventrecorder.Recorder
if ff.EnableEventRecorder() {
eventRec = eventrecorder.NewRecorderFromConfig(initialConf.EventRecorder, hostname, logger.With("component", "eventrecorder"), prometheus.DefaultRegisterer)
}
defer eventRec.Close()

recordCtx := eventrecorder.WithEventRecording(context.Background())
eventRec.RecordEvent(recordCtx, &eventrecorderpb.EventData{
EventType: &eventrecorderpb.EventData_AlertmanagerStartupEvent{
AlertmanagerStartupEvent: &eventrecorderpb.AlertmanagerStartupEvent{
Version: version.Version,
BuildContext: version.BuildContext(),
},
},
})
defer func() {
eventRec.RecordEvent(recordCtx, &eventrecorderpb.EventData{
EventType: &eventrecorderpb.EventData_AlertmanagerShutdownEvent{
AlertmanagerShutdownEvent: &eventrecorderpb.AlertmanagerShutdownEvent{},
},
})
}()

notificationLogOpts := nflog.Options{
SnapshotFile: filepath.Join(*dataDir, "nflog"),
Retention: *retention,
Expand Down Expand Up @@ -296,9 +331,10 @@ func run() int {
MaxSilences: func() int { return *maxSilences },
MaxSilenceSizeBytes: func() int { return *maxSilenceSizeBytes },
},
Logger: logger.With("component", "silences"),
Metrics: prometheus.DefaultRegisterer,
Logging: *silenceLogging,
Logger: logger.With("component", "silences"),
Metrics: prometheus.DefaultRegisterer,
Logging: *silenceLogging,
EventRecorder: eventRec,
}

silences, err := silence.New(silenceOpts)
Expand All @@ -321,7 +357,7 @@ func run() int {
wg.Wait()
}()

silencer := silence.NewSilencer(silences, marker, logger)
silencer := silence.NewSilencer(silences, marker, logger, eventRec)

// Peer state listeners have been registered, now we can join and get the initial state.
if peer != nil {
Expand All @@ -340,6 +376,7 @@ func run() int {
}
}()
go peer.Settle(ctx, *gossipInterval*10)
eventRec.SetClusterPeer(peer)
}

alerts, err := mem.NewAlerts(
Expand All @@ -349,6 +386,7 @@ func run() int {
*perAlertNameLimit,
silencer,
logger,
eventRec,
prometheus.DefaultRegisterer,
ff,
)
Expand Down Expand Up @@ -419,14 +457,19 @@ func run() int {
)

dispMetrics := dispatch.NewDispatcherMetrics(false, prometheus.DefaultRegisterer)
pipelineBuilder := notify.NewPipelineBuilder(prometheus.DefaultRegisterer, ff)
pipelineBuilder := notify.NewPipelineBuilder(prometheus.DefaultRegisterer, ff, eventRec)
configLogger := logger.With("component", "configuration")
configCoordinator := config.NewCoordinator(
*configFile,
prometheus.DefaultRegisterer,
configLogger,
)
configCoordinator.Subscribe(func(conf *config.Config) error {
// Reload event recorder outputs first so events emitted during
// the rest of this callback (e.g., by stopping the old
// dispatcher) go to the new outputs.
eventRec.ApplyConfig(conf.EventRecorder)

tmpl, err = template.FromGlobs(conf.Templates)
if err != nil {
return fmt.Errorf("failed to parse templates: %w", err)
Expand Down Expand Up @@ -473,7 +516,7 @@ func run() int {
inhibitor.Load().Stop()
disp.Load().Stop()

newInhibitor := inhibit.NewInhibitor(alerts, conf.InhibitRules, marker, logger)
newInhibitor := inhibit.NewInhibitor(alerts, conf.InhibitRules, marker, logger, eventRec)
inhibitor.Store(newInhibitor)

// An interface value that holds a nil concrete value is non-nil.
Expand Down Expand Up @@ -513,6 +556,7 @@ func run() int {
*dispatchMaintenanceInterval,
nil,
logger,
eventRec,
dispMetrics,
)
routes.Walk(func(r *dispatch.Route) {
Expand Down Expand Up @@ -564,7 +608,7 @@ func run() int {
return nil
})

if err := configCoordinator.Reload(); err != nil {
if err := configCoordinator.ApplyConfig(initialConf); err != nil {
return 1
}

Expand Down
57 changes: 57 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,15 @@ func resolveFilepaths(baseDir string, cfg *Config) {
cfg.HTTPConfig.SetDirectory(baseDir)
}
}

for i, out := range cfg.EventRecorder.Outputs {
if out.Type == "file" {
cfg.EventRecorder.Outputs[i].Path = join(out.Path)
}
if out.HTTPConfig != nil {
out.HTTPConfig.SetDirectory(baseDir)
}
}
}

// MuteTimeInterval represents a named set of time intervals for which a route should be muted.
Expand Down Expand Up @@ -268,10 +277,58 @@ type Config struct {

TracingConfig tracing.TracingConfig `yaml:"tracing,omitempty" json:"tracing,omitempty"`

EventRecorder EventRecorderConfig `yaml:"event_recorder,omitempty" json:"event_recorder,omitempty"`

// original is the input from which the config was parsed.
original string
}

// EventRecorderConfig configures the event recorder feature. It is the
// value of the top-level `event_recorder` key of Config.
type EventRecorderConfig struct {
// Outputs lists the destinations events are written to. Each entry is
// validated by EventRecorderOutput.UnmarshalYAML when parsed from YAML.
Outputs []EventRecorderOutput `yaml:"outputs,omitempty" json:"outputs,omitempty"`
}

// EventRecorderOutput configures a single event recorder output destination.
type EventRecorderOutput struct {
Type string `yaml:"type" json:"type"`
Path string `yaml:"path,omitempty" json:"path,omitempty"`
URL *amcommoncfg.SecretURL `yaml:"url,omitempty" json:"url,omitempty"`
HTTPConfig *commoncfg.HTTPClientConfig `yaml:"http_config,omitempty" json:"http_config,omitempty"`
Comment thread
coderabbitai[bot] marked this conversation as resolved.
// Timeout for webhook HTTP requests (default 10s).
Timeout model.Duration `yaml:"timeout,omitempty" json:"timeout,omitempty"`
// Workers is the number of concurrent webhook delivery goroutines
// (default 4). Only applicable to webhook outputs.
Workers int `yaml:"workers,omitempty" json:"workers,omitempty"`
// MaxRetries is the maximum number of delivery attempts per event
// (default 3). Only applicable to webhook outputs.
MaxRetries int `yaml:"max_retries,omitempty" json:"max_retries,omitempty"`
// RetryBackoff is the base backoff duration between retry attempts
// (default 500ms). Successive attempts use exponential backoff
// (base * 2^attempt). Only applicable to webhook outputs.
RetryBackoff model.Duration `yaml:"retry_backoff,omitempty" json:"retry_backoff,omitempty"`
}

// UnmarshalYAML implements the yaml.Unmarshaler interface for
// EventRecorderOutput. After decoding, it checks that Type is either "file"
// or "webhook" and that the field required by that type is present: Path for
// file outputs, URL for webhook outputs.
func (o *EventRecorderOutput) UnmarshalYAML(unmarshal func(any) error) error {
	// Decode through a locally defined type: it has the same fields as
	// EventRecorderOutput but none of its methods, so decoding does not
	// re-enter this function.
	type rawOutput EventRecorderOutput
	raw := (*rawOutput)(o)
	if err := unmarshal(raw); err != nil {
		return err
	}

	isFile := o.Type == "file"
	isWebhook := o.Type == "webhook"

	if !isFile && !isWebhook {
		return fmt.Errorf("unknown event_recorder output type %q, must be \"file\" or \"webhook\"", o.Type)
	}
	if isFile && o.Path == "" {
		return errors.New("event_recorder file output requires a path")
	}
	if isWebhook && o.URL == nil {
		return errors.New("event_recorder webhook output requires a url")
	}
	return nil
}

func (c Config) String() string {
b, err := yaml.Marshal(c)
if err != nil {
Expand Down
33 changes: 33 additions & 0 deletions config/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package config
import (
"crypto/md5"
"encoding/binary"
"errors"
"log/slog"
"sync"

Expand Down Expand Up @@ -144,6 +145,38 @@ func (c *Coordinator) Reload() error {
return nil
}

// ApplyConfig accepts an already-loaded configuration, stores it, and
// notifies all subscribers. Use this for the initial load so the file
// is only read once.
func (c *Coordinator) ApplyConfig(conf *Config) error {
c.mutex.Lock()
defer c.mutex.Unlock()

if conf == nil {
c.configSuccessMetric.Set(0)
return errors.New("nil config passed to ApplyConfig")
}

c.config = conf

if err := c.notifySubscribers(); err != nil {
c.logger.Error(
"one or more config change subscribers failed to apply new config",
"file", c.configFilePath,
"err", err,
)
c.configSuccessMetric.Set(0)
return err
}

c.configSuccessMetric.Set(1)
c.configSuccessTimeMetric.SetToCurrentTime()
hash := md5HashAsMetricValue([]byte(c.config.original))
c.configHashMetric.Set(hash)

Comment thread
coderabbitai[bot] marked this conversation as resolved.
return nil
}

func md5HashAsMetricValue(data []byte) float64 {
sum := md5.Sum(data)
// We only want 48 bits as a float64 only has a 53 bit mantissa.
Expand Down
Loading