175 changes: 103 additions & 72 deletions dispatch/dispatch.go
@@ -340,17 +340,7 @@ func (d *Dispatcher) processAlert(alert *types.Alert, route *Route) {
return
}

ag = newAggrGroup(d.ctx, groupLabels, route, d.timeout, d.marker.(types.AlertMarker), d.logger)
routeGroups[fp] = ag
d.aggrGroupsNum++
d.metrics.aggrGroups.Inc()

// Insert the 1st alert in the group before starting the group's run()
// function, to make sure that when the run() will be executed the 1st
// alert is already there.
ag.insert(alert)

go ag.run(func(ctx context.Context, alerts ...*types.Alert) bool {
notifyFunc := func(ctx context.Context, alerts ...*types.Alert) bool {
_, _, err := d.stage.Exec(ctx, d.logger, alerts...)
if err != nil {
logger := d.logger.With("num_alerts", len(alerts), "err", err)
@@ -364,7 +354,24 @@ func (d *Dispatcher) processAlert(alert *types.Alert, route *Route) {
}
}
return err == nil
})
}

ag = newAggrGroup(
d.ctx,
groupLabels,
route,
notifyFunc,
d.timeout,
d.marker.(types.AlertMarker),
d.logger,
)

routeGroups[fp] = ag
d.aggrGroupsNum++
d.metrics.aggrGroups.Inc()

// Insert the 1st alert, which will start the timer.
ag.insert(alert)
}

func getGroupLabels(alert *types.Alert, route *Route) model.LabelSet {
@@ -392,37 +399,41 @@ type aggrGroup struct {
marker types.AlertMarker
ctx context.Context
cancel func()
done chan struct{}
next *time.Timer
timeout func(time.Duration) time.Duration

mtx sync.RWMutex
hasFlushed bool
notifyFunc notifyFunc
}

// newAggrGroup returns a new aggregation group.
func newAggrGroup(ctx context.Context, labels model.LabelSet, r *Route, to func(time.Duration) time.Duration, marker types.AlertMarker, logger *slog.Logger) *aggrGroup {
func newAggrGroup(
ctx context.Context,
labels model.LabelSet,
r *Route,
nf notifyFunc,
to func(time.Duration) time.Duration,
marker types.AlertMarker,
logger *slog.Logger,
) *aggrGroup {
if to == nil {
to = func(d time.Duration) time.Duration { return d }
}
ag := &aggrGroup{
labels: labels,
routeID: r.ID(),
routeKey: r.Key(),
opts: &r.RouteOpts,
timeout: to,
alerts: store.NewAlerts(),
marker: marker,
done: make(chan struct{}),
labels: labels,
routeID: r.ID(),
routeKey: r.Key(),
opts: &r.RouteOpts,
notifyFunc: nf,
timeout: to,
alerts: store.NewAlerts(),
marker: marker,
}
ag.ctx, ag.cancel = context.WithCancel(ctx)

ag.logger = logger.With("aggrGroup", ag)

// Set an initial one-time wait before flushing
// the first batch of notifications.
ag.next = time.NewTimer(ag.opts.GroupWait)

return ag
}

@@ -438,55 +449,56 @@ func (ag *aggrGroup) String() string {
return ag.GroupKey()
}

func (ag *aggrGroup) run(nf notifyFunc) {
defer close(ag.done)
defer ag.next.Stop()
func (ag *aggrGroup) onTimer() {
// Check if context is done before processing
select {
case <-ag.ctx.Done():
return
default:
}

now := time.Now()

for {
select {
case now := <-ag.next.C:
// Give the notifications time until the next flush to
// finish before terminating them.
ctx, cancel := context.WithTimeout(ag.ctx, ag.timeout(ag.opts.GroupInterval))

// The now time we retrieve from the ticker is the only reliable
// point of time reference for the subsequent notification pipeline.
// Calculating the current time directly is prone to flaky behavior,
// which usually only becomes apparent in tests.
ctx = notify.WithNow(ctx, now)

// Populate context with information needed along the pipeline.
ctx = notify.WithGroupKey(ctx, ag.GroupKey())
ctx = notify.WithGroupLabels(ctx, ag.labels)
ctx = notify.WithReceiverName(ctx, ag.opts.Receiver)
ctx = notify.WithRepeatInterval(ctx, ag.opts.RepeatInterval)
ctx = notify.WithMuteTimeIntervals(ctx, ag.opts.MuteTimeIntervals)
ctx = notify.WithActiveTimeIntervals(ctx, ag.opts.ActiveTimeIntervals)
ctx = notify.WithRouteID(ctx, ag.routeID)

// Wait the configured interval before calling flush again.
ag.mtx.Lock()
ag.next.Reset(ag.opts.GroupInterval)
ag.hasFlushed = true
ag.mtx.Unlock()

ag.flush(func(alerts ...*types.Alert) bool {
return nf(ctx, alerts...)
})

cancel()

case <-ag.ctx.Done():
return
}
// Give the notifications time until the next flush to
// finish before terminating them.
ctx, cancel := context.WithTimeout(ag.ctx, ag.timeout(ag.opts.GroupInterval))
defer cancel()
// The now time is the reference point in time for the subsequent notification pipeline.
ctx = notify.WithNow(ctx, now)
// Populate context with information needed along the pipeline.
ctx = notify.WithGroupKey(ctx, ag.GroupKey())
ctx = notify.WithGroupLabels(ctx, ag.labels)
ctx = notify.WithReceiverName(ctx, ag.opts.Receiver)
ctx = notify.WithRepeatInterval(ctx, ag.opts.RepeatInterval)
ctx = notify.WithMuteTimeIntervals(ctx, ag.opts.MuteTimeIntervals)
ctx = notify.WithActiveTimeIntervals(ctx, ag.opts.ActiveTimeIntervals)
ctx = notify.WithRouteID(ctx, ag.routeID)
// Flush before resetting timer to maintain backpressure.
Collaborator:

I don't think we can make this change without breaking high availability.

High availability requires that the same aggregation group in each Alertmanager in a highly available cluster ticks not just at the same interval but at the same instant, i.e. their timers must stay in sync.

If we reset the timer after the flush, the timers drift out of sync. The amount of drift depends on the duration of the flush, which in turn depends on the duration of the integration (e.g. a webhook call).

To show this I added a simple fmt.Println to the start of onTimer that prints the current time:

diff --git a/dispatch/dispatch.go b/dispatch/dispatch.go
index 9e565b74..185915fe 100644
--- a/dispatch/dispatch.go
+++ b/dispatch/dispatch.go
@@ -450,6 +450,7 @@ func (ag *aggrGroup) String() string {
 }

 func (ag *aggrGroup) onTimer() {
+       fmt.Println("on timer", time.Now())
        // Check if context is done before processing
        select {
        case <-ag.ctx.Done():

And I created a webhook with a 5-second delay (of course, in the real world this delay can be highly variable).

With a group_wait of 15s and a group_interval of 30s, the first tick should be at 09:05:45 and the second at 09:06:15, but in fact the second tick occurred at 09:06:20, i.e. 35s after the first (group_interval plus the 5s webhook delay):

on timer 2025-10-28 09:05:45.440439 +0000 GMT m=+19.361033418
time=2025-10-28T09:05:45.440Z level=DEBUG source=dispatch.go:559 msg=flushing component=dispatcher aggrGroup={}:{} alerts=[[3fe32c2][active]]
time=2025-10-28T09:05:50.445Z level=DEBUG source=notify.go:878 msg="Notify success" component=dispatcher receiver=test integration=webhook[0] aggrGroup={}:{} attempts=1 duration=5.004619917s alerts=[[3fe32c2][active]]
on timer 2025-10-28 09:06:20.44742 +0000 GMT m=+54.367559459
time=2025-10-28T09:06:20.447Z level=DEBUG source=dispatch.go:559 msg=flushing component=dispatcher aggrGroup={}:{} alerts=[[3fe32c2][active]]

Contributor Author:

Good catch. I think we can reschedule in the top select instead, so that a slow flush doesn't affect the next schedule.
And we should add an acceptance test to capture this behaviour.
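
One possible shape for that suggestion (just a sketch, not part of this PR; it reuses the onTimer/setTimer helpers and context plumbing from the diff above and mirrors the reset-before-flush ordering of the old run() loop):

func (ag *aggrGroup) onTimer() {
	// Bail out if the group has already been stopped.
	select {
	case <-ag.ctx.Done():
		return
	default:
	}

	// Reschedule the next tick before flushing, so the cadence stays
	// aligned to GroupInterval regardless of how long the flush takes.
	ag.mtx.Lock()
	ag.hasFlushed = true
	ag.setTimer(ag.opts.GroupInterval)
	ag.mtx.Unlock()

	now := time.Now()

	// Give the notifications time until the next flush to finish.
	ctx, cancel := context.WithTimeout(ag.ctx, ag.timeout(ag.opts.GroupInterval))
	defer cancel()

	ctx = notify.WithNow(ctx, now)
	ctx = notify.WithGroupKey(ctx, ag.GroupKey())
	ctx = notify.WithGroupLabels(ctx, ag.labels)
	ctx = notify.WithReceiverName(ctx, ag.opts.Receiver)
	ctx = notify.WithRepeatInterval(ctx, ag.opts.RepeatInterval)
	ctx = notify.WithMuteTimeIntervals(ctx, ag.opts.MuteTimeIntervals)
	ctx = notify.WithActiveTimeIntervals(ctx, ag.opts.ActiveTimeIntervals)
	ctx = notify.WithRouteID(ctx, ag.routeID)

	ag.flush(func(alerts ...*types.Alert) bool {
		return ag.notifyFunc(ctx, alerts...)
	})
}

The trade-off is that with time.AfterFunc a new onTimer call could start while a slow flush is still running, which the old single-goroutine run() loop could not do, so this likely needs extra serialisation plus the acceptance test mentioned above.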

ag.flush(func(alerts ...*types.Alert) bool {
return ag.notifyFunc(ctx, alerts...)
})

// Wait the configured interval before calling flush again.
ag.mtx.Lock()
ag.hasFlushed = true
// Reset the timer for the next flush, but only if context is not done
select {
case <-ag.ctx.Done():
ag.mtx.Unlock()
return
default:
ag.setTimer(ag.opts.GroupInterval)
ag.mtx.Unlock()
}
}

func (ag *aggrGroup) stop() {
// Stop the timer first to prevent any new callbacks
if ag.next != nil {
ag.next.Stop()
}
// Calling cancel will terminate all in-process notifications
// and the run() loop.
ag.cancel()
<-ag.done
}

// insert inserts the alert into the aggregation group.
@@ -495,12 +507,23 @@ func (ag *aggrGroup) insert(alert *types.Alert) {
ag.logger.Error("error on set alert", "err", err)
}

// Immediately trigger a flush if the wait duration for this
// alert is already over.
ag.mtx.Lock()
defer ag.mtx.Unlock()
if !ag.hasFlushed && alert.StartsAt.Add(ag.opts.GroupWait).Before(time.Now()) {
ag.next.Reset(0)

// if already flushed before, no need to schedule a new timer.
if ag.hasFlushed {
return
}

// if the alert is old enough, flush immediately.
if alert.StartsAt.Add(ag.opts.GroupWait).Before(time.Now()) {
ag.setTimer(0)
return
}

// set the timer to group wait time, but only if timer not already set.
if ag.next == nil {
ag.setTimer(ag.opts.GroupWait)
}
}

@@ -553,6 +576,14 @@ func (ag *aggrGroup) flush(notify func(...*types.Alert) bool) {
}
}

func (ag *aggrGroup) setTimer(d time.Duration) {
if ag.next != nil {
ag.next.Reset(d)
} else {
ag.next = time.AfterFunc(d, ag.onTimer)
}
}

type nilLimits struct{}

func (n nilLimits) MaxNumberOfAggregationGroups() int { return 0 }
27 changes: 11 additions & 16 deletions dispatch/dispatch_test.go
@@ -37,6 +37,8 @@ import (

const testMaintenanceInterval = 30 * time.Second

var noopNotifyFunc = func(context.Context, ...*types.Alert) bool { return true }

func TestAggrGroup(t *testing.T) {
lset := model.LabelSet{
"a": "v1",
@@ -141,8 +143,7 @@ func TestAggrGroup(t *testing.T) {
}

// Test regular situation where we wait for group_wait to send out alerts.
ag := newAggrGroup(context.Background(), lset, route, nil, types.NewMarker(prometheus.NewRegistry()), promslog.NewNopLogger())
go ag.run(ntfy)
ag := newAggrGroup(context.Background(), lset, route, ntfy, nil, types.NewMarker(prometheus.NewRegistry()), promslog.NewNopLogger())

ag.insert(a1)

@@ -195,8 +196,7 @@ func TestAggrGroup(t *testing.T) {
// immediate flushing.
// Finally, set all alerts to be resolved. After successful notify the aggregation group
// should empty itself.
ag = newAggrGroup(context.Background(), lset, route, nil, types.NewMarker(prometheus.NewRegistry()), promslog.NewNopLogger())
go ag.run(ntfy)
ag = newAggrGroup(context.Background(), lset, route, ntfy, nil, types.NewMarker(prometheus.NewRegistry()), promslog.NewNopLogger())

ag.insert(a1)
ag.insert(a2)
@@ -757,11 +757,9 @@ func TestDispatcher_DoMaintenance(t *testing.T) {

// Insert an aggregation group with no alerts.
labels := model.LabelSet{"alertname": "1"}
aggrGroup1 := newAggrGroup(ctx, labels, route, timeout, types.NewMarker(prometheus.NewRegistry()), promslog.NewNopLogger())
aggrGroup1 := newAggrGroup(ctx, labels, route, noopNotifyFunc, timeout, types.NewMarker(prometheus.NewRegistry()), promslog.NewNopLogger())
aggrGroups[route][aggrGroup1.fingerprint()] = aggrGroup1
dispatcher.aggrGroupsPerRoute = aggrGroups
// Must run otherwise doMaintenance blocks on aggrGroup1.stop().
go aggrGroup1.run(func(context.Context, ...*types.Alert) bool { return true })

// Insert a marker for the aggregation group's group key.
marker.SetMuted(route.ID(), aggrGroup1.GroupKey(), []string{"weekends"})
@@ -794,7 +792,7 @@ func TestDispatcher_DeleteResolvedAlertsFromMarker(t *testing.T) {
logger := promslog.NewNopLogger()

// Create an aggregation group
ag := newAggrGroup(ctx, labels, route, timeout, marker, logger)
ag := newAggrGroup(ctx, labels, route, noopNotifyFunc, timeout, marker, logger)

// Create test alerts: one active and one resolved
now := time.Now()
@@ -833,13 +831,10 @@ func TestDispatcher_DeleteResolvedAlertsFromMarker(t *testing.T) {
require.True(t, marker.Active(activeAlert.Fingerprint()))
require.True(t, marker.Active(resolvedAlert.Fingerprint()))

// Create a notify function that succeeds
notifyFunc := func(alerts ...*types.Alert) bool {
return true
}

// Flush the alerts
ag.flush(notifyFunc)
ag.flush(func(alerts ...*types.Alert) bool {
return noopNotifyFunc(ctx, alerts...)
})

// Verify that the resolved alert's marker was deleted
require.True(t, marker.Active(activeAlert.Fingerprint()), "active alert marker should still exist")
@@ -863,7 +858,7 @@ func TestDispatcher_DeleteResolvedAlertsFromMarker(t *testing.T) {
logger := promslog.NewNopLogger()

// Create an aggregation group
ag := newAggrGroup(ctx, labels, route, timeout, marker, logger)
ag := newAggrGroup(ctx, labels, route, noopNotifyFunc, timeout, marker, logger)

// Create a resolved alert
now := time.Now()
@@ -917,7 +912,7 @@ func TestDispatcher_DeleteResolvedAlertsFromMarker(t *testing.T) {
logger := promslog.NewNopLogger()

// Create an aggregation group
ag := newAggrGroup(ctx, labels, route, timeout, marker, logger)
ag := newAggrGroup(ctx, labels, route, noopNotifyFunc, timeout, marker, logger)

// Create a resolved alert
now := time.Now()