Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@
* [FEATURE] ...
* [ENHANCEMENT] ...

## 0.32.2 / 2026-05-25

* [BUGFIX] Fix dispatcher goroutine leaks on destroyed alertgroup swap. #5241

## 0.32.1 / 2026-04-29

* [BUGFIX] dispatcher: Fix issue with dispatching to a contended route. #5179
Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.32.1
0.32.2
15 changes: 14 additions & 1 deletion dispatch/dispatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,9 +317,14 @@ func (d *Dispatcher) doMaintenance() {
ag := el.(*aggrGroup)
if ag.destroyed() {
ag.stop()
d.marker.DeleteByGroupKey(ag.routeID, ag.GroupKey())
deleted := d.routeGroupsSlice[i].groups.CompareAndDelete(ag.fingerprint(), ag)
if deleted {
// TODO(ultrotter, siavash):
// Deletion from the marker should only happen if we really deleted the group.
// Fully fixing the case where a new group with the same fingerprint is created between
// CompareAndDelete and DeleteByGroupKey would require changes to the marker interface,
// so we leave it as a fix for after landing the pending marker changes.
d.marker.DeleteByGroupKey(ag.routeID, ag.GroupKey())
d.routeGroupsSlice[i].groupsLen.Add(-1)
d.aggrGroupsNum.Add(-1)
d.metrics.aggrGroups.Set(float64(d.aggrGroupsNum.Load()))
Expand Down Expand Up @@ -516,6 +521,9 @@ func (d *Dispatcher) groupAlert(ctx context.Context, alert *types.Alert, route *
// Try to store the new group in the map. If another goroutine has already created the same group, use the existing one.
swapped := d.routeGroupsSlice[route.Idx].groups.CompareAndSwap(fp, el, ag)
if swapped {
// Since we swapped the new group in, we need to cancel the old one,
// as doMaintenance will not be able to find it in the map anymore.
el.(*aggrGroup).cancel()
// We swapped the new group in, we can break and start it.
break
}
Expand Down Expand Up @@ -735,6 +743,11 @@ func (ag *aggrGroup) run(nf notifyFunc) {

cancel()

// If destroyed, exit: this particular alert group won't be used anymore.
if ag.destroyed() {
return
}

case <-ag.ctx.Done():
return
}
Expand Down
60 changes: 60 additions & 0 deletions dispatch/dispatch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -899,6 +899,66 @@ func TestGroupAlert_RecoversWhenCASFails(t *testing.T) {
require.Positive(t, testutil.ToFloat64(metrics.aggrGroupCreationRetries), "contended CAS path was not exercised in %d rounds — scheduler is unusually serial", rounds)
}

// TestGroupAlert_DisplacedAggrGroupGoroutineExits is a regression test for a
// goroutine leak: when groupAlert CAS-replaces a destroyed aggrGroup in the
// map, the displaced group's run goroutine must be torn down. Otherwise it
// stays parked in its select forever (doMaintenance can no longer find it
// because it's been removed from the map), accumulating one stuck goroutine
// per replacement for the lifetime of the process.
func TestGroupAlert_DisplacedAggrGroupGoroutineExits(t *testing.T) {
logger := promslog.NewNopLogger()
reg := prometheus.NewRegistry()
marker := types.NewMarker(reg)
alerts, err := mem.NewAlerts(context.Background(), marker, time.Hour, 0, nil, logger, reg, nil)
require.NoError(t, err)
defer alerts.Close()

route := &Route{
RouteOpts: RouteOpts{
Receiver: "test",
GroupBy: map[model.LabelName]struct{}{"alertname": {}},
GroupWait: time.Hour, // never auto-flush during the test
GroupInterval: time.Hour,
RepeatInterval: time.Hour,
},
Idx: 0,
}
timeout := func(d time.Duration) time.Duration { return d }
recorder := &recordStage{alerts: make(map[string]map[model.Fingerprint]*types.Alert)}
dispatcher := NewDispatcher(alerts, route, recorder, marker, timeout, testMaintenanceInterval, nil, logger, NewDispatcherMetrics(false, reg))
dispatcher.routeGroupsSlice = []routeAggrGroups{{route: route}}
// WaitingToStart so groupAlert won't auto-start the new ag — keeps the
// test focused on the displaced group.
dispatcher.state.Store(DispatcherStateWaitingToStart)

groupLabels := model.LabelSet{"alertname": "displaced"}
displaced := newAggrGroup(context.Background(), groupLabels, route, timeout, marker, logger)
// Mark destroyed so groupAlert can't insert into it and is forced down
// the CAS-replace path.
require.NoError(t, displaced.alerts.DeleteIfNotModified(types.AlertSlice{}, true))
require.True(t, displaced.destroyed())
dispatcher.routeGroupsSlice[0].groups.Store(displaced.fingerprint(), displaced)

// Start the run goroutine on the displaced group — this is the orphan
// candidate. Without the fix it would never exit.
go displaced.run(func(context.Context, ...*types.Alert) bool { return true })

// Trigger the CAS replacement.
dispatcher.groupAlert(context.Background(), newAlert(groupLabels), route)

// The displaced group should have been swapped out for a fresh one.
el, ok := dispatcher.routeGroupsSlice[0].groups.Load(displaced.fingerprint())
require.True(t, ok)
require.NotSame(t, displaced, el.(*aggrGroup), "destroyed group must have been replaced")

// And its run goroutine must have exited.
select {
case <-displaced.done:
case <-time.After(2 * time.Second):
t.Fatal("displaced aggrGroup.run goroutine did not exit after CAS replacement")
}
}

func TestDispatcher_DeleteResolvedAlertsFromMarker(t *testing.T) {
t.Run("successful flush deletes markers for resolved alerts", func(t *testing.T) {
ctx := context.Background()
Expand Down
Loading