Skip to content

Commit 6fa6f92

Browse files
disable rule groups
Signed-off-by: Anand Rajagopal <anrajag@amazon.com>
1 parent 94192ff commit 6fa6f92

File tree

8 files changed

+474
-16
lines changed

8 files changed

+474
-16
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
# Changelog
22

33
## master / unreleased
4+
* [FEATURE] Ruler: Add support for disabling rule groups. #5521
45
* [FEATURE] Added the flag `-alertmanager.alerts-gc-interval` to configure alert manager alerts Garbage collection interval. #5550
56
* [FEATURE] Ruler: Add support for Limit field on RuleGroup. #5528
67
* [FEATURE] AlertManager: Add support for Webex, Discord and Telegram Receiver. #5493

docs/configuration/config-file-reference.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3097,6 +3097,9 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s
30973097
# alerts will fail with a log message and metric increment. 0 = no limit.
30983098
# CLI flag: -alertmanager.max-alerts-size-bytes
30993099
[alertmanager_max_alerts_size_bytes: <int> | default = 0]
3100+
3101+
# list of rule groups to disable
3102+
[disabled_rule_groups: <list of rule groups to disable> | default = ]
31003103
```
31013104
31023105
### `memberlist_config`

integration/ruler_test.go

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
package integration
55

66
import (
7+
"bytes"
78
"context"
89
"crypto/x509"
910
"crypto/x509/pkix"
@@ -29,6 +30,7 @@ import (
2930
"github.com/prometheus/prometheus/prompb"
3031
"github.com/stretchr/testify/assert"
3132
"github.com/stretchr/testify/require"
33+
"github.com/thanos-io/objstore/providers/s3"
3234
"gopkg.in/yaml.v3"
3335

3436
"github.com/cortexproject/cortex/integration/ca"
@@ -915,6 +917,127 @@ func TestRulerMetricsWhenIngesterFails(t *testing.T) {
915917
})
916918
}
917919

920+
func TestRulerDisablesRuleGroups(t *testing.T) {
921+
s, err := e2e.NewScenario(networkName)
922+
require.NoError(t, err)
923+
defer s.Close()
924+
925+
// Start dependencies.
926+
consul := e2edb.NewConsul()
927+
minio := e2edb.NewMinio(9000, bucketName, rulestoreBucketName)
928+
require.NoError(t, s.StartAndWaitReady(consul, minio))
929+
930+
const blockRangePeriod = 2 * time.Second
931+
// Configure the ruler.
932+
flags := mergeFlags(
933+
BlocksStorageFlags(),
934+
RulerFlags(),
935+
map[string]string{
936+
"-blocks-storage.tsdb.block-ranges-period": blockRangePeriod.String(),
937+
"-blocks-storage.tsdb.ship-interval": "1s",
938+
"-blocks-storage.bucket-store.sync-interval": "1s",
939+
"-blocks-storage.bucket-store.index-cache.backend": tsdb.IndexCacheBackendInMemory,
940+
"-blocks-storage.tsdb.retention-period": ((blockRangePeriod * 2) - 1).String(),
941+
942+
// Enable the bucket index so we can skip the initial bucket scan.
943+
"-blocks-storage.bucket-store.bucket-index.enabled": "false",
944+
// Evaluate rules often, so that we don't need to wait for metrics to show up.
945+
"-ruler.evaluation-interval": "2s",
946+
"-ruler.poll-interval": "2s",
947+
// No delay
948+
"-ruler.evaluation-delay-duration": "0",
949+
950+
// We run single ingester only, no replication.
951+
"-distributor.replication-factor": "1",
952+
953+
// Very low limit so that ruler hits it.
954+
"-querier.max-fetched-chunks-per-query": "15",
955+
"-querier.query-store-after": (1 * time.Second).String(),
956+
"-querier.query-ingesters-within": (2 * time.Second).String(),
957+
},
958+
)
959+
960+
const namespace = "test"
961+
const user = "user"
962+
configFileName := "runtime-config.yaml"
963+
bucketName := "cortex"
964+
965+
storeGateway := e2ecortex.NewStoreGateway("store-gateway-1", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
966+
967+
flags = mergeFlags(flags, map[string]string{
968+
"-querier.store-gateway-addresses": storeGateway.NetworkGRPCEndpoint(),
969+
"-runtime-config.backend": "s3",
970+
"-runtime-config.s3.access-key-id": e2edb.MinioAccessKey,
971+
"-runtime-config.s3.secret-access-key": e2edb.MinioSecretKey,
972+
"-runtime-config.s3.bucket-name": bucketName,
973+
"-runtime-config.s3.endpoint": fmt.Sprintf("%s-minio-9000:9000", networkName),
974+
"-runtime-config.s3.insecure": "true",
975+
"-runtime-config.file": configFileName,
976+
"-runtime-config.reload-period": "2s",
977+
})
978+
979+
distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
980+
981+
client, err := s3.NewBucketWithConfig(nil, s3.Config{
982+
Endpoint: minio.HTTPEndpoint(),
983+
Insecure: true,
984+
Bucket: bucketName,
985+
AccessKey: e2edb.MinioAccessKey,
986+
SecretKey: e2edb.MinioSecretKey,
987+
}, "runtime-config-test")
988+
989+
require.NoError(t, err)
990+
991+
// update runtime config
992+
newRuntimeConfig := []byte(`overrides:
993+
user:
994+
disabled_rule_groups:
995+
- name: bad_rule
996+
namespace: test`)
997+
require.NoError(t, client.Upload(context.Background(), configFileName, bytes.NewReader(newRuntimeConfig)))
998+
time.Sleep(2 * time.Second)
999+
1000+
ruler := e2ecortex.NewRuler("ruler", consul.NetworkHTTPEndpoint(), flags, "")
1001+
1002+
ingester := e2ecortex.NewIngester("ingester", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
1003+
require.NoError(t, s.StartAndWaitReady(distributor, ingester, ruler, storeGateway))
1004+
1005+
// Wait until both the distributor and ruler have updated the ring. The querier will also watch
1006+
// the store-gateway ring if blocks sharding is enabled.
1007+
require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
1008+
require.NoError(t, ruler.WaitSumMetrics(e2e.Equals(1024), "cortex_ring_tokens_total"))
1009+
1010+
c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), "", "", ruler.HTTPEndpoint(), user)
1011+
require.NoError(t, err)
1012+
1013+
expression := "absent(sum_over_time(metric{}[2s] offset 1h))"
1014+
1015+
t.Run("disable_rule_group", func(t *testing.T) {
1016+
1017+
ruleGroup := ruleGroupWithRule("bad_rule", "rule", expression)
1018+
ruleGroup.Interval = 2
1019+
require.NoError(t, c.SetRuleGroup(ruleGroup, namespace))
1020+
1021+
ruleGroup = ruleGroupWithRule("good_rule", "rule", expression)
1022+
ruleGroup.Interval = 2
1023+
require.NoError(t, c.SetRuleGroup(ruleGroup, namespace))
1024+
1025+
m1 := ruleGroupMatcher(user, namespace, "good_rule")
1026+
1027+
// Wait until ruler has loaded the group.
1028+
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_ruler_sync_rules_total"}, e2e.WaitMissingMetrics))
1029+
1030+
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_prometheus_rule_group_rules"}, e2e.WithLabelMatchers(m1), e2e.WaitMissingMetrics))
1031+
1032+
filter := e2ecortex.RuleFilter{}
1033+
actualGroups, err := c.GetPrometheusRules(filter)
1034+
require.NoError(t, err)
1035+
assert.Equal(t, 1, len(actualGroups))
1036+
assert.Equal(t, "good_rule", actualGroups[0].Name)
1037+
assert.Equal(t, "test", actualGroups[0].File)
1038+
})
1039+
}
1040+
9181041
func ruleGroupMatcher(user, namespace, groupName string) *labels.Matcher {
9191042
return labels.MustNewMatcher(labels.MatchEqual, "rule_group", fmt.Sprintf("/rules/%s/%s;%s", user, namespace, groupName))
9201043
}

pkg/ruler/compat.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ import (
55
"errors"
66
"time"
77

8+
"github.com/cortexproject/cortex/pkg/util/validation"
9+
810
"github.com/go-kit/log"
911
"github.com/go-kit/log/level"
1012
"github.com/prometheus/client_golang/prometheus"
@@ -142,6 +144,7 @@ type RulesLimits interface {
142144
RulerTenantShardSize(userID string) int
143145
RulerMaxRuleGroupsPerTenant(userID string) int
144146
RulerMaxRulesPerRuleGroup(userID string) int
147+
DisabledRuleGroups(userID string) validation.DisabledRuleGroups
145148
}
146149

147150
// EngineQueryFunc returns a new engine query function by passing an altered timestamp.

pkg/ruler/ruler.go

Lines changed: 60 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,14 @@ const (
7171
recordingRuleFilter string = "record"
7272
)
7373

74+
type DisabledRuleGroupErr struct {
75+
Message string
76+
}
77+
78+
func (e *DisabledRuleGroupErr) Error() string {
79+
return e.Message
80+
}
81+
7482
// Config is the configuration for the recording rules server.
7583
type Config struct {
7684
// This is used for template expansion in alerts; must be a valid URL.
@@ -400,6 +408,17 @@ func SendAlerts(n sender, externalURL string) promRules.NotifyFunc {
400408
}
401409
}
402410

411+
func ruleGroupDisabled(ruleGroup *rulespb.RuleGroupDesc, disabledRuleGroupsForUser validation.DisabledRuleGroups) bool {
412+
for _, disabledRuleGroupForUser := range disabledRuleGroupsForUser {
413+
if ruleGroup.Namespace == disabledRuleGroupForUser.Namespace &&
414+
ruleGroup.Name == disabledRuleGroupForUser.Name &&
415+
ruleGroup.User == disabledRuleGroupForUser.User {
416+
return true
417+
}
418+
}
419+
return false
420+
}
421+
403422
var sep = []byte("/")
404423

405424
func tokenForGroup(g *rulespb.RuleGroupDesc) uint32 {
@@ -415,15 +434,21 @@ func tokenForGroup(g *rulespb.RuleGroupDesc) uint32 {
415434
return ringHasher.Sum32()
416435
}
417436

418-
func instanceOwnsRuleGroup(r ring.ReadRing, g *rulespb.RuleGroupDesc, instanceAddr string) (bool, error) {
437+
func instanceOwnsRuleGroup(r ring.ReadRing, g *rulespb.RuleGroupDesc, disabledRuleGroups validation.DisabledRuleGroups, instanceAddr string) (bool, error) {
438+
419439
hash := tokenForGroup(g)
420440

421441
rlrs, err := r.Get(hash, RingOp, nil, nil, nil)
422442
if err != nil {
423443
return false, errors.Wrap(err, "error reading ring to verify rule group ownership")
424444
}
425445

426-
return rlrs.Instances[0].Addr == instanceAddr, nil
446+
ownsRuleGroup := rlrs.Instances[0].Addr == instanceAddr
447+
if ownsRuleGroup && ruleGroupDisabled(g, disabledRuleGroups) {
448+
return false, &DisabledRuleGroupErr{Message: fmt.Sprintf("rule group %s, namespace %s, user %s is disabled", g.Name, g.Namespace, g.User)}
449+
}
450+
451+
return ownsRuleGroup, nil
427452
}
428453

429454
func (r *Ruler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
@@ -533,7 +558,26 @@ func (r *Ruler) listRules(ctx context.Context) (result map[string]rulespb.RuleGr
533558
}
534559

535560
func (r *Ruler) listRulesNoSharding(ctx context.Context) (map[string]rulespb.RuleGroupList, error) {
536-
return r.store.ListAllRuleGroups(ctx)
561+
allRuleGroups, err := r.store.ListAllRuleGroups(ctx)
562+
if err != nil {
563+
return nil, err
564+
}
565+
for userID, groups := range allRuleGroups {
566+
disabledRuleGroupsForUser := r.limits.DisabledRuleGroups(userID)
567+
if len(disabledRuleGroupsForUser) == 0 {
568+
continue
569+
}
570+
filteredGroupsForUser := rulespb.RuleGroupList{}
571+
for _, group := range groups {
572+
if !ruleGroupDisabled(group, disabledRuleGroupsForUser) {
573+
filteredGroupsForUser = append(filteredGroupsForUser, group)
574+
} else {
575+
level.Info(r.logger).Log("msg", "rule group disabled", "name", group.Name, "namespace", group.Namespace, "user", group.User)
576+
}
577+
}
578+
allRuleGroups[userID] = filteredGroupsForUser
579+
}
580+
return allRuleGroups, nil
537581
}
538582

539583
func (r *Ruler) listRulesShardingDefault(ctx context.Context) (map[string]rulespb.RuleGroupList, error) {
@@ -544,7 +588,7 @@ func (r *Ruler) listRulesShardingDefault(ctx context.Context) (map[string]rulesp
544588

545589
filteredConfigs := make(map[string]rulespb.RuleGroupList)
546590
for userID, groups := range configs {
547-
filtered := filterRuleGroups(userID, groups, r.ring, r.lifecycler.GetInstanceAddr(), r.logger, r.ringCheckErrors)
591+
filtered := filterRuleGroups(userID, groups, r.limits.DisabledRuleGroups(userID), r.ring, r.lifecycler.GetInstanceAddr(), r.logger, r.ringCheckErrors)
548592
if len(filtered) > 0 {
549593
filteredConfigs[userID] = filtered
550594
}
@@ -602,7 +646,7 @@ func (r *Ruler) listRulesShuffleSharding(ctx context.Context) (map[string]rulesp
602646
return errors.Wrapf(err, "failed to fetch rule groups for user %s", userID)
603647
}
604648

605-
filtered := filterRuleGroups(userID, groups, userRings[userID], r.lifecycler.GetInstanceAddr(), r.logger, r.ringCheckErrors)
649+
filtered := filterRuleGroups(userID, groups, r.limits.DisabledRuleGroups(userID), userRings[userID], r.lifecycler.GetInstanceAddr(), r.logger, r.ringCheckErrors)
606650
if len(filtered) == 0 {
607651
continue
608652
}
@@ -624,15 +668,21 @@ func (r *Ruler) listRulesShuffleSharding(ctx context.Context) (map[string]rulesp
624668
//
625669
// Reason why this function is not a method on Ruler is to make sure we don't accidentally use r.ring,
626670
// but only ring passed as parameter.
627-
func filterRuleGroups(userID string, ruleGroups []*rulespb.RuleGroupDesc, ring ring.ReadRing, instanceAddr string, log log.Logger, ringCheckErrors prometheus.Counter) []*rulespb.RuleGroupDesc {
671+
func filterRuleGroups(userID string, ruleGroups []*rulespb.RuleGroupDesc, disabledRuleGroups validation.DisabledRuleGroups, ring ring.ReadRing, instanceAddr string, log log.Logger, ringCheckErrors prometheus.Counter) []*rulespb.RuleGroupDesc {
628672
// Prune the rule group to only contain rules that this ruler is responsible for, based on ring.
629673
var result []*rulespb.RuleGroupDesc
630674
for _, g := range ruleGroups {
631-
owned, err := instanceOwnsRuleGroup(ring, g, instanceAddr)
675+
owned, err := instanceOwnsRuleGroup(ring, g, disabledRuleGroups, instanceAddr)
632676
if err != nil {
633-
ringCheckErrors.Inc()
634-
level.Error(log).Log("msg", "failed to check if the ruler replica owns the rule group", "user", userID, "namespace", g.Namespace, "group", g.Name, "err", err)
635-
continue
677+
switch e := err.(type) {
678+
case *DisabledRuleGroupErr:
679+
level.Info(log).Log("msg", e.Message)
680+
continue
681+
default:
682+
ringCheckErrors.Inc()
683+
level.Error(log).Log("msg", "failed to check if the ruler replica owns the rule group", "user", userID, "namespace", g.Namespace, "group", g.Name, "err", err)
684+
continue
685+
}
636686
}
637687

638688
if owned {

0 commit comments

Comments
 (0)