Skip to content

Commit 65fbad2

Browse files
authored
Dispatcher groups limits (#4254)
* Expose dispatcher aggregation groups limit. Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com> * Expose dispatcher aggregation groups limit reached metric. Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com> * CHANGELOG.md Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com> * Send two alerts per alert group. Signed-off-by: Peter Štibraný <pstibrany@gmail.com> * Simplify option name, add alertmanager limits to list of experimental features. Signed-off-by: Peter Štibraný <pstibrany@gmail.com> * Fix metric name. Signed-off-by: Peter Štibraný <pstibrany@gmail.com>
1 parent ffaef08 commit 65fbad2

File tree

9 files changed

+169
-12
lines changed

9 files changed

+169
-12
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
* `memberlist_client_kv_store_value_tombstones`
3939
* `memberlist_client_kv_store_value_tombstones_removed_total`
4040
* `memberlist_client_messages_to_broadcast_dropped_total`
41+
* [ENHANCEMENT] Alertmanager: Added `-alertmanager.max-dispatcher-aggregation-groups` option to control the maximum number of active dispatcher aggregation groups in Alertmanager (per tenant, also overridable). When the limit is reached, the dispatcher logs a message and increments the `cortex_alertmanager_dispatcher_aggregation_group_limit_reached_total` metric. #4254
4142
* [BUGFIX] Purger: fix `Invalid null value in condition for column range` caused by `nil` value in range for WriteBatch query. #4128
4243
* [BUGFIX] Ingester: fixed infrequent panic caused by a race condition between TSDB mmap-ed head chunks truncation and queries. #4176
4344
* [BUGFIX] Alertmanager: fix Alertmanager status page if clustering via gossip is disabled or sharding is enabled. #4184

docs/configuration/config-file-reference.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4172,6 +4172,14 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s
41724172
# uploaded via Alertmanager API. 0 = no limit.
41734173
# CLI flag: -alertmanager.max-template-size-bytes
41744174
[alertmanager_max_template_size_bytes: <int> | default = 0]
4175+
4176+
# Maximum number of aggregation groups in Alertmanager's dispatcher that a
4177+
# tenant can have. Each active aggregation group uses single goroutine. When the
4178+
# limit is reached, dispatcher will not dispatch alerts that belong to
4179+
# additional aggregation groups, but existing groups will keep working properly.
4180+
# 0 = no limit.
4181+
# CLI flag: -alertmanager.max-dispatcher-aggregation-groups
4182+
[alertmanager_max_dispatcher_aggregation_groups: <int> | default = 0]
41754183
```
41764184

41774185
### `redis_config`

docs/configuration/v1-guarantees.md

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,13 @@ Currently experimental features are:
7171
- `-ingester_stream_chunks_when_using_blocks` (boolean) field in runtime config file
7272
- Instance limits in ingester and distributor
7373
- Exemplar storage, currently in-memory only within the Ingester based on Prometheus exemplar storage (`-blocks-storage.tsdb.max-exemplars`)
74-
- Alertmanager: notification rate limits. (`-alertmanager.notification-rate-limit` and `-alertmanager.notification-rate-limit-per-integration`)
7574
- Querier limits:
7675
- `-querier.max-fetched-chunks-per-query`
7776
- `-querier.max-fetched-chunk-bytes-per-query`
7877
- `-querier.max-fetched-series-per-query`
78+
- Alertmanager limits
79+
- notification rate (`-alertmanager.notification-rate-limit` and `-alertmanager.notification-rate-limit-per-integration`)
80+
- dispatcher groups (`-alertmanager.max-dispatcher-aggregation-groups`)
81+
- user config size (`-alertmanager.max-config-size-bytes`)
82+
- templates count in user config (`-alertmanager.max-templates-count`)
83+
- max template size (`-alertmanager.max-template-size-bytes`)

pkg/alertmanager/alertmanager.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -278,7 +278,7 @@ func New(cfg *Config, reg *prometheus.Registry) (*Alertmanager, error) {
278278
am.mux.Handle(a, http.NotFoundHandler())
279279
}
280280

281-
am.dispatcherMetrics = dispatch.NewDispatcherMetrics(false, am.registry)
281+
am.dispatcherMetrics = dispatch.NewDispatcherMetrics(true, am.registry)
282282

283283
//TODO: From this point onward, the alertmanager _might_ receive requests - we need to make sure we've settled and are ready.
284284
return am, nil
@@ -382,7 +382,7 @@ func (am *Alertmanager) ApplyConfig(userID string, conf *config.Config, rawCfg s
382382
pipeline,
383383
am.marker,
384384
timeoutFunc,
385-
nil,
385+
&dispatcherLimits{tenant: am.cfg.UserID, limits: am.cfg.Limits},
386386
log.With(am.logger, "component", "dispatcher"),
387387
am.dispatcherMetrics,
388388
)
@@ -575,3 +575,12 @@ func (t *tenantRateLimits) RateLimit() rate.Limit {
575575
func (t *tenantRateLimits) Burst() int {
576576
return t.limits.NotificationBurstSize(t.tenant, t.integration)
577577
}
578+
579+
type dispatcherLimits struct {
580+
tenant string
581+
limits Limits
582+
}
583+
584+
func (g *dispatcherLimits) MaxNumberOfAggregationGroups() int {
585+
return g.limits.AlertmanagerMaxDispatcherAggregationGroups(g.tenant)
586+
}

pkg/alertmanager/alertmanager_metrics.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,8 @@ type alertmanagerMetrics struct {
5959
persistTotal *prometheus.Desc
6060
persistFailed *prometheus.Desc
6161

62-
notificationRateLimited *prometheus.Desc
62+
notificationRateLimited *prometheus.Desc
63+
dispatcherAggregationGroupsLimitReached *prometheus.Desc
6364
}
6465

6566
func newAlertmanagerMetrics() *alertmanagerMetrics {
@@ -209,6 +210,10 @@ func newAlertmanagerMetrics() *alertmanagerMetrics {
209210
"cortex_alertmanager_notification_rate_limited_total",
210211
"Total number of rate-limited notifications per integration.",
211212
[]string{"user", "integration"}, nil),
213+
dispatcherAggregationGroupsLimitReached: prometheus.NewDesc(
214+
"cortex_alertmanager_dispatcher_aggregation_group_limit_reached_total",
215+
"Number of times when dispatcher failed to create new aggregation group due to limit.",
216+
[]string{"user"}, nil),
212217
}
213218
}
214219

@@ -259,6 +264,7 @@ func (m *alertmanagerMetrics) Describe(out chan<- *prometheus.Desc) {
259264
out <- m.persistTotal
260265
out <- m.persistFailed
261266
out <- m.notificationRateLimited
267+
out <- m.dispatcherAggregationGroupsLimitReached
262268
}
263269

264270
func (m *alertmanagerMetrics) Collect(out chan<- prometheus.Metric) {
@@ -306,4 +312,5 @@ func (m *alertmanagerMetrics) Collect(out chan<- prometheus.Metric) {
306312
data.SendSumOfCounters(out, m.persistFailed, "alertmanager_state_persist_failed_total")
307313

308314
data.SendSumOfCountersPerUserWithLabels(out, m.notificationRateLimited, "alertmanager_notification_rate_limited_total", "integration")
315+
data.SendSumOfCountersPerUser(out, m.dispatcherAggregationGroupsLimitReached, "alertmanager_dispatcher_aggregation_group_limit_reached_total")
309316
}

pkg/alertmanager/alertmanager_test.go

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
package alertmanager
2+
3+
import (
4+
"fmt"
5+
"net/url"
6+
"strings"
7+
"testing"
8+
"time"
9+
10+
"github.com/go-kit/kit/log"
11+
"github.com/prometheus/alertmanager/config"
12+
"github.com/prometheus/alertmanager/types"
13+
"github.com/prometheus/client_golang/prometheus"
14+
"github.com/prometheus/client_golang/prometheus/testutil"
15+
"github.com/prometheus/common/model"
16+
"github.com/stretchr/testify/require"
17+
18+
"github.com/cortexproject/cortex/pkg/util/test"
19+
)
20+
21+
func TestDispatcherGroupLimits(t *testing.T) {
22+
for name, tc := range map[string]struct {
23+
groups int
24+
groupsLimit int
25+
expectedFailures int
26+
}{
27+
"no limit": {groups: 5, groupsLimit: 0, expectedFailures: 0},
28+
"high limit": {groups: 5, groupsLimit: 10, expectedFailures: 0},
29+
"low limit": {groups: 5, groupsLimit: 3, expectedFailures: 4}, // 2 groups that fail, 2 alerts per group = 4 failures
30+
} {
31+
t.Run(name, func(t *testing.T) {
32+
createAlertmanagerAndSendAlerts(t, tc.groups, tc.groupsLimit, tc.expectedFailures)
33+
})
34+
}
35+
}
36+
37+
func createAlertmanagerAndSendAlerts(t *testing.T, alertGroups, groupsLimit, expectedFailures int) {
38+
user := "test"
39+
40+
reg := prometheus.NewPedanticRegistry()
41+
am, err := New(&Config{
42+
UserID: user,
43+
Logger: log.NewNopLogger(),
44+
Limits: &mockAlertManagerLimits{maxDispatcherAggregationGroups: groupsLimit},
45+
TenantDataDir: t.TempDir(),
46+
ExternalURL: &url.URL{Path: "/am"},
47+
ShardingEnabled: false,
48+
}, reg)
49+
require.NoError(t, err)
50+
defer am.StopAndWait()
51+
52+
cfgRaw := `receivers:
53+
- name: 'prod'
54+
55+
route:
56+
group_by: ['alertname']
57+
group_wait: 10ms
58+
group_interval: 10ms
59+
receiver: 'prod'`
60+
61+
cfg, err := config.Load(cfgRaw)
62+
require.NoError(t, err)
63+
require.NoError(t, am.ApplyConfig(user, cfg, cfgRaw))
64+
65+
now := time.Now()
66+
67+
for i := 0; i < alertGroups; i++ {
68+
alertName := model.LabelValue(fmt.Sprintf("Alert-%d", i))
69+
70+
inputAlerts := []*types.Alert{
71+
{
72+
Alert: model.Alert{
73+
Labels: model.LabelSet{
74+
"alertname": alertName,
75+
"a": "b",
76+
},
77+
Annotations: model.LabelSet{"foo": "bar"},
78+
StartsAt: now,
79+
EndsAt: now.Add(5 * time.Minute),
80+
GeneratorURL: "http://example.com/prometheus",
81+
},
82+
UpdatedAt: now,
83+
Timeout: false,
84+
},
85+
86+
{
87+
Alert: model.Alert{
88+
Labels: model.LabelSet{
89+
"alertname": alertName,
90+
"z": "y",
91+
},
92+
Annotations: model.LabelSet{"foo": "bar"},
93+
StartsAt: now,
94+
EndsAt: now.Add(5 * time.Minute),
95+
GeneratorURL: "http://example.com/prometheus",
96+
},
97+
UpdatedAt: now,
98+
Timeout: false,
99+
},
100+
}
101+
require.NoError(t, am.alerts.Put(inputAlerts...))
102+
}
103+
104+
// Give it some time, as alerts are sent to dispatcher asynchronously.
105+
test.Poll(t, 3*time.Second, nil, func() interface{} {
106+
return testutil.GatherAndCompare(reg, strings.NewReader(fmt.Sprintf(`
107+
# HELP alertmanager_dispatcher_aggregation_group_limit_reached_total Number of times when dispatcher failed to create new aggregation group due to limit.
108+
# TYPE alertmanager_dispatcher_aggregation_group_limit_reached_total counter
109+
alertmanager_dispatcher_aggregation_group_limit_reached_total %d
110+
`, expectedFailures)), "alertmanager_dispatcher_aggregation_group_limit_reached_total")
111+
})
112+
}

pkg/alertmanager/multitenant.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,10 @@ type Limits interface {
219219

220220
// AlertmanagerMaxTemplateSize returns max size of individual template. 0 = no limit.
221221
AlertmanagerMaxTemplateSize(tenant string) int
222+
223+
// AlertmanagerMaxDispatcherAggregationGroups returns the maximum number of aggregation groups in Alertmanager's dispatcher that a tenant can have.
224+
// Each aggregation group consumes a single goroutine. 0 = unlimited.
225+
AlertmanagerMaxDispatcherAggregationGroups(t string) int
222226
}
223227

224228
// A MultitenantAlertmanager manages Alertmanager instances for multiple

pkg/alertmanager/multitenant_test.go

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2019,11 +2019,12 @@ func (f *passthroughAlertmanagerClientPool) GetClientFor(addr string) (Client, e
20192019
}
20202020

20212021
type mockAlertManagerLimits struct {
2022-
emailNotificationRateLimit rate.Limit
2023-
emailNotificationBurst int
2024-
maxConfigSize int
2025-
maxTemplatesCount int
2026-
maxSizeOfTemplate int
2022+
emailNotificationRateLimit rate.Limit
2023+
emailNotificationBurst int
2024+
maxConfigSize int
2025+
maxTemplatesCount int
2026+
maxSizeOfTemplate int
2027+
maxDispatcherAggregationGroups int
20272028
}
20282029

20292030
func (m *mockAlertManagerLimits) AlertmanagerMaxConfigSize(tenant string) int {
@@ -2053,3 +2054,7 @@ func (m *mockAlertManagerLimits) NotificationRateLimit(_ string, integration str
20532054
func (m *mockAlertManagerLimits) NotificationBurstSize(_ string, integration string) int {
20542055
return m.emailNotificationBurst
20552056
}
2057+
2058+
func (m *mockAlertManagerLimits) AlertmanagerMaxDispatcherAggregationGroups(_ string) int {
2059+
return m.maxDispatcherAggregationGroups
2060+
}

pkg/util/validation/limits.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -106,9 +106,10 @@ type Limits struct {
106106
NotificationRateLimit float64 `yaml:"alertmanager_notification_rate_limit" json:"alertmanager_notification_rate_limit"`
107107
NotificationRateLimitPerIntegration NotificationRateLimitMap `yaml:"alertmanager_notification_rate_limit_per_integration" json:"alertmanager_notification_rate_limit_per_integration"`
108108

109-
AlertmanagerMaxConfigSizeBytes int `yaml:"alertmanager_max_config_size_bytes" json:"alertmanager_max_config_size_bytes"`
110-
AlertmanagerMaxTemplatesCount int `yaml:"alertmanager_max_templates_count" json:"alertmanager_max_templates_count"`
111-
AlertmanagerMaxTemplateSizeBytes int `yaml:"alertmanager_max_template_size_bytes" json:"alertmanager_max_template_size_bytes"`
109+
AlertmanagerMaxConfigSizeBytes int `yaml:"alertmanager_max_config_size_bytes" json:"alertmanager_max_config_size_bytes"`
110+
AlertmanagerMaxTemplatesCount int `yaml:"alertmanager_max_templates_count" json:"alertmanager_max_templates_count"`
111+
AlertmanagerMaxTemplateSizeBytes int `yaml:"alertmanager_max_template_size_bytes" json:"alertmanager_max_template_size_bytes"`
112+
AlertmanagerMaxDispatcherAggregationGroups int `yaml:"alertmanager_max_dispatcher_aggregation_groups" json:"alertmanager_max_dispatcher_aggregation_groups"`
112113
}
113114

114115
// RegisterFlags adds the flags required to config this to the given FlagSet
@@ -181,6 +182,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
181182
f.IntVar(&l.AlertmanagerMaxConfigSizeBytes, "alertmanager.max-config-size-bytes", 0, "Maximum size of configuration file for Alertmanager that tenant can upload via Alertmanager API. 0 = no limit.")
182183
f.IntVar(&l.AlertmanagerMaxTemplatesCount, "alertmanager.max-templates-count", 0, "Maximum number of templates in tenant's Alertmanager configuration uploaded via Alertmanager API. 0 = no limit.")
183184
f.IntVar(&l.AlertmanagerMaxTemplateSizeBytes, "alertmanager.max-template-size-bytes", 0, "Maximum size of single template in tenant's Alertmanager configuration uploaded via Alertmanager API. 0 = no limit.")
185+
f.IntVar(&l.AlertmanagerMaxDispatcherAggregationGroups, "alertmanager.max-dispatcher-aggregation-groups", 0, "Maximum number of aggregation groups in Alertmanager's dispatcher that a tenant can have. Each active aggregation group uses single goroutine. When the limit is reached, dispatcher will not dispatch alerts that belong to additional aggregation groups, but existing groups will keep working properly. 0 = no limit.")
184186
}
185187

186188
// Validate the limits config and returns an error if the validation
@@ -605,6 +607,10 @@ func (o *Overrides) AlertmanagerMaxTemplateSize(userID string) int {
605607
return o.getOverridesForUser(userID).AlertmanagerMaxTemplateSizeBytes
606608
}
607609

610+
func (o *Overrides) AlertmanagerMaxDispatcherAggregationGroups(userID string) int {
611+
return o.getOverridesForUser(userID).AlertmanagerMaxDispatcherAggregationGroups
612+
}
613+
608614
func (o *Overrides) getOverridesForUser(userID string) *Limits {
609615
if o.tenantLimits != nil {
610616
l := o.tenantLimits.ByUserID(userID)

0 commit comments

Comments
 (0)