cmd/alertmanager/main.go (17 changes: 9 additions & 8 deletions)
@@ -143,13 +143,14 @@ func run() int {
}

var (
configFile = kingpin.Flag("config.file", "Alertmanager configuration file name.").Default("alertmanager.yml").String()
dataDir = kingpin.Flag("storage.path", "Base path for data storage.").Default("data/").String()
retention = kingpin.Flag("data.retention", "How long to keep data for.").Default("120h").Duration()
maintenanceInterval = kingpin.Flag("data.maintenance-interval", "Interval between garbage collection and snapshotting to disk of the silences and the notification logs.").Default("15m").Duration()
maxSilences = kingpin.Flag("silences.max-silences", "Maximum number of silences, including expired silences. If negative or zero, no limit is set.").Default("0").Int()
maxSilenceSizeBytes = kingpin.Flag("silences.max-silence-size-bytes", "Maximum silence size in bytes. If negative or zero, no limit is set.").Default("0").Int()
alertGCInterval = kingpin.Flag("alerts.gc-interval", "Interval between alert GC.").Default("30m").Duration()
configFile = kingpin.Flag("config.file", "Alertmanager configuration file name.").Default("alertmanager.yml").String()
dataDir = kingpin.Flag("storage.path", "Base path for data storage.").Default("data/").String()
retention = kingpin.Flag("data.retention", "How long to keep data for.").Default("120h").Duration()
maintenanceInterval = kingpin.Flag("data.maintenance-interval", "Interval between garbage collection and snapshotting to disk of the silences and the notification logs.").Default("15m").Duration()
maxSilences = kingpin.Flag("silences.max-silences", "Maximum number of silences, including expired silences. If negative or zero, no limit is set.").Default("0").Int()
maxSilenceSizeBytes = kingpin.Flag("silences.max-silence-size-bytes", "Maximum silence size in bytes. If negative or zero, no limit is set.").Default("0").Int()
alertGCInterval = kingpin.Flag("alerts.gc-interval", "Interval between alert GC.").Default("30m").Duration()
dispatchMaintenanceInterval = kingpin.Flag("dispatch.maintenance-interval", "Interval between maintenance of aggregation groups in the dispatcher.").Default("15m").Duration()

webConfig = webflag.AddFlags(kingpin.CommandLine, ":9093")
externalURL = kingpin.Flag("web.external-url", "The URL under which Alertmanager is externally reachable (for example, if Alertmanager is served via a reverse proxy). Used for generating relative and absolute links back to Alertmanager itself. If the URL has a path portion, it will be used to prefix all HTTP endpoints served by Alertmanager. If omitted, relevant URL components will be derived automatically.").String()
@@ -492,7 +493,7 @@ func run() int {
silencer.Mutes(labels)
})

disp = dispatch.NewDispatcher(alerts, routes, pipeline, marker, timeoutFunc, nil, logger, dispMetrics)
disp = dispatch.NewDispatcher(alerts, routes, pipeline, marker, timeoutFunc, *dispatchMaintenanceInterval, nil, logger, dispMetrics)
routes.Walk(func(r *dispatch.Route) {
if r.RouteOpts.RepeatInterval > *retention {
configLogger.Warn(
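The new `--dispatch.maintenance-interval` flag follows the same kingpin pattern as the existing flags: `.Duration()` returns a `*time.Duration` that `kingpin.Parse()` fills in, and the call site dereferences it when constructing the dispatcher. A minimal standalone sketch of that pattern, assuming the `github.com/alecthomas/kingpin/v2` import path (not Alertmanager's actual wiring):

```go
package main

import (
	"fmt"

	"github.com/alecthomas/kingpin/v2"
)

var (
	// Same pattern as the flags above: Duration() returns a *time.Duration
	// whose value is populated by kingpin.Parse().
	dispatchMaintenanceInterval = kingpin.Flag("dispatch.maintenance-interval",
		"Interval between maintenance of aggregation groups in the dispatcher.").
		Default("15m").Duration()
)

func main() {
	kingpin.Parse()
	// Dereferenced at the call site, mirroring *dispatchMaintenanceInterval
	// in the NewDispatcher call above.
	fmt.Printf("dispatcher maintenance every %s\n", *dispatchMaintenanceInterval)
}
```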
dispatch/dispatch.go (196 changes: 143 additions & 53 deletions)
@@ -22,6 +22,8 @@ import (
"sync"
"time"

"go.uber.org/atomic"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"

@@ -83,13 +85,14 @@ type Dispatcher struct {

timeout func(time.Duration) time.Duration

mtx sync.RWMutex
aggrGroupsPerRoute map[*Route]map[model.Fingerprint]*aggrGroup
aggrGroupsNum int
mtx sync.Mutex
aggrGroupsPerRoute *sync.Map // map[*Route]*sync.Map where inner map is map[model.Fingerprint]*aggrGroup
aggrGroupsNum atomic.Int64

done chan struct{}
ctx context.Context
cancel func()
maintenanceInterval time.Duration
done chan struct{}
ctx context.Context
cancel func()

logger *slog.Logger
}
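The struct swaps an `RWMutex`-guarded nested map for a two-level `sync.Map` plus an `atomic.Int64` counter (from `go.uber.org/atomic`, per the new import), so operations on different aggregation groups no longer contend on one lock. A minimal sketch of that two-level layout with type-asserted accessors; `twoLevel` and its string keys are illustrative stand-ins, not the PR's types:

```go
package main

import (
	"fmt"
	"sync"

	"go.uber.org/atomic"
)

// twoLevel mimics the dispatcher's layout: an outer sync.Map keyed by route,
// whose values are inner *sync.Map instances keyed by fingerprint.
type twoLevel struct {
	outer sync.Map     // map[string]*sync.Map (illustrative key types)
	count atomic.Int64 // entry counter, updated without a lock
}

func (t *twoLevel) store(route, fp string, v int) {
	inner, _ := t.outer.LoadOrStore(route, &sync.Map{})
	if _, loaded := inner.(*sync.Map).LoadOrStore(fp, v); !loaded {
		t.count.Add(1) // only count genuinely new entries
	}
}

func (t *twoLevel) load(route, fp string) (int, bool) {
	innerVal, ok := t.outer.Load(route)
	if !ok {
		return 0, false
	}
	v, ok := innerVal.(*sync.Map).Load(fp)
	if !ok {
		return 0, false
	}
	return v.(int), true
}

func main() {
	var t twoLevel
	t.store("team-X", "fp1", 42)
	if v, ok := t.load("team-X", "fp1"); ok {
		fmt.Println(v, t.count.Load()) // 42 1
	}
}
```

On Go 1.19+ the stdlib `sync/atomic.Int64` would work equally well here; the PR uses the uber-go package as imported above.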
@@ -99,7 +102,7 @@ type Limits interface {
// MaxNumberOfAggregationGroups returns max number of aggregation groups that dispatcher can have.
// 0 or negative value = unlimited.
// If dispatcher hits this limit, it will not create additional groups, but will log an error instead.
MaxNumberOfAggregationGroups() int
MaxNumberOfAggregationGroups() int64
}

// NewDispatcher returns a new Dispatcher.
@@ -109,6 +112,7 @@ func NewDispatcher(
s notify.Stage,
mk types.GroupMarker,
to func(time.Duration) time.Duration,
mi time.Duration,
lim Limits,
l *slog.Logger,
m *DispatcherMetrics,
@@ -118,14 +122,15 @@
}

disp := &Dispatcher{
alerts: ap,
stage: s,
route: r,
marker: mk,
timeout: to,
logger: l.With("component", "dispatcher"),
metrics: m,
limits: lim,
alerts: ap,
stage: s,
route: r,
marker: mk,
timeout: to,
maintenanceInterval: mi,
logger: l.With("component", "dispatcher"),
metrics: m,
limits: lim,
}
return disp
}
@@ -135,8 +140,8 @@ func (d *Dispatcher) Run() {
d.done = make(chan struct{})

d.mtx.Lock()
d.aggrGroupsPerRoute = map[*Route]map[model.Fingerprint]*aggrGroup{}
d.aggrGroupsNum = 0
d.aggrGroupsPerRoute = &sync.Map{}
d.aggrGroupsNum = atomic.Int64{}
d.metrics.aggrGroups.Set(0)
d.ctx, d.cancel = context.WithCancel(context.Background())
d.mtx.Unlock()
@@ -146,7 +151,7 @@
}

func (d *Dispatcher) run(it provider.AlertIterator) {
maintenance := time.NewTicker(30 * time.Second)
maintenance := time.NewTicker(d.maintenanceInterval)
defer maintenance.Stop()

defer it.Close()
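The hardcoded 30-second ticker becomes the configurable `d.maintenanceInterval`; the select loop that consumes it is folded out of this hunk. A generic, runnable sketch of the shape under assumed names (`runLoop` and the `work` channel are illustrative, not the PR's alert iterator):

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// runLoop sketches the dispatcher's run(): react to work as it arrives and
// run periodic maintenance on a configurable ticker interval.
func runLoop(ctx context.Context, interval time.Duration, work <-chan int) {
	maintenance := time.NewTicker(interval)
	defer maintenance.Stop()

	for {
		select {
		case w, ok := <-work:
			if !ok {
				return
			}
			fmt.Println("handle", w)
		case <-maintenance.C:
			fmt.Println("maintenance pass")
		case <-ctx.Done():
			return
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 250*time.Millisecond)
	defer cancel()

	work := make(chan int)
	go func() { work <- 1; work <- 2 }()
	// The interval is injected, like d.maintenanceInterval above.
	runLoop(ctx, 100*time.Millisecond, work)
}
```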
@@ -185,18 +190,43 @@ func (d *Dispatcher) run(it provider.AlertIterator) {
}

func (d *Dispatcher) doMaintenance() {
d.mtx.Lock()
defer d.mtx.Unlock()
for _, groups := range d.aggrGroupsPerRoute {
for _, ag := range groups {
var groupsToDelete []struct {
route *Route
routeGroups *sync.Map
fingerprint model.Fingerprint
ag *aggrGroup
}

// Collect empty groups without holding the mutex
d.aggrGroupsPerRoute.Range(func(routeKey, routeGroupsVal interface{}) bool {
route := routeKey.(*Route)
routeGroups := routeGroupsVal.(*sync.Map)

routeGroups.Range(func(fpKey, agVal interface{}) bool {
fingerprint := fpKey.(model.Fingerprint)
ag := agVal.(*aggrGroup)

if ag.empty() {
ag.stop()
d.marker.DeleteByGroupKey(ag.routeID, ag.GroupKey())
delete(groups, ag.fingerprint())
d.aggrGroupsNum--
d.metrics.aggrGroups.Dec()
groupsToDelete = append(groupsToDelete, struct {
route *Route
routeGroups *sync.Map
fingerprint model.Fingerprint
ag *aggrGroup
}{route, routeGroups, fingerprint, ag})
}
return true
})
return true
})

if len(groupsToDelete) > 0 {
for _, item := range groupsToDelete {
item.ag.stop()
d.marker.DeleteByGroupKey(item.ag.routeID, item.ag.GroupKey())
item.routeGroups.Delete(item.fingerprint)
d.aggrGroupsNum.Add(-1)
}
d.metrics.aggrGroups.Set(float64(d.aggrGroupsNum.Load()))
}
}
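`doMaintenance` now does a lock-free sweep: `Range` over both levels collects deletable groups into a slice, and the actual deletion and metric update happen afterwards. Deleting during `Range` is permitted for `sync.Map`, but batching keeps the single `metrics.aggrGroups.Set` consistent with the counter. A reduced sketch of the collect-then-delete shape, with a hypothetical `sweep` over one map level:

```go
package main

import (
	"fmt"
	"sync"
)

// sweep collects keys of "empty" entries during Range and deletes them
// afterwards, mirroring doMaintenance's collect-then-delete shape.
func sweep(m *sync.Map, empty func(v int) bool) int {
	var toDelete []string

	m.Range(func(k, v interface{}) bool {
		if empty(v.(int)) {
			toDelete = append(toDelete, k.(string))
		}
		return true // continue iteration (Range's analogue of "continue")
	})

	for _, k := range toDelete {
		m.Delete(k)
	}
	return len(toDelete)
}

func main() {
	var m sync.Map
	m.Store("a", 0)
	m.Store("b", 3)
	m.Store("c", 0)

	n := sweep(&m, func(v int) bool { return v == 0 })
	fmt.Println("deleted:", n) // deleted: 2
}
```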

@@ -224,21 +254,21 @@ func (ag AlertGroups) Len() int { return len(ag) }
func (d *Dispatcher) Groups(routeFilter func(*Route) bool, alertFilter func(*types.Alert, time.Time) bool) (AlertGroups, map[model.Fingerprint][]string) {
groups := AlertGroups{}

d.mtx.RLock()
defer d.mtx.RUnlock()

// Keep a list of receivers for an alert to prevent checking each alert
// again against all routes. The alert has already matched against this
// route on ingestion.
receivers := map[model.Fingerprint][]string{}

now := time.Now()
for route, ags := range d.aggrGroupsPerRoute {
d.aggrGroupsPerRoute.Range(func(routeKey, routeGroupsVal interface{}) bool {
route := routeKey.(*Route)
if !routeFilter(route) {
continue
return true
}

for _, ag := range ags {
routeGroups := routeGroupsVal.(*sync.Map)
routeGroups.Range(func(fpKey, agVal interface{}) bool {
ag := agVal.(*aggrGroup)
receiver := route.RouteOpts.Receiver
alertGroup := &AlertGroup{
Labels: ag.labels,
@@ -268,13 +298,16 @@ func (d *Dispatcher) Groups(routeFilter func(*Route) bool, alertFilter func(*typ
filteredAlerts = append(filteredAlerts, a)
}
if len(filteredAlerts) == 0 {
continue
return true
}
alertGroup.Alerts = filteredAlerts

groups = append(groups, alertGroup)
}
}
return true
})
return true
})

sort.Sort(groups)
for i := range groups {
sort.Sort(groups[i].Alerts)
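Inside `Range` callbacks, `return true` replaces `continue`, and since `sync.Map` iteration order is unspecified, the trailing `sort.Sort(groups)` is still what gives `Groups` deterministic output. A tiny sketch of collecting from a `sync.Map` and sorting afterwards:

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

func main() {
	var m sync.Map
	for _, k := range []string{"b", "c", "a"} {
		m.Store(k, struct{}{})
	}

	// Range yields entries in no particular order...
	var keys []string
	m.Range(func(k, _ interface{}) bool {
		keys = append(keys, k.(string))
		return true
	})

	// ...so callers sort afterwards for deterministic output,
	// as Groups does with sort.Sort(groups).
	sort.Strings(keys)
	fmt.Println(keys) // [a b c]
}
```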
@@ -312,41 +345,98 @@ type notifyFunc func(context.Context, ...*types.Alert) bool
// and inserts it.
func (d *Dispatcher) processAlert(alert *types.Alert, route *Route) {
groupLabels := getGroupLabels(alert, route)

fp := groupLabels.Fingerprint()

d.mtx.Lock()
defer d.mtx.Unlock()
// Fast path: try to load existing route groups
if routeGroupsVal, ok := d.aggrGroupsPerRoute.Load(route); ok {
routeGroups := routeGroupsVal.(*sync.Map)
if agVal, ok := routeGroups.Load(fp); ok {
// Alert goes to existing group
ag := agVal.(*aggrGroup)
ag.insert(alert)
return
}

routeGroups, ok := d.aggrGroupsPerRoute[route]
if !ok {
routeGroups = map[model.Fingerprint]*aggrGroup{}
d.aggrGroupsPerRoute[route] = routeGroups
}
// Group doesn't exist, but route exists - check limits and create group
limit := d.limits.MaxNumberOfAggregationGroups()
currentGroups := d.aggrGroupsNum.Load()
if limit > 0 && currentGroups >= limit {
d.metrics.aggrGroupLimitReached.Inc()
d.logger.Error("Too many aggregation groups, cannot create new group for alert", "groups", currentGroups, "limit", limit, "alert", alert.Name())
return
}

// Create new group and try to store it atomically
ag := newAggrGroup(d.ctx, groupLabels, route, d.timeout, d.logger)
if agVal, loaded := routeGroups.LoadOrStore(fp, ag); loaded {
// Another goroutine created the group concurrently, use that one
ag.stop() // Stop the group we created but didn't use
existingAg := agVal.(*aggrGroup)
existingAg.insert(alert)
return
}

// We successfully created the group - update counters
d.aggrGroupsNum.Add(1)
d.metrics.aggrGroups.Inc()

ag, ok := routeGroups[fp]
if ok {
// Insert the alert and start the group
ag.insert(alert)
go ag.run(func(ctx context.Context, alerts ...*types.Alert) bool {
_, _, err := d.stage.Exec(ctx, d.logger, alerts...)
if err != nil {
logger := d.logger.With("num_alerts", len(alerts), "err", err)
if errors.Is(ctx.Err(), context.Canceled) {
// It is expected for the context to be canceled on
// configuration reload or shutdown. In this case, the
// message should only be logged at the debug level.
logger.Debug("Notify for alerts failed")
} else {
logger.Error("Notify for alerts failed")
}
}
return err == nil
})
return
}

// Slow path: route doesn't exist, need to create both route entry and group
// If the group does not exist, create it. But check the limit first.
if limit := d.limits.MaxNumberOfAggregationGroups(); limit > 0 && d.aggrGroupsNum >= limit {
currentGroups := d.aggrGroupsNum.Load()
limit := d.limits.MaxNumberOfAggregationGroups()
if limit > 0 && currentGroups >= int64(limit) {
d.metrics.aggrGroupLimitReached.Inc()
d.logger.Error("Too many aggregation groups, cannot create new group for alert", "groups", d.aggrGroupsNum, "limit", limit, "alert", alert.Name())
d.logger.Error("Too many aggregation groups, cannot create new group for alert", "groups", currentGroups, "limit", limit, "alert", alert.Name())
return
}

ag = newAggrGroup(d.ctx, groupLabels, route, d.timeout, d.logger)
routeGroups[fp] = ag
d.aggrGroupsNum++
// Create new route groups map and aggregation group
routeGroups := &sync.Map{}
ag := newAggrGroup(d.ctx, groupLabels, route, d.timeout, d.logger)
routeGroups.Store(fp, ag)

// Try to store the route groups atomically
if existingVal, loaded := d.aggrGroupsPerRoute.LoadOrStore(route, routeGroups); loaded {
// Another goroutine created the route concurrently
existingRouteGroups := existingVal.(*sync.Map)
if existingAgVal, ok := existingRouteGroups.LoadOrStore(fp, ag); ok {
// Group also exists, use that one
ag.stop() // Stop the group we created but didn't use
existingAg := existingAgVal.(*aggrGroup)
existingAg.insert(alert)
return
}
// Group didn't exist, our ag was stored successfully
}

// Update counters for the new group
d.aggrGroupsNum.Add(1)
d.metrics.aggrGroups.Inc()

// Insert the 1st alert in the group before starting the group's run()
// function, to make sure that when the run() will be executed the 1st
// alert is already there.
ag.insert(alert)

go ag.run(func(ctx context.Context, alerts ...*types.Alert) bool {
_, _, err := d.stage.Exec(ctx, d.logger, alerts...)
if err != nil {
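The core of the new `processAlert` is double-checked creation with `LoadOrStore`: optimistically build the group, attempt the store, and if another goroutine won the race, stop the spare and use the winner. A reduced, runnable sketch of that idiom; the `group` type and `getOrCreate` are hypothetical stand-ins for `aggrGroup` and the PR's logic:

```go
package main

import (
	"fmt"
	"sync"

	"go.uber.org/atomic"
)

type group struct{ id string }

func (g *group) stop() {} // release resources held by the unused spare

var (
	groups    sync.Map // map[string]*group
	groupsNum atomic.Int64
)

// getOrCreate mirrors processAlert's shape: LoadOrStore decides the race,
// and only the winner bumps the counter.
func getOrCreate(id string) *group {
	g := &group{id: id}
	if existing, loaded := groups.LoadOrStore(id, g); loaded {
		g.stop() // we lost the race; discard the group we built
		return existing.(*group)
	}
	groupsNum.Add(1)
	return g
}

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() { defer wg.Done(); getOrCreate("fp") }()
	}
	wg.Wait()
	fmt.Println(groupsNum.Load()) // 1: exactly one creation wins
}
```

Note that the limit check in the hunk above reads the counter before the store, so concurrent creations of different groups can briefly overshoot the limit; without a lock the check is effectively advisory under contention.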
@@ -542,4 +632,4 @@ func (ag *aggrGroup) flush(notify func(...*types.Alert) bool) {

type nilLimits struct{}

func (n nilLimits) MaxNumberOfAggregationGroups() int { return 0 }
func (n nilLimits) MaxNumberOfAggregationGroups() int64 { return 0 }
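`MaxNumberOfAggregationGroups` is widened to `int64` to match the atomic counter, and `nilLimits` keeps the 0 = unlimited convention. A minimal sketch of a bounded implementation, assuming a hypothetical `fixedLimits` type:

```go
package main

import "fmt"

// Limits mirrors the dispatcher's interface after this change.
type Limits interface {
	// MaxNumberOfAggregationGroups returns the cap; 0 or negative = unlimited.
	MaxNumberOfAggregationGroups() int64
}

// fixedLimits is a hypothetical bounded implementation.
type fixedLimits struct{ max int64 }

func (f fixedLimits) MaxNumberOfAggregationGroups() int64 { return f.max }

func main() {
	var l Limits = fixedLimits{max: 5000}
	fmt.Println(l.MaxNumberOfAggregationGroups()) // 5000
}
```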