Skip to content

Commit 8c25deb

Browse files
committed
Add cortex_ruler_rule_groups_in_store metric
Signed-off-by: Emmanuel Lodovice <lodovice@amazon.com>
1 parent 8df8246 commit 8c25deb

File tree

6 files changed

+109
-3
lines changed

6 files changed

+109
-3
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
* [ENHANCEMENT] Compactor: Introduce cleaner visit marker. #6113
5050
* [ENHANCEMENT] Query Frontend: Add cortex_query_samples_total metric. #6142
5151
* [ENHANCEMENT] Ingester: Implement metadata API limit. #6128
52+
* [ENHANCEMENT] Ruler: Add new ruler metric `cortex_ruler_rule_groups_in_store` that is the total rule groups per tenant in store, which can be used to compare with `cortex_prometheus_rule_group_rules` to count the number of rule groups that are not loaded by a ruler. #5869
5253
* [BUGFIX] Configsdb: Fix endline issue in db password. #5920
5354
* [BUGFIX] Ingester: Fix `user` and `type` labels for the `cortex_ingester_tsdb_head_samples_appended_total` TSDB metric. #5952
5455
* [BUGFIX] Querier: Enforce max query length check for `/api/v1/series` API even though `ignoreMaxQueryLength` is set to true. #6018

integration/ruler_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -267,6 +267,9 @@ func TestRulerSharding(t *testing.T) {
267267
// between the two rulers.
268268
require.NoError(t, ruler1.WaitSumMetrics(e2e.Less(numRulesGroups), "cortex_prometheus_rule_group_rules"))
269269
require.NoError(t, ruler2.WaitSumMetrics(e2e.Less(numRulesGroups), "cortex_prometheus_rule_group_rules"))
270+
// Even with rules sharded, we expect rulers to have the same cortex_ruler_rule_groups_in_store metric values
271+
require.NoError(t, ruler1.WaitSumMetrics(e2e.Equals(numRulesGroups), "cortex_ruler_rule_groups_in_store"))
272+
require.NoError(t, ruler2.WaitSumMetrics(e2e.Equals(numRulesGroups), "cortex_ruler_rule_groups_in_store"))
270273

271274
// Fetch the rules and ensure they match the configured ones.
272275
actualGroups, err := c.GetPrometheusRules(e2ecortex.DefaultFilter)

pkg/ruler/manager_metrics.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -271,3 +271,39 @@ func (m *RuleEvalMetrics) deletePerUserMetrics(userID string) {
271271
m.RulerQuerySeconds.DeleteLabelValues(userID)
272272
}
273273
}
274+
275+
type RuleGroupMetrics struct {
276+
RuleGroupsInStore *prometheus.GaugeVec
277+
tenants map[string]struct{}
278+
allowedTenants *util.AllowedTenants
279+
}
280+
281+
func NewRuleGroupMetrics(reg prometheus.Registerer, allowedTenants *util.AllowedTenants) *RuleGroupMetrics {
282+
m := &RuleGroupMetrics{
283+
RuleGroupsInStore: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
284+
Name: "cortex_ruler_rule_groups_in_store",
285+
Help: "The number of rule groups a tenant has in store.",
286+
}, []string{"user"}),
287+
allowedTenants: allowedTenants,
288+
}
289+
return m
290+
}
291+
292+
// UpdateRuleGroupsInStore updates the cortex_ruler_rule_groups_in_store metric with the provided number of rule
293+
// groups per tenant and removing the metrics for tenants that are not present anymore
294+
func (r *RuleGroupMetrics) UpdateRuleGroupsInStore(ruleGroupsCount map[string]int) {
295+
tenants := make(map[string]struct{}, len(ruleGroupsCount))
296+
for userID, count := range ruleGroupsCount {
297+
if !r.allowedTenants.IsAllowed(userID) { // if the tenant is disabled just ignore its rule groups
298+
continue
299+
}
300+
tenants[userID] = struct{}{}
301+
r.RuleGroupsInStore.WithLabelValues(userID).Set(float64(count))
302+
}
303+
for userID := range r.tenants {
304+
if _, ok := tenants[userID]; !ok {
305+
r.RuleGroupsInStore.DeleteLabelValues(userID)
306+
}
307+
}
308+
r.tenants = tenants
309+
}

