diff --git a/CHANGELOG.md b/CHANGELOG.md index 9bb95a55820..f34a231c557 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -30,6 +30,7 @@ * [ENHANCEMENT] Query: Added `query_storage_wall_time` to Query Frontend and Ruler query stats log for wall time spent on fetching data from storage. Query evaluation is not included. #5799 * [ENHANCEMENT] Query: Added additional max query length check at Query Frontend and Ruler. Added `-querier.ignore-max-query-length` flag to disable max query length check at Querier. #5808 * [ENHANCEMENT] Querier: Add context error check when converting Metrics to SeriesSet for GetSeries on distributorQuerier. #5827 +* [ENHANCEMENT] Ruler: Improve GetRules response time by refactoring mutexes and introducing a temporary rules cache in `ruler/manager.go`. #5805 * [BUGFIX] Distributor: Do not use label with empty values for sharding #5717 * [BUGFIX] Query Frontend: queries with negative offset should check whether it is cacheable or not. #5719 * [BUGFIX] Redis Cache: pass `cache_size` config correctly. #5734 @@ -38,7 +39,6 @@ * [BUGFIX] Ring DDB: Fix lifecycle for ring counting unhealthy pods as healthy. #5838 * [BUGFIX] Ring DDB: Fix region assignment. #5842 - ## 1.16.0 2023-11-20 * [CHANGE] AlertManager: include reason label in `cortex_alertmanager_notifications_failed_total`. #5409 diff --git a/pkg/ruler/manager.go b/pkg/ruler/manager.go index 50dc6aebe18..95bf7c43ccb 100644 --- a/pkg/ruler/manager.go +++ b/pkg/ruler/manager.go @@ -35,7 +35,7 @@ type DefaultMultiTenantManager struct { // Structs for holding per-user Prometheus rules Managers // and a corresponding metrics struct - userManagerMtx sync.Mutex + userManagerMtx sync.RWMutex userManagers map[string]RulesManager userManagerMetrics *ManagerMetrics @@ -50,6 +50,10 @@ type DefaultMultiTenantManager struct { configUpdatesTotal *prometheus.CounterVec registry prometheus.Registerer logger log.Logger + + ruleCache map[string][]*promRules.Group + ruleCacheMtx sync.RWMutex + syncRuleMtx sync.Mutex } func NewDefaultMultiTenantManager(cfg Config, managerFactory ManagerFactory, evalMetrics *RuleEvalMetrics, reg prometheus.Registerer, logger log.Logger) (*DefaultMultiTenantManager, error) { @@ -85,6 +89,7 @@ func NewDefaultMultiTenantManager(cfg Config, managerFactory ManagerFactory, eva mapper: newMapper(cfg.RulePath, logger), userManagers: map[string]RulesManager{}, userManagerMetrics: userManagerMetrics, + ruleCache: map[string][]*promRules.Group{}, managersTotal: promauto.With(reg).NewGauge(prometheus.GaugeOpts{ Namespace: "cortex", Name: "ruler_managers_total", @@ -111,15 +116,17 @@ func NewDefaultMultiTenantManager(cfg Config, managerFactory ManagerFactory, eva } func (r *DefaultMultiTenantManager) SyncRuleGroups(ctx context.Context, ruleGroups map[string]rulespb.RuleGroupList) { - // A lock is taken to ensure if this function is called concurrently, then each call - // returns after the call map files and check for updates - r.userManagerMtx.Lock() - defer r.userManagerMtx.Unlock() + // this is a safety lock to ensure this method is executed sequentially + r.syncRuleMtx.Lock() + defer r.syncRuleMtx.Unlock() for userID, ruleGroup := range ruleGroups { r.syncRulesToManager(ctx, userID, ruleGroup) } + r.userManagerMtx.Lock() + defer r.userManagerMtx.Unlock() + // Check for deleted users and remove them for userID, mngr := range r.userManagers { if _, exists := ruleGroups[userID]; !exists { @@ -142,6 +149,18 @@ func (r *DefaultMultiTenantManager) SyncRuleGroups(ctx context.Context, ruleGrou 
 	r.managersTotal.Set(float64(len(r.userManagers)))
 }
 
+func (r *DefaultMultiTenantManager) updateRuleCache(user string, rules []*promRules.Group) {
+	r.ruleCacheMtx.Lock()
+	defer r.ruleCacheMtx.Unlock()
+	r.ruleCache[user] = rules
+}
+
+func (r *DefaultMultiTenantManager) deleteRuleCache(user string) {
+	r.ruleCacheMtx.Lock()
+	defer r.ruleCacheMtx.Unlock()
+	delete(r.ruleCache, user)
+}
+
 // syncRulesToManager maps the rule files to disk, detects any changes and will create/update
 // the user's Prometheus Rules Manager.
 func (r *DefaultMultiTenantManager) syncRulesToManager(ctx context.Context, user string, groups rulespb.RuleGroupList) {
@@ -154,25 +173,25 @@ func (r *DefaultMultiTenantManager) syncRulesToManager(ctx context.Context, user
 		return
 	}
 
-	manager, exists := r.userManagers[user]
-	if !exists || update {
+	existing := true
+	manager := r.getRulesManager(user, ctx)
+	if manager == nil {
+		existing = false
+		manager = r.createRulesManager(user, ctx)
+	}
+
+	if manager == nil {
+		return
+	}
+
+	if !existing || update {
 		level.Debug(r.logger).Log("msg", "updating rules", "user", user)
 		r.configUpdatesTotal.WithLabelValues(user).Inc()
-		if !exists {
-			level.Debug(r.logger).Log("msg", "creating rule manager for user", "user", user)
-			manager, err = r.newManager(ctx, user)
-			if err != nil {
-				r.lastReloadSuccessful.WithLabelValues(user).Set(0)
-				level.Error(r.logger).Log("msg", "unable to create rule manager", "user", user, "err", err)
-				return
-			}
-			// manager.Run() starts running the manager and blocks until Stop() is called.
-			// Hence run it as another goroutine.
-			go manager.Run()
-			r.userManagers[user] = manager
+		if update && existing {
+			r.updateRuleCache(user, manager.RuleGroups())
 		}
 		err = manager.Update(r.cfg.EvaluationInterval, files, r.cfg.ExternalLabels, r.cfg.ExternalURL.String(), ruleGroupIterationFunc)
+		r.deleteRuleCache(user)
 		if err != nil {
 			r.lastReloadSuccessful.WithLabelValues(user).Set(0)
 			level.Error(r.logger).Log("msg", "unable to update rule manager", "user", user, "err", err)
@@ -184,6 +203,29 @@ func (r *DefaultMultiTenantManager) syncRulesToManager(ctx context.Context, user
 	}
 }
 
+func (r *DefaultMultiTenantManager) getRulesManager(user string, ctx context.Context) RulesManager {
+	r.userManagerMtx.RLock()
+	defer r.userManagerMtx.RUnlock()
+	return r.userManagers[user]
+}
+
+func (r *DefaultMultiTenantManager) createRulesManager(user string, ctx context.Context) RulesManager {
+	r.userManagerMtx.Lock()
+	defer r.userManagerMtx.Unlock()
+
+	manager, err := r.newManager(ctx, user)
+	if err != nil {
+		r.lastReloadSuccessful.WithLabelValues(user).Set(0)
+		level.Error(r.logger).Log("msg", "unable to create rule manager", "user", user, "err", err)
+		return nil
+	}
+	// manager.Run() starts running the manager and blocks until Stop() is called.
+	// Hence run it as another goroutine.
+	go manager.Run()
+	r.userManagers[user] = manager
+	return manager
+}
+
 func ruleGroupIterationFunc(ctx context.Context, g *promRules.Group, evalTimestamp time.Time) {
 	logMessage := []interface{}{
 		"msg", "evaluating rule group",
@@ -269,13 +311,25 @@ func (r *DefaultMultiTenantManager) getOrCreateNotifier(userID string, userManag
 	return n.notifier, nil
 }
 
+func (r *DefaultMultiTenantManager) getCachedRules(userID string) ([]*promRules.Group, bool) {
+	r.ruleCacheMtx.RLock()
+	defer r.ruleCacheMtx.RUnlock()
+	groups, exists := r.ruleCache[userID]
+	return groups, exists
+}
+
 func (r *DefaultMultiTenantManager) GetRules(userID string) []*promRules.Group {
 	var groups []*promRules.Group
-	r.userManagerMtx.Lock()
-	if mngr, exists := r.userManagers[userID]; exists {
+	groups, cached := r.getCachedRules(userID)
+	if cached {
+		return groups
+	}
+	r.userManagerMtx.RLock()
+	mngr, exists := r.userManagers[userID]
+	r.userManagerMtx.RUnlock()
+	if exists {
 		groups = mngr.RuleGroups()
 	}
-	r.userManagerMtx.Unlock()
 	return groups
 }
 
diff --git a/pkg/ruler/manager_test.go b/pkg/ruler/manager_test.go
index c76888fc801..ec95205a14c 100644
--- a/pkg/ruler/manager_test.go
+++ b/pkg/ruler/manager_test.go
@@ -2,6 +2,7 @@ package ruler
 
 import (
 	"context"
+	"sync"
 	"testing"
 	"time"
 
@@ -21,7 +22,14 @@ import (
 func TestSyncRuleGroups(t *testing.T) {
 	dir := t.TempDir()
 
-	m, err := NewDefaultMultiTenantManager(Config{RulePath: dir}, factory, nil, nil, log.NewNopLogger())
+	waitDurations := []time.Duration{
+		1 * time.Millisecond,
+		1 * time.Millisecond,
+	}
+
+	ruleManagerFactory := RuleManagerFactory(nil, waitDurations)
+
+	m, err := NewDefaultMultiTenantManager(Config{RulePath: dir}, ruleManagerFactory, nil, nil, log.NewNopLogger())
 	require.NoError(t, err)
 
 	const user = "testUser"
@@ -97,11 +105,118 @@ func TestSyncRuleGroups(t *testing.T) {
 	})
 }
 
+func TestSlowRuleGroupSyncDoesNotSlowdownListRules(t *testing.T) {
+	dir := t.TempDir()
+	const user = "testUser"
+	userRules := map[string]rulespb.RuleGroupList{
+		user: {
+			&rulespb.RuleGroupDesc{
+				Name:      "group1",
+				Namespace: "ns",
+				Interval:  1 * time.Minute,
+				User:      user,
+			},
+		},
+	}
+
+	groupsToReturn := [][]*promRules.Group{
+		{
+			promRules.NewGroup(promRules.GroupOptions{
+				Name:     "group1",
+				File:     "ns",
+				Interval: 60,
+				Limit:    0,
+				Opts:     &promRules.ManagerOptions{},
+			}),
+		},
+		{
+			promRules.NewGroup(promRules.GroupOptions{
+				Name:     "group1",
+				File:     "ns",
+				Interval: 60,
+				Limit:    0,
+				Opts:     &promRules.ManagerOptions{},
+			}),
+			promRules.NewGroup(promRules.GroupOptions{
+				Name:     "group2",
+				File:     "ns",
+				Interval: 60,
+				Limit:    0,
+				Opts:     &promRules.ManagerOptions{},
+			}),
+		},
+	}
+
+	waitDurations := []time.Duration{
+		5 * time.Millisecond,
+		1 * time.Second,
+	}
+
+	ruleManagerFactory := RuleManagerFactory(groupsToReturn, waitDurations)
+	m, err := NewDefaultMultiTenantManager(Config{RulePath: dir}, ruleManagerFactory, nil, prometheus.NewRegistry(), log.NewNopLogger())
+	require.NoError(t, err)
+
+	m.SyncRuleGroups(context.Background(), userRules)
+	mgr := getManager(m, user)
+	require.NotNil(t, mgr)
+
+	test.Poll(t, 1*time.Second, true, func() interface{} {
+		return mgr.(*mockRulesManager).running.Load()
+	})
+	groups := m.GetRules(user)
+	require.Len(t, groups, len(groupsToReturn[0]), "expected %d but got %d", len(groupsToReturn[0]), len(groups))
+
+	// update rules and call list rules concurrently
+	userRules = map[string]rulespb.RuleGroupList{
+		user: {
+			&rulespb.RuleGroupDesc{
+				Name:      "group1",
+				Namespace: "ns",
+				Interval:  1 * time.Minute,
+				User:      user,
+			},
+			&rulespb.RuleGroupDesc{
+				Name:      "group2",
+				Namespace: "ns",
+				Interval:  1 * time.Minute,
+				User:      user,
+			},
+		},
+	}
+	go m.SyncRuleGroups(context.Background(), userRules)
+
+	groups = m.GetRules(user)
+
+	require.Len(t, groups, len(groupsToReturn[0]), "expected %d but got %d", len(groupsToReturn[0]), len(groups))
+	test.Poll(t, 5*time.Second, len(groupsToReturn[1]), func() interface{} {
+		groups = m.GetRules(user)
+		return len(groups)
+	})
+
+	test.Poll(t, 1*time.Second, true, func() interface{} {
+		return mgr.(*mockRulesManager).running.Load()
+	})
+
+	m.Stop()
+
+	test.Poll(t, 1*time.Second, false, func() interface{} {
+		return mgr.(*mockRulesManager).running.Load()
+	})
+}
+
 func TestSyncRuleGroupsCleanUpPerUserMetrics(t *testing.T) {
 	dir := t.TempDir()
 
 	reg := prometheus.NewPedanticRegistry()
 	evalMetrics := NewRuleEvalMetrics(Config{RulePath: dir, EnableQueryStats: true}, reg)
-	m, err := NewDefaultMultiTenantManager(Config{RulePath: dir}, factory, evalMetrics, reg, log.NewNopLogger())
+
+	waitDurations := []time.Duration{
+		1 * time.Millisecond,
+		1 * time.Millisecond,
+	}
+
+	ruleManagerFactory := RuleManagerFactory(nil, waitDurations)
+
+	m, err := NewDefaultMultiTenantManager(Config{RulePath: dir}, ruleManagerFactory, evalMetrics, reg, log.NewNopLogger())
 	require.NoError(t, err)
 
 	const user = "testUser"
@@ -139,19 +254,52 @@ func TestSyncRuleGroupsCleanUpPerUserMetrics(t *testing.T) {
 }
 
 func getManager(m *DefaultMultiTenantManager, user string) RulesManager {
-	m.userManagerMtx.Lock()
-	defer m.userManagerMtx.Unlock()
+	m.userManagerMtx.RLock()
+	defer m.userManagerMtx.RUnlock()
 	return m.userManagers[user]
 }
 
-func factory(_ context.Context, _ string, _ *notifier.Manager, _ log.Logger, _ prometheus.Registerer) RulesManager {
-	return &mockRulesManager{done: make(chan struct{})}
+func RuleManagerFactory(groupsToReturn [][]*promRules.Group, waitDurations []time.Duration) ManagerFactory {
+	return func(_ context.Context, _ string, _ *notifier.Manager, _ log.Logger, _ prometheus.Registerer) RulesManager {
+		return &mockRulesManager{
+			done:           make(chan struct{}),
+			groupsToReturn: groupsToReturn,
+			waitDurations:  waitDurations,
+			iteration:      -1,
+		}
+	}
 }
 
 type mockRulesManager struct {
-	running atomic.Bool
-	done    chan struct{}
+	mtx            sync.Mutex
+	groupsToReturn [][]*promRules.Group
+	iteration      int
+	waitDurations  []time.Duration
+	running        atomic.Bool
+	done           chan struct{}
+}
+
+func (m *mockRulesManager) Update(_ time.Duration, _ []string, _ labels.Labels, _ string, _ promRules.GroupEvalIterationFunc) error {
+	m.mtx.Lock()
+	defer m.mtx.Unlock()
+	ticker := time.NewTicker(m.waitDurations[m.iteration+1])
+	select {
+	case <-ticker.C:
+		m.iteration = m.iteration + 1
+		return nil
+	case <-m.done:
+		return nil
+	}
+}
+
+func (m *mockRulesManager) RuleGroups() []*promRules.Group {
+	m.mtx.Lock()
+	defer m.mtx.Unlock()
+	if m.iteration < 0 {
+		return nil
+	}
+	return m.groupsToReturn[m.iteration]
 }
 
 func (m *mockRulesManager) Run() {
@@ -163,11 +311,3 @@ func (m *mockRulesManager) Stop() {
 	m.running.Store(false)
 	close(m.done)
}
-
-func (m *mockRulesManager) Update(_ time.Duration, _ []string, _ labels.Labels, _ string, _ promRules.GroupEvalIterationFunc) error {
-	return nil
-}
-
-func (m *mockRulesManager) RuleGroups() []*promRules.Group {
-	return nil
-}
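Reviewer note: the sketch below is a minimal, self-contained illustration of the locking pattern this patch adopts, not code from the patch itself. `MultiTenantManager`, `syncUser`, and the simplified `Group`/`Manager` types are illustrative stand-ins for `DefaultMultiTenantManager`, `syncRulesToManager`, and the `promRules` types, and the sleep durations are arbitrary. It shows why `GetRules` stays responsive during a slow sync: lookups take only an `RWMutex` read lock, and a temporary cache serves the previously known rule groups while a manager's `Update` is in flight.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// Group and Manager are simplified stand-ins for promRules.Group and the
// per-tenant RulesManager; they are not the real Prometheus types.
type Group struct{ Name string }

type Manager struct{ groups []*Group }

func (m *Manager) RuleGroups() []*Group { return m.groups }

type MultiTenantManager struct {
	userManagerMtx sync.RWMutex // readers of the manager map no longer serialize
	userManagers   map[string]*Manager

	ruleCacheMtx sync.RWMutex // guards the temporary cache independently
	ruleCache    map[string][]*Group
}

// GetRules checks the temporary cache first: while a slow sync is updating a
// tenant's manager, readers are served the previously known groups instead of
// blocking on the write lock.
func (r *MultiTenantManager) GetRules(user string) []*Group {
	r.ruleCacheMtx.RLock()
	cached, ok := r.ruleCache[user]
	r.ruleCacheMtx.RUnlock()
	if ok {
		return cached
	}

	r.userManagerMtx.RLock()
	defer r.userManagerMtx.RUnlock()
	if mgr, exists := r.userManagers[user]; exists {
		return mgr.RuleGroups()
	}
	return nil
}

// syncUser snapshots the current groups into the cache, performs the slow
// update, then invalidates the cache entry so readers see the fresh state.
func (r *MultiTenantManager) syncUser(user string, next []*Group) {
	r.userManagerMtx.RLock()
	mgr := r.userManagers[user]
	r.ruleCacheMtx.Lock()
	r.ruleCache[user] = mgr.RuleGroups()
	r.ruleCacheMtx.Unlock()
	r.userManagerMtx.RUnlock()

	time.Sleep(100 * time.Millisecond) // stands in for a slow manager.Update

	r.userManagerMtx.Lock()
	mgr.groups = next
	r.userManagerMtx.Unlock()

	r.ruleCacheMtx.Lock()
	delete(r.ruleCache, user)
	r.ruleCacheMtx.Unlock()
}

func main() {
	r := &MultiTenantManager{
		userManagers: map[string]*Manager{
			"tenant-1": {groups: []*Group{{Name: "group1"}}},
		},
		ruleCache: map[string][]*Group{},
	}

	go r.syncUser("tenant-1", []*Group{{Name: "group1"}, {Name: "group2"}})
	time.Sleep(10 * time.Millisecond)

	// Served from the cache while the sync is still sleeping; does not block.
	fmt.Println("during sync:", len(r.GetRules("tenant-1"))) // expected: 1

	time.Sleep(200 * time.Millisecond)
	fmt.Println("after sync:", len(r.GetRules("tenant-1"))) // expected: 2
}
```

The cache is deliberately short-lived: it is filled only just before an existing manager is updated and deleted as soon as the update returns, so in the steady state readers fall through to the manager's own `RuleGroups()` rather than a potentially stale copy.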