Remove user specific evaluation metrics when rule manager is removed #5772

Merged

merged 1 commit on Feb 15, 2024
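Summary: the per-tenant ruler metrics (the write and query counters, plus the optional query-seconds counter) move into a new RuleEvalMetrics struct, and their label values are now deleted when a tenant's rule manager is removed in SyncRuleGroups. The core mechanism is Prometheus's CounterVec.DeleteLabelValues, as the diff below shows. The following is a minimal, self-contained sketch of that behaviour; the standalone program and the tenant name "tenant-a" are illustrative only and not part of this change:

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	reg := prometheus.NewRegistry()
	queries := prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "cortex_ruler_queries_total",
		Help: "Number of queries executed by ruler.",
	}, []string{"user"})
	reg.MustRegister(queries)

	// While the tenant's rule manager is alive, its series accumulates normally.
	queries.WithLabelValues("tenant-a").Add(3)

	// When the manager is torn down, deleting the label values removes the
	// tenant's series so the ruler no longer exports a stale counter for it.
	queries.DeleteLabelValues("tenant-a")

	mfs, err := reg.Gather()
	fmt.Println(len(mfs), err) // expect "0 <nil>": no series remain for the vec
}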
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -6,6 +6,7 @@
* [CHANGE] Index Cache: Multi level cache backfilling operation becomes async. Added `-blocks-storage.bucket-store.index-cache.multilevel.max-async-concurrency` and `-blocks-storage.bucket-store.index-cache.multilevel.max-async-buffer-size` configs and metric `cortex_store_multilevel_index_cache_backfill_dropped_items_total` for number of dropped items. #5661
* [CHANGE] Ingester: Disable uploading compacted blocks and overlapping compaction in ingester. #5735
* [CHANGE] Distributor: Count the number of rate-limited samples in `distributor_samples_in_total`. #5714
* [CHANGE] Ruler: Remove `cortex_ruler_write_requests_total`, `cortex_ruler_write_requests_failed_total`, `cortex_ruler_queries_total`, `cortex_ruler_queries_failed_total`, and `cortex_ruler_query_seconds_total` metrics for the tenant when the ruler deletes the manager for the tenant. #5772
* [FEATURE] Ingester: Add per-tenant new metric `cortex_ingester_tsdb_data_replay_duration_seconds`. #5477
* [FEATURE] Query Frontend/Scheduler: Add query priority support. #5605
* [FEATURE] Tracing: Add `kuberesolver` to resolve endpoints address with `kubernetes://` prefix as Kubernetes service. #5731
16 changes: 8 additions & 8 deletions integration/ruler_test.go
@@ -741,7 +741,6 @@ func TestRulerMetricsForInvalidQueries(t *testing.T) {
}

matcher := labels.MustNewMatcher(labels.MatchEqual, "user", user)
var totalQueries = []float64{0}

// Verify that user-failures don't increase cortex_ruler_queries_failed_total
for groupName, expression := range map[string]string{
@@ -769,19 +768,20 @@ func TestRulerMetricsForInvalidQueries(t *testing.T) {
require.NoError(t, err)
require.Equal(t, float64(0), sum[0])

// Check that cortex_ruler_queries_total went up
totalQueries, err := ruler.SumMetrics([]string{"cortex_ruler_queries_total"}, e2e.WithLabelMatchers(matcher))
require.NoError(t, err)
require.Greater(t, totalQueries[0], float64(0))

// Delete rule before checking "cortex_ruler_queries_total", as we want to reuse value for next test.
require.NoError(t, c.DeleteRuleGroup(namespace, groupName))

// Wait until ruler has unloaded the group. We don't use any matcher, so there should be no groups (in fact, metric disappears).
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(0), []string{"cortex_prometheus_rule_group_rules"}, e2e.SkipMissingMetrics))

// Check that cortex_ruler_queries_total went up since last test.
newTotalQueries, err := ruler.SumMetrics([]string{"cortex_ruler_queries_total"}, e2e.WithLabelMatchers(matcher))
require.NoError(t, err)
require.Greater(t, newTotalQueries[0], totalQueries[0])

// Remember totalQueries for next test.
totalQueries = newTotalQueries
// Deleting the rule group should clean up the cortex_ruler_queries_total metrics
_, err = ruler.SumMetrics([]string{"cortex_ruler_queries_total"}, e2e.WithLabelMatchers(matcher))
require.EqualError(t, err, "metric=cortex_ruler_queries_total service=ruler: metric not found")
})
}

9 changes: 5 additions & 4 deletions pkg/cortex/modules.go
@@ -549,6 +549,7 @@ func (t *Cortex) initRuler() (serv services.Service, err error) {
}

t.Cfg.Ruler.Ring.ListenPort = t.Cfg.Server.GRPCListenPort
metrics := ruler.NewRuleEvalMetrics(t.Cfg.Ruler, prometheus.DefaultRegisterer)

if t.Cfg.ExternalPusher != nil && t.Cfg.ExternalQueryable != nil {
rulerRegisterer := prometheus.WrapRegistererWith(prometheus.Labels{"engine": "ruler"}, prometheus.DefaultRegisterer)
@@ -577,15 +578,15 @@ func (t *Cortex) initRuler() (serv services.Service, err error) {
queryEngine = promql.NewEngine(opts)
}

managerFactory := ruler.DefaultTenantManagerFactory(t.Cfg.Ruler, t.Cfg.ExternalPusher, t.Cfg.ExternalQueryable, queryEngine, t.Overrides, prometheus.DefaultRegisterer)
manager, err = ruler.NewDefaultMultiTenantManager(t.Cfg.Ruler, managerFactory, prometheus.DefaultRegisterer, util_log.Logger)
managerFactory := ruler.DefaultTenantManagerFactory(t.Cfg.Ruler, t.Cfg.ExternalPusher, t.Cfg.ExternalQueryable, queryEngine, t.Overrides, metrics, prometheus.DefaultRegisterer)
manager, err = ruler.NewDefaultMultiTenantManager(t.Cfg.Ruler, managerFactory, metrics, prometheus.DefaultRegisterer, util_log.Logger)
} else {
rulerRegisterer := prometheus.WrapRegistererWith(prometheus.Labels{"engine": "ruler"}, prometheus.DefaultRegisterer)
// TODO: Consider wrapping logger to differentiate from querier module logger
queryable, _, engine := querier.New(t.Cfg.Querier, t.Overrides, t.Distributor, t.StoreQueryables, rulerRegisterer, util_log.Logger)

managerFactory := ruler.DefaultTenantManagerFactory(t.Cfg.Ruler, t.Distributor, queryable, engine, t.Overrides, prometheus.DefaultRegisterer)
manager, err = ruler.NewDefaultMultiTenantManager(t.Cfg.Ruler, managerFactory, prometheus.DefaultRegisterer, util_log.Logger)
managerFactory := ruler.DefaultTenantManagerFactory(t.Cfg.Ruler, t.Distributor, queryable, engine, t.Overrides, metrics, prometheus.DefaultRegisterer)
manager, err = ruler.NewDefaultMultiTenantManager(t.Cfg.Ruler, managerFactory, metrics, prometheus.DefaultRegisterer, util_log.Logger)
}

if err != nil {
40 changes: 7 additions & 33 deletions pkg/ruler/compat.go
@@ -8,7 +8,6 @@ import (
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
@@ -279,47 +278,22 @@ type RulesManager interface {
// ManagerFactory is a function that creates new RulesManager for given user and notifier.Manager.
type ManagerFactory func(ctx context.Context, userID string, notifier *notifier.Manager, logger log.Logger, reg prometheus.Registerer) RulesManager

func DefaultTenantManagerFactory(cfg Config, p Pusher, q storage.Queryable, engine v1.QueryEngine, overrides RulesLimits, reg prometheus.Registerer) ManagerFactory {
totalWritesVec := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_ruler_write_requests_total",
Help: "Number of write requests to ingesters.",
}, []string{"user"})
failedWritesVec := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_ruler_write_requests_failed_total",
Help: "Number of failed write requests to ingesters.",
}, []string{"user"})

totalQueriesVec := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_ruler_queries_total",
Help: "Number of queries executed by ruler.",
}, []string{"user"})
failedQueriesVec := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_ruler_queries_failed_total",
Help: "Number of failed queries by ruler.",
}, []string{"user"})
var rulerQuerySeconds *prometheus.CounterVec
if cfg.EnableQueryStats {
rulerQuerySeconds = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_ruler_query_seconds_total",
Help: "Total amount of wall clock time spent processing queries by the ruler.",
}, []string{"user"})
}

func DefaultTenantManagerFactory(cfg Config, p Pusher, q storage.Queryable, engine v1.QueryEngine, overrides RulesLimits, evalMetrics *RuleEvalMetrics, reg prometheus.Registerer) ManagerFactory {
// Wrap errors returned by Queryable to our wrapper, so that we can distinguish between those errors
// and errors returned by PromQL engine. Errors from Queryable can be either caused by user (limits) or internal errors.
// Errors from PromQL are always "user" errors.
q = querier.NewErrorTranslateQueryableWithFn(q, WrapQueryableErrors)

return func(ctx context.Context, userID string, notifier *notifier.Manager, logger log.Logger, reg prometheus.Registerer) RulesManager {
var queryTime prometheus.Counter
if rulerQuerySeconds != nil {
queryTime = rulerQuerySeconds.WithLabelValues(userID)
if evalMetrics.RulerQuerySeconds != nil {
queryTime = evalMetrics.RulerQuerySeconds.WithLabelValues(userID)
}

failedQueries := failedQueriesVec.WithLabelValues(userID)
totalQueries := totalQueriesVec.WithLabelValues(userID)
totalWrites := totalWritesVec.WithLabelValues(userID)
failedWrites := failedWritesVec.WithLabelValues(userID)
failedQueries := evalMetrics.FailedQueriesVec.WithLabelValues(userID)
totalQueries := evalMetrics.TotalQueriesVec.WithLabelValues(userID)
totalWrites := evalMetrics.TotalWritesVec.WithLabelValues(userID)
failedWrites := evalMetrics.FailedWritesVec.WithLabelValues(userID)

engineQueryFunc := EngineQueryFunc(engine, q, overrides, userID)
metricsQueryFunc := MetricsQueryFunc(engineQueryFunc, totalQueries, failedQueries)
13 changes: 9 additions & 4 deletions pkg/ruler/manager.go
@@ -26,9 +26,10 @@ import (
)

type DefaultMultiTenantManager struct {
cfg Config
notifierCfg *config.Config
managerFactory ManagerFactory
cfg Config
notifierCfg *config.Config
managerFactory ManagerFactory
ruleEvalMetrics *RuleEvalMetrics

mapper *mapper

@@ -51,7 +52,7 @@ type DefaultMultiTenantManager struct {
logger log.Logger
}

func NewDefaultMultiTenantManager(cfg Config, managerFactory ManagerFactory, reg prometheus.Registerer, logger log.Logger) (*DefaultMultiTenantManager, error) {
func NewDefaultMultiTenantManager(cfg Config, managerFactory ManagerFactory, evalMetrics *RuleEvalMetrics, reg prometheus.Registerer, logger log.Logger) (*DefaultMultiTenantManager, error) {
ncfg, err := buildNotifierConfig(&cfg)
if err != nil {
return nil, err
@@ -78,6 +79,7 @@ func NewDefaultMultiTenantManager(cfg Config, managerFactory ManagerFactory, reg
cfg: cfg,
notifierCfg: ncfg,
managerFactory: managerFactory,
ruleEvalMetrics: evalMetrics,
notifiers: map[string]*rulerNotifier{},
notifiersDiscoveryMetrics: notifiersDiscoveryMetrics,
mapper: newMapper(cfg.RulePath, logger),
@@ -130,6 +132,9 @@ func (r *DefaultMultiTenantManager) SyncRuleGroups(ctx context.Context, ruleGrou
r.lastReloadSuccessfulTimestamp.DeleteLabelValues(userID)
r.configUpdatesTotal.DeleteLabelValues(userID)
r.userManagerMetrics.RemoveUserRegistry(userID)
if r.ruleEvalMetrics != nil {
r.ruleEvalMetrics.deletePerUserMetrics(userID)
}
level.Info(r.logger).Log("msg", "deleted rule manager and local rule files", "user", userID)
}
}
49 changes: 49 additions & 0 deletions pkg/ruler/manager_metrics.go
@@ -2,6 +2,7 @@ package ruler

import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

"github.com/cortexproject/cortex/pkg/util"
)
@@ -222,3 +223,51 @@ func (m *ManagerMetrics) Collect(out chan<- prometheus.Metric) {
data.SendSumOfGaugesPerUser(out, m.NotificationQueueCapacity, "prometheus_notifications_queue_capacity")
data.SendSumOfGaugesPerUser(out, m.AlertmanagersDiscovered, "prometheus_notifications_alertmanagers_discovered")
}

type RuleEvalMetrics struct {
TotalWritesVec *prometheus.CounterVec
FailedWritesVec *prometheus.CounterVec
TotalQueriesVec *prometheus.CounterVec
FailedQueriesVec *prometheus.CounterVec
RulerQuerySeconds *prometheus.CounterVec
}

func NewRuleEvalMetrics(cfg Config, reg prometheus.Registerer) *RuleEvalMetrics {
m := &RuleEvalMetrics{
TotalWritesVec: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_ruler_write_requests_total",
Help: "Number of write requests to ingesters.",
}, []string{"user"}),
FailedWritesVec: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_ruler_write_requests_failed_total",
Help: "Number of failed write requests to ingesters.",
}, []string{"user"}),
TotalQueriesVec: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_ruler_queries_total",
Help: "Number of queries executed by ruler.",
}, []string{"user"}),
FailedQueriesVec: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_ruler_queries_failed_total",
Help: "Number of failed queries by ruler.",
}, []string{"user"}),
}
if cfg.EnableQueryStats {
m.RulerQuerySeconds = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_ruler_query_seconds_total",
Help: "Total amount of wall clock time spent processing queries by the ruler.",
}, []string{"user"})
}

return m
}

func (m *RuleEvalMetrics) deletePerUserMetrics(userID string) {
m.TotalWritesVec.DeleteLabelValues(userID)
m.FailedWritesVec.DeleteLabelValues(userID)
m.TotalQueriesVec.DeleteLabelValues(userID)
m.FailedQueriesVec.DeleteLabelValues(userID)

if m.RulerQuerySeconds != nil {
m.RulerQuerySeconds.DeleteLabelValues(userID)
}
}
39 changes: 39 additions & 0 deletions pkg/ruler/manager_metrics_test.go
@@ -10,6 +10,8 @@ import (
dto "github.com/prometheus/client_model/go"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/cortexproject/cortex/pkg/util"
)

func TestManagerMetricsWithRuleGroupLabel(t *testing.T) {
@@ -556,3 +558,40 @@ func TestMetricsArePerUser(t *testing.T) {
assert.True(t, foundUserLabel, "user label not found for metric %s", desc.String())
}
}

func TestRuleEvalMetricsDeletePerUserMetrics(t *testing.T) {
dir := t.TempDir()
reg := prometheus.NewPedanticRegistry()

m := NewRuleEvalMetrics(Config{RulePath: dir, EnableQueryStats: true}, reg)
m.TotalWritesVec.WithLabelValues("fake1").Add(10)
m.TotalWritesVec.WithLabelValues("fake2").Add(10)
m.FailedWritesVec.WithLabelValues("fake1").Add(10)
m.FailedWritesVec.WithLabelValues("fake2").Add(10)
m.TotalQueriesVec.WithLabelValues("fake1").Add(10)
m.TotalQueriesVec.WithLabelValues("fake2").Add(10)
m.FailedQueriesVec.WithLabelValues("fake1").Add(10)
m.FailedQueriesVec.WithLabelValues("fake2").Add(10)
m.RulerQuerySeconds.WithLabelValues("fake1").Add(10)
m.RulerQuerySeconds.WithLabelValues("fake2").Add(10)

metricNames := []string{"cortex_ruler_write_requests_total", "cortex_ruler_write_requests_failed_total", "cortex_ruler_queries_total", "cortex_ruler_queries_failed_total", "cortex_ruler_query_seconds_total"}
gm, err := reg.Gather()
require.NoError(t, err)
mfm, err := util.NewMetricFamilyMap(gm)
require.NoError(t, err)
for _, name := range metricNames {
require.Contains(t, mfm[name].String(), "value:\"fake1\"")
require.Contains(t, mfm[name].String(), "value:\"fake2\"")
}

m.deletePerUserMetrics("fake1")
gm, err = reg.Gather()
require.NoError(t, err)
mfm, err = util.NewMetricFamilyMap(gm)
require.NoError(t, err)
for _, name := range metricNames {
require.NotContains(t, mfm[name].String(), "value:\"fake1\"")
require.Contains(t, mfm[name].String(), "value:\"fake2\"")
}
}
44 changes: 43 additions & 1 deletion pkg/ruler/manager_test.go
@@ -14,13 +14,14 @@ import (
"go.uber.org/atomic"

"github.com/cortexproject/cortex/pkg/ruler/rulespb"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/test"
)

func TestSyncRuleGroups(t *testing.T) {
dir := t.TempDir()

m, err := NewDefaultMultiTenantManager(Config{RulePath: dir}, factory, nil, log.NewNopLogger())
m, err := NewDefaultMultiTenantManager(Config{RulePath: dir}, factory, nil, nil, log.NewNopLogger())
require.NoError(t, err)

const user = "testUser"
@@ -96,6 +97,47 @@
})
}

func TestSyncRuleGroupsCleanUpPerUserMetrics(t *testing.T) {
dir := t.TempDir()
reg := prometheus.NewPedanticRegistry()
evalMetrics := NewRuleEvalMetrics(Config{RulePath: dir, EnableQueryStats: true}, reg)
m, err := NewDefaultMultiTenantManager(Config{RulePath: dir}, factory, evalMetrics, reg, log.NewNopLogger())
require.NoError(t, err)

const user = "testUser"

evalMetrics.TotalWritesVec.WithLabelValues(user).Add(10)

userRules := map[string]rulespb.RuleGroupList{
user: {
&rulespb.RuleGroupDesc{
Name: "group1",
Namespace: "ns",
Interval: 1 * time.Minute,
User: user,
},
},
}
m.SyncRuleGroups(context.Background(), userRules)
gm, err := reg.Gather()
require.NoError(t, err)
mfm, err := util.NewMetricFamilyMap(gm)
require.NoError(t, err)
require.Contains(t, mfm["cortex_ruler_write_requests_total"].String(), "value:\""+user+"\"")
require.Contains(t, mfm["cortex_ruler_config_last_reload_successful"].String(), "value:\""+user+"\"")

// Passing empty map / nil stops all managers.
m.SyncRuleGroups(context.Background(), nil)
require.Nil(t, getManager(m, user))

gm, err = reg.Gather()
require.NoError(t, err)
mfm, err = util.NewMetricFamilyMap(gm)
require.NoError(t, err)
require.NotContains(t, mfm["cortex_ruler_write_requests_total"].String(), "value:\""+user+"\"")
require.NotContains(t, mfm["cortex_ruler_config_last_reload_successful"].String(), "value:\""+user+"\"")
}

func getManager(m *DefaultMultiTenantManager, user string) RulesManager {
m.userManagerMtx.Lock()
defer m.userManagerMtx.Unlock()
10 changes: 6 additions & 4 deletions pkg/ruler/ruler_test.go
@@ -175,7 +175,9 @@ func testSetup(t *testing.T, querierTestConfig *querier.TestConfig) (*promql.Eng

func newManager(t *testing.T, cfg Config) *DefaultMultiTenantManager {
engine, queryable, pusher, logger, overrides, reg := testSetup(t, nil)
manager, err := NewDefaultMultiTenantManager(cfg, DefaultTenantManagerFactory(cfg, pusher, queryable, engine, overrides, nil), reg, logger)
metrics := NewRuleEvalMetrics(cfg, nil)
managerFactory := DefaultTenantManagerFactory(cfg, pusher, queryable, engine, overrides, metrics, nil)
manager, err := NewDefaultMultiTenantManager(cfg, managerFactory, metrics, reg, logger)
require.NoError(t, err)

return manager
@@ -221,9 +223,9 @@ func newMockClientsPool(cfg Config, logger log.Logger, reg prometheus.Registerer

func buildRuler(t *testing.T, rulerConfig Config, querierTestConfig *querier.TestConfig, store rulestore.RuleStore, rulerAddrMap map[string]*Ruler) (*Ruler, *DefaultMultiTenantManager) {
engine, queryable, pusher, logger, overrides, reg := testSetup(t, querierTestConfig)

managerFactory := DefaultTenantManagerFactory(rulerConfig, pusher, queryable, engine, overrides, reg)
manager, err := NewDefaultMultiTenantManager(rulerConfig, managerFactory, reg, log.NewNopLogger())
metrics := NewRuleEvalMetrics(rulerConfig, reg)
managerFactory := DefaultTenantManagerFactory(rulerConfig, pusher, queryable, engine, overrides, metrics, reg)
manager, err := NewDefaultMultiTenantManager(rulerConfig, managerFactory, metrics, reg, log.NewNopLogger())
require.NoError(t, err)

ruler, err := newRuler(
Expand Down