Skip to content

Commit f7685e1

Browse files
committed
fix(dispatch): reduce locking contention
Reduce the amount of time spent holding locks in the dispatcher: - doMaintenance() holds RW lock only during deletion - Groups() holds R lock, copies, releases - processAlert() holds R(W) locks only when necessary This results in -70% maintenance overhead or +19518.99% alert processing rate improvement: ``` │ bench-dispatch-main.txt │ bench-dispatch-fix-locking.txt │ │ baseline_alerts/sec │ baseline_alerts/sec vs base │ Dispatch_100k_AggregationGroups_10k_Empty-12 1.899M ± 1% 1.919M ± 2% ~ (p=0.063 n=10) Dispatch_100k_AggregationGroups_20k_Empty-12 1.934M ± 2% 1.967M ± 1% +1.69% (p=0.015 n=10) Dispatch_100k_AggregationGroups_30k_Empty-12 1.926M ± 3% 1.931M ± 4% ~ (p=0.436 n=10) Dispatch_100k_AggregationGroups_40k_Empty-12 2.087M ± 10% 2.030M ± 7% ~ (p=0.912 n=10) Dispatch_100k_AggregationGroups_50k_Empty-12 1.922M ± 3% 2.118M ± 10% +10.16% (p=0.000 n=10) Dispatch_20k_AggregationGroups_Groups_Impact-12 180.7k ± 42% 2128.6k ± 4% +1077.93% (p=0.000 n=10) Dispatch_50k_AggregationGroups_Groups_Impact-12 57.02k ± 196% 2089.55k ± 2% +3564.69% (p=0.000 n=10) Dispatch_100k_AggregationGroups_Groups_Impact-12 19.61k ± 35% 1899.27k ± 2% +9582.74% (p=0.000 n=10) geomean 524.6k 2.008M +282.84% │ bench-dispatch-main.txt │ bench-dispatch-fix-locking.txt │ │ maintenance_overhead_% │ maintenance_overhead_% vs base │ Dispatch_100k_AggregationGroups_10k_Empty-12 17.185 ± 7% 5.672 ± 45% -66.99% (p=0.000 n=10) Dispatch_100k_AggregationGroups_20k_Empty-12 36.50 ± 11% 12.56 ± 14% -65.59% (p=0.000 n=10) Dispatch_100k_AggregationGroups_30k_Empty-12 55.44 ± 13% 23.11 ± 30% -58.32% (p=0.000 n=10) Dispatch_100k_AggregationGroups_40k_Empty-12 125.65 ± 27% 28.98 ± 53% -76.94% (p=0.000 n=10) Dispatch_100k_AggregationGroups_50k_Empty-12 172.40 ± 36% 37.76 ± 82% -78.10% (p=0.000 n=10) geomean 59.62 17.83 -70.10% ``` Signed-off-by: Siavash Safi <siavash@cloudflare.com>
1 parent 3e70148 commit f7685e1

File tree

2 files changed

+448
-10
lines changed

2 files changed

+448
-10
lines changed

dispatch/dispatch.go

Lines changed: 31 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"errors"
1919
"fmt"
2020
"log/slog"
21+
"maps"
2122
"sort"
2223
"sync"
2324
"time"
@@ -185,19 +186,32 @@ func (d *Dispatcher) run(it provider.AlertIterator) {
185186
}
186187

187188
func (d *Dispatcher) doMaintenance() {
189+
d.mtx.RLock()
190+
empty := make(map[*Route]map[model.Fingerprint]*aggrGroup)
191+
for route, groups := range d.aggrGroupsPerRoute {
192+
for fp, ag := range groups {
193+
if ag.empty() {
194+
if empty[route] == nil {
195+
empty[route] = make(map[model.Fingerprint]*aggrGroup)
196+
}
197+
empty[route][fp] = ag
198+
}
199+
}
200+
}
201+
d.mtx.RUnlock()
188202
d.mtx.Lock()
189-
defer d.mtx.Unlock()
190-
for _, groups := range d.aggrGroupsPerRoute {
191-
for _, ag := range groups {
203+
for route, groups := range empty {
204+
for fp, ag := range groups {
192205
if ag.empty() {
193206
ag.stop()
194207
d.marker.DeleteByGroupKey(ag.routeID, ag.GroupKey())
195-
delete(groups, ag.fingerprint())
208+
delete(d.aggrGroupsPerRoute[route], fp)
196209
d.aggrGroupsNum--
197210
d.metrics.aggrGroups.Dec()
198211
}
199212
}
200213
}
214+
d.mtx.Unlock()
201215
}
202216

203217
// AlertGroup represents how alerts exist within an aggrGroup.
@@ -224,16 +238,19 @@ func (ag AlertGroups) Len() int { return len(ag) }
224238
func (d *Dispatcher) Groups(routeFilter func(*Route) bool, alertFilter func(*types.Alert, time.Time) bool) (AlertGroups, map[model.Fingerprint][]string) {
225239
groups := AlertGroups{}
226240

241+
// Take a snapshot copy of the map to avoid blocking alert processing.
242+
snapshot := make(map[*Route]map[model.Fingerprint]*aggrGroup)
227243
d.mtx.RLock()
228-
defer d.mtx.RUnlock()
244+
maps.Copy(d.aggrGroupsPerRoute, snapshot)
245+
d.mtx.RUnlock()
229246

230247
// Keep a list of receivers for an alert to prevent checking each alert
231248
// again against all routes. The alert has already matched against this
232249
// route on ingestion.
233250
receivers := map[model.Fingerprint][]string{}
234251

235252
now := time.Now()
236-
for route, ags := range d.aggrGroupsPerRoute {
253+
for route, ags := range snapshot {
237254
if !routeFilter(route) {
238255
continue
239256
}
@@ -316,30 +333,34 @@ func (d *Dispatcher) processAlert(alert *types.Alert, route *Route) {
316333
fp := groupLabels.Fingerprint()
317334

318335
d.mtx.Lock()
319-
defer d.mtx.Unlock()
320-
321336
routeGroups, ok := d.aggrGroupsPerRoute[route]
322337
if !ok {
323338
routeGroups = map[model.Fingerprint]*aggrGroup{}
324339
d.aggrGroupsPerRoute[route] = routeGroups
325340
}
326341

327342
ag, ok := routeGroups[fp]
343+
d.mtx.Unlock()
328344
if ok {
329345
ag.insert(alert)
330346
return
331347
}
332348

333349
// If the group does not exist, create it. But check the limit first.
334-
if limit := d.limits.MaxNumberOfAggregationGroups(); limit > 0 && d.aggrGroupsNum >= limit {
350+
d.mtx.RLock()
351+
aggrGroupsNum := d.aggrGroupsNum
352+
d.mtx.RUnlock()
353+
if limit := d.limits.MaxNumberOfAggregationGroups(); limit > 0 && aggrGroupsNum >= limit {
335354
d.metrics.aggrGroupLimitReached.Inc()
336-
d.logger.Error("Too many aggregation groups, cannot create new group for alert", "groups", d.aggrGroupsNum, "limit", limit, "alert", alert.Name())
355+
d.logger.Error("Too many aggregation groups, cannot create new group for alert", "groups", aggrGroupsNum, "limit", limit, "alert", alert.Name())
337356
return
338357
}
339358

340359
ag = newAggrGroup(d.ctx, groupLabels, route, d.timeout, d.logger)
360+
d.mtx.Lock()
341361
routeGroups[fp] = ag
342362
d.aggrGroupsNum++
363+
d.mtx.Unlock()
343364
d.metrics.aggrGroups.Inc()
344365

345366
// Insert the 1st alert in the group before starting the group's run()

0 commit comments

Comments
 (0)