pkg/ruler/manager_metrics_test.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -595,3 +595,41 @@ func TestRuleEvalMetricsDeletePerUserMetrics(t *testing.T) {
595595
require.Contains(t, mfm[name].String(), "value:\"fake2\"")
596596
}
597597
}
598+
599+
func TestRuleGroupMetrics(t *testing.T) {
600+
reg := prometheus.NewPedanticRegistry()
601+
m := NewRuleGroupMetrics(reg, util.NewAllowedTenants(nil, []string{"fake3"}))
602+
m.UpdateRuleGroupsInStore(map[string]int{
603+
"fake1": 10,
604+
"fake2": 20,
605+
"fake3": 30,
606+
})
607+
gm, err := reg.Gather()
608+
require.NoError(t, err)
609+
mfm, err := util.NewMetricFamilyMap(gm)
610+
require.NoError(t, err)
611+
require.Equal(t, 2, len(mfm["cortex_ruler_rule_groups_in_store"].Metric))
612+
requireMetricEqual(t, mfm["cortex_ruler_rule_groups_in_store"].Metric[0], map[string]string{
613+
"user": "fake1",
614+
}, float64(10))
615+
requireMetricEqual(t, mfm["cortex_ruler_rule_groups_in_store"].Metric[1], map[string]string{
616+
"user": "fake2",
617+
}, float64(20))
618+
m.UpdateRuleGroupsInStore(map[string]int{
619+
"fake2": 30,
620+
})
621+
gm, err = reg.Gather()
622+
require.NoError(t, err)
623+
mfm, err = util.NewMetricFamilyMap(gm)
624+
require.NoError(t, err)
625+
require.Equal(t, 1, len(mfm["cortex_ruler_rule_groups_in_store"].Metric))
626+
requireMetricEqual(t, mfm["cortex_ruler_rule_groups_in_store"].Metric[0], map[string]string{
627+
"user": "fake2",
628+
}, float64(30))
629+
m.UpdateRuleGroupsInStore(make(map[string]int))
630+
gm, err = reg.Gather()
631+
require.NoError(t, err)
632+
mfm, err = util.NewMetricFamilyMap(gm)
633+
require.NoError(t, err)
634+
require.Nil(t, mfm["cortex_ruler_rule_groups_in_store"])
635+
}

pkg/ruler/ruler.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -294,6 +294,7 @@ type Ruler struct {
294294
ruleGroupStoreLoadDuration prometheus.Gauge
295295
ruleGroupSyncDuration prometheus.Gauge
296296
rulerGetRulesFailures *prometheus.CounterVec
297+
ruleGroupMetrics *RuleGroupMetrics
297298

298299
allowedTenants *util.AllowedTenants
299300

@@ -342,6 +343,7 @@ func newRuler(cfg Config, manager MultiTenantManager, reg prometheus.Registerer,
342343
Help: "The total number of failed rules request sent to rulers in getShardedRules.",
343344
}, []string{"ruler"}),
344345
}
346+
ruler.ruleGroupMetrics = NewRuleGroupMetrics(reg, ruler.allowedTenants)
345347

346348
if len(cfg.EnabledTenants) > 0 {
347349
level.Info(ruler.logger).Log("msg", "ruler using enabled users", "enabled", strings.Join(cfg.EnabledTenants, ", "))
@@ -667,7 +669,9 @@ func (r *Ruler) listRulesNoSharding(ctx context.Context) (map[string]rulespb.Rul
667669
if err != nil {
668670
return nil, nil, err
669671
}
672+
ruleGroupCounts := make(map[string]int, len(allRuleGroups))
670673
for userID, groups := range allRuleGroups {
674+
ruleGroupCounts[userID] = len(groups)
671675
disabledRuleGroupsForUser := r.limits.DisabledRuleGroups(userID)
672676
if len(disabledRuleGroupsForUser) == 0 {
673677
continue
@@ -682,6 +686,7 @@ func (r *Ruler) listRulesNoSharding(ctx context.Context) (map[string]rulespb.Rul
682686
}
683687
allRuleGroups[userID] = filteredGroupsForUser
684688
}
689+
r.ruleGroupMetrics.UpdateRuleGroupsInStore(ruleGroupCounts)
685690
return allRuleGroups, nil, nil
686691
}
687692

@@ -691,9 +696,11 @@ func (r *Ruler) listRulesShardingDefault(ctx context.Context) (map[string]rulesp
691696
return nil, nil, err
692697
}
693698

699+
ruleGroupCounts := make(map[string]int, len(configs))
694700
ownedConfigs := make(map[string]rulespb.RuleGroupList)
695701
backedUpConfigs := make(map[string]rulespb.RuleGroupList)
696702
for userID, groups := range configs {
703+
ruleGroupCounts[userID] = len(groups)
697704
owned := filterRuleGroups(userID, groups, r.limits.DisabledRuleGroups(userID), r.ring, r.lifecycler.GetInstanceAddr(), r.logger, r.ringCheckErrors)
698705
if len(owned) > 0 {
699706
ownedConfigs[userID] = owned
@@ -705,6 +712,7 @@ func (r *Ruler) listRulesShardingDefault(ctx context.Context) (map[string]rulesp
705712
}
706713
}
707714
}
715+
r.ruleGroupMetrics.UpdateRuleGroupsInStore(ruleGroupCounts)
708716
return ownedConfigs, backedUpConfigs, nil
709717
}
710718

@@ -732,6 +740,7 @@ func (r *Ruler) listRulesShuffleSharding(ctx context.Context) (map[string]rulesp
732740
}
733741

734742
if len(userRings) == 0 {
743+
r.ruleGroupMetrics.UpdateRuleGroupsInStore(make(map[string]int))
735744
return nil, nil, nil
736745
}
737746

@@ -744,6 +753,8 @@ func (r *Ruler) listRulesShuffleSharding(ctx context.Context) (map[string]rulesp
744753
mu := sync.Mutex{}
745754
owned := map[string]rulespb.RuleGroupList{}
746755
backedUp := map[string]rulespb.RuleGroupList{}
756+
gLock := sync.Mutex{}
757+
ruleGroupCounts := make(map[string]int, len(userRings))
747758

748759
concurrency := loadRulesConcurrency
749760
if len(userRings) < concurrency {
@@ -758,6 +769,9 @@ func (r *Ruler) listRulesShuffleSharding(ctx context.Context) (map[string]rulesp
758769
if err != nil {
759770
return errors.Wrapf(err, "failed to fetch rule groups for user %s", userID)
760771
}
772+
gLock.Lock()
773+
ruleGroupCounts[userID] = len(groups)
774+
gLock.Unlock()
761775

762776
filterOwned := filterRuleGroups(userID, groups, r.limits.DisabledRuleGroups(userID), userRings[userID], r.lifecycler.GetInstanceAddr(), r.logger, r.ringCheckErrors)
763777
var filterBackup []*rulespb.RuleGroupDesc
@@ -781,6 +795,7 @@ func (r *Ruler) listRulesShuffleSharding(ctx context.Context) (map[string]rulesp
781795
}
782796

783797
err = g.Wait()
798+
r.ruleGroupMetrics.UpdateRuleGroupsInStore(ruleGroupCounts)
784799
return owned, backedUp, err
785800
}
786801

pkg/ruler/ruler_test.go

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -247,10 +247,23 @@ func buildRuler(t *testing.T, rulerConfig Config, querierTestConfig *querier.Tes
247247
func newTestRuler(t *testing.T, rulerConfig Config, store rulestore.RuleStore, querierTestConfig *querier.TestConfig) *Ruler {
248248
ruler, _ := buildRuler(t, rulerConfig, querierTestConfig, store, nil)
249249
require.NoError(t, services.StartAndAwaitRunning(context.Background(), ruler))
250+
rgs, err := store.ListAllRuleGroups(context.Background())
251+
require.NoError(t, err)
250252

251-
// Ensure all rules are loaded before usage
252-
ruler.syncRules(context.Background(), rulerSyncReasonInitial)
253-
253+
// Wait to ensure syncRules has finished and all rules are loaded before usage
254+
deadline := time.Now().Add(3 * time.Second)
255+
for {
256+
loaded := true
257+
for tenantId := range rgs {
258+
if len(ruler.manager.GetRules(tenantId)) == 0 {
259+
loaded = false
260+
}
261+
}
262+
if time.Now().After(deadline) || loaded {
263+
break
264+
}
265+
time.Sleep(50 * time.Millisecond)
266+
}
254267
return ruler
255268
}
256269

0 commit comments

Comments
 (0)