Restructure mutex such that manager is not holding one mutex for the entirety of sync rules (#5805)

Signed-off-by: Anand Rajagopal <anrajag@amazon.com>
rajagopalanand authored and alanprot committed Apr 2, 2024
1 parent bcebcc4 commit 743a3ba
Showing 3 changed files with 234 additions and 40 deletions.
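
Before this change, `SyncRuleGroups` held the single `userManagerMtx` for the whole sync pass, so a slow per-tenant update blocked every concurrent `GetRules` call. The commit splits that into three narrower locks: `syncRuleMtx` to serialize sync passes, an `RWMutex` around the per-user manager map, and an `RWMutex`-guarded temporary rules cache that answers `GetRules` while a sync is in flight. The sketch below is a minimal, self-contained illustration of that locking layout; the types and names are simplified stand-ins, not the actual Cortex code:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

type multiTenant struct {
	syncMtx     sync.Mutex // serializes whole sync passes (cf. syncRuleMtx)
	managersMtx sync.RWMutex
	managers    map[string][]string // user -> rule groups (stand-in for per-user managers)

	cacheMtx sync.RWMutex
	cache    map[string][]string // stale snapshot served while a sync runs (cf. ruleCache)
}

func (m *multiTenant) SyncRules(user string, groups []string) {
	m.syncMtx.Lock() // safety lock: one sync pass at a time
	defer m.syncMtx.Unlock()

	// Snapshot the current groups so readers are not blocked by the slow update.
	m.cacheMtx.Lock()
	m.cache[user] = m.current(user)
	m.cacheMtx.Unlock()

	time.Sleep(100 * time.Millisecond) // stand-in for a slow manager.Update

	m.managersMtx.Lock() // exclusive lock held only for the map write
	m.managers[user] = groups
	m.managersMtx.Unlock()

	// Drop the snapshot once the update has landed.
	m.cacheMtx.Lock()
	delete(m.cache, user)
	m.cacheMtx.Unlock()
}

func (m *multiTenant) current(user string) []string {
	m.managersMtx.RLock()
	defer m.managersMtx.RUnlock()
	return m.managers[user]
}

// GetRules prefers the snapshot, so it never queues behind a sync pass.
func (m *multiTenant) GetRules(user string) []string {
	m.cacheMtx.RLock()
	snap, ok := m.cache[user]
	m.cacheMtx.RUnlock()
	if ok {
		return snap
	}
	return m.current(user)
}

func main() {
	m := &multiTenant{managers: map[string][]string{}, cache: map[string][]string{}}
	m.SyncRules("user1", []string{"group1"})

	go m.SyncRules("user1", []string{"group1", "group2"}) // slow second sync
	time.Sleep(20 * time.Millisecond)
	fmt.Println(m.GetRules("user1")) // served from the snapshot: [group1]

	time.Sleep(200 * time.Millisecond)
	fmt.Println(m.GetRules("user1")) // after the sync: [group1 group2]
}
```

The trade-off, visible in the real diff below as well, is that `GetRules` can briefly return a slightly stale snapshot during a sync instead of blocking until the sync finishes.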
2 changes: 1 addition & 1 deletion CHANGELOG.md
@@ -30,6 +30,7 @@
* [ENHANCEMENT] Query: Added `query_storage_wall_time` to Query Frontend and Ruler query stats log for wall time spent on fetching data from storage. Query evaluation is not included. #5799
* [ENHANCEMENT] Query: Added additional max query length check at Query Frontend and Ruler. Added `-querier.ignore-max-query-length` flag to disable max query length check at Querier. #5808
* [ENHANCEMENT] Querier: Add context error check when converting Metrics to SeriesSet for GetSeries on distributorQuerier. #5827
+ * [ENHANCEMENT] Ruler: Improve GetRules response time by refactoring mutexes and introducing a temporary rules cache in `ruler/manager.go`. #5805
* [BUGFIX] Distributor: Do not use label with empty values for sharding #5717
* [BUGFIX] Query Frontend: queries with negative offset should check whether it is cacheable or not. #5719
* [BUGFIX] Redis Cache: pass `cache_size` config correctly. #5734
@@ -38,7 +39,6 @@
* [BUGFIX] Ring DDB: Fix lifecycle for ring counting unhealthy pods as healthy. #5838
* [BUGFIX] Ring DDB: Fix region assignment. #5842


## 1.16.0 2023-11-20

* [CHANGE] AlertManager: include reason label in `cortex_alertmanager_notifications_failed_total`. #5409
100 changes: 77 additions & 23 deletions pkg/ruler/manager.go
@@ -35,7 +35,7 @@ type DefaultMultiTenantManager struct {

// Structs for holding per-user Prometheus rules Managers
// and a corresponding metrics struct
- userManagerMtx sync.Mutex
+ userManagerMtx sync.RWMutex
userManagers map[string]RulesManager
userManagerMetrics *ManagerMetrics

@@ -50,6 +50,10 @@ type DefaultMultiTenantManager struct {
configUpdatesTotal *prometheus.CounterVec
registry prometheus.Registerer
logger log.Logger

+ ruleCache map[string][]*promRules.Group
+ ruleCacheMtx sync.RWMutex
+ syncRuleMtx sync.Mutex
}

func NewDefaultMultiTenantManager(cfg Config, managerFactory ManagerFactory, evalMetrics *RuleEvalMetrics, reg prometheus.Registerer, logger log.Logger) (*DefaultMultiTenantManager, error) {
@@ -85,6 +89,7 @@ func NewDefaultMultiTenantManager(cfg Config, managerFactory ManagerFactory, eva
mapper: newMapper(cfg.RulePath, logger),
userManagers: map[string]RulesManager{},
userManagerMetrics: userManagerMetrics,
+ ruleCache: map[string][]*promRules.Group{},
managersTotal: promauto.With(reg).NewGauge(prometheus.GaugeOpts{
Namespace: "cortex",
Name: "ruler_managers_total",
@@ -111,15 +116,17 @@ func NewDefaultMultiTenantManager(cfg Config, managerFactory ManagerFactory, eva
}

func (r *DefaultMultiTenantManager) SyncRuleGroups(ctx context.Context, ruleGroups map[string]rulespb.RuleGroupList) {
- // A lock is taken to ensure if this function is called concurrently, then each call
- // returns after the call map files and check for updates
- r.userManagerMtx.Lock()
- defer r.userManagerMtx.Unlock()
+ // this is a safety lock to ensure this method is executed sequentially
+ r.syncRuleMtx.Lock()
+ defer r.syncRuleMtx.Unlock()

for userID, ruleGroup := range ruleGroups {
r.syncRulesToManager(ctx, userID, ruleGroup)
}

+ r.userManagerMtx.Lock()
+ defer r.userManagerMtx.Unlock()

// Check for deleted users and remove them
for userID, mngr := range r.userManagers {
if _, exists := ruleGroups[userID]; !exists {
@@ -142,6 +149,18 @@ func (r *DefaultMultiTenantManager) SyncRuleGroups(ctx context.Context, ruleGrou
r.managersTotal.Set(float64(len(r.userManagers)))
}

+ func (r *DefaultMultiTenantManager) updateRuleCache(user string, rules []*promRules.Group) {
+ r.ruleCacheMtx.Lock()
+ defer r.ruleCacheMtx.Unlock()
+ r.ruleCache[user] = rules
+ }
+
+ func (r *DefaultMultiTenantManager) deleteRuleCache(user string) {
+ r.ruleCacheMtx.Lock()
+ defer r.ruleCacheMtx.Unlock()
+ delete(r.ruleCache, user)
+ }

// syncRulesToManager maps the rule files to disk, detects any changes and will create/update
// the user's Prometheus Rules Manager.
func (r *DefaultMultiTenantManager) syncRulesToManager(ctx context.Context, user string, groups rulespb.RuleGroupList) {
@@ -154,25 +173,25 @@ func (r *DefaultMultiTenantManager) syncRulesToManager(ctx context.Context, user
return
}

- manager, exists := r.userManagers[user]
- if !exists || update {
+ existing := true
+ manager := r.getRulesManager(user, ctx)
+ if manager == nil {
+ existing = false
+ manager = r.createRulesManager(user, ctx)
+ }
+
+ if manager == nil {
+ return
+ }
+
+ if !existing || update {
level.Debug(r.logger).Log("msg", "updating rules", "user", user)
r.configUpdatesTotal.WithLabelValues(user).Inc()
- if !exists {
- level.Debug(r.logger).Log("msg", "creating rule manager for user", "user", user)
- manager, err = r.newManager(ctx, user)
- if err != nil {
- r.lastReloadSuccessful.WithLabelValues(user).Set(0)
- level.Error(r.logger).Log("msg", "unable to create rule manager", "user", user, "err", err)
- return
- }
- // manager.Run() starts running the manager and blocks until Stop() is called.
- // Hence run it as another goroutine.
- go manager.Run()
- r.userManagers[user] = manager
+ if update && existing {
+ r.updateRuleCache(user, manager.RuleGroups())
}

err = manager.Update(r.cfg.EvaluationInterval, files, r.cfg.ExternalLabels, r.cfg.ExternalURL.String(), ruleGroupIterationFunc)
+ r.deleteRuleCache(user)
if err != nil {
r.lastReloadSuccessful.WithLabelValues(user).Set(0)
level.Error(r.logger).Log("msg", "unable to update rule manager", "user", user, "err", err)
@@ -184,6 +203,29 @@ func (r *DefaultMultiTenantManager) syncRulesToManager(ctx context.Context, user
}
}

+ func (r *DefaultMultiTenantManager) getRulesManager(user string, ctx context.Context) RulesManager {
+ r.userManagerMtx.RLock()
+ defer r.userManagerMtx.RUnlock()
+ return r.userManagers[user]
+ }
+
+ func (r *DefaultMultiTenantManager) createRulesManager(user string, ctx context.Context) RulesManager {
+ r.userManagerMtx.Lock()
+ defer r.userManagerMtx.Unlock()
+
+ manager, err := r.newManager(ctx, user)
+ if err != nil {
+ r.lastReloadSuccessful.WithLabelValues(user).Set(0)
+ level.Error(r.logger).Log("msg", "unable to create rule manager", "user", user, "err", err)
+ return nil
+ }
+ // manager.Run() starts running the manager and blocks until Stop() is called.
+ // Hence run it as another goroutine.
+ go manager.Run()
+ r.userManagers[user] = manager
+ return manager
+ }

func ruleGroupIterationFunc(ctx context.Context, g *promRules.Group, evalTimestamp time.Time) {
logMessage := []interface{}{
"msg", "evaluating rule group",
@@ -269,13 +311,25 @@ func (r *DefaultMultiTenantManager) getOrCreateNotifier(userID string, userManag
return n.notifier, nil
}

+ func (r *DefaultMultiTenantManager) getCachedRules(userID string) ([]*promRules.Group, bool) {
+ r.ruleCacheMtx.RLock()
+ defer r.ruleCacheMtx.RUnlock()
+ groups, exists := r.ruleCache[userID]
+ return groups, exists
+ }

func (r *DefaultMultiTenantManager) GetRules(userID string) []*promRules.Group {
- var groups []*promRules.Group
- r.userManagerMtx.Lock()
- if mngr, exists := r.userManagers[userID]; exists {
+ groups, cached := r.getCachedRules(userID)
+ if cached {
+ return groups
+ }
+ r.userManagerMtx.RLock()
+ mngr, exists := r.userManagers[userID]
+ r.userManagerMtx.RUnlock()
+ if exists {
groups = mngr.RuleGroups()
}
- r.userManagerMtx.Unlock()
return groups
}

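The other half of the change is the switch of `userManagerMtx` from `sync.Mutex` to `sync.RWMutex`: concurrent `GetRules` calls now share a read lock on the manager map, and only manager creation and deletion take the exclusive write lock. A small standalone Go illustration of that behavior (plain `sync` package usage, not Cortex code):

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	var mtx sync.RWMutex
	managers := map[string]string{"user1": "manager-v1"}

	var wg sync.WaitGroup

	// Several readers can hold the read lock at the same time, so lookups
	// like GetRules do not queue behind one another.
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			mtx.RLock()
			defer mtx.RUnlock()
			fmt.Printf("reader %d sees %s\n", id, managers["user1"])
			time.Sleep(10 * time.Millisecond) // simulate work while holding the read lock
		}(i)
	}

	// The writer (e.g. manager creation during a sync) waits for readers,
	// then holds exclusive access just long enough to touch the map.
	wg.Add(1)
	go func() {
		defer wg.Done()
		mtx.Lock()
		defer mtx.Unlock()
		managers["user1"] = "manager-v2"
	}()

	wg.Wait()
	fmt.Println("final:", managers["user1"]) // always manager-v2
}
```

Readers overlap freely; the writer only stalls them for the duration of the map write, not for an entire sync pass.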
172 changes: 156 additions & 16 deletions pkg/ruler/manager_test.go
@@ -2,6 +2,7 @@ package ruler

import (
"context"
"sync"
"testing"
"time"

@@ -21,7 +22,14 @@
func TestSyncRuleGroups(t *testing.T) {
dir := t.TempDir()

- m, err := NewDefaultMultiTenantManager(Config{RulePath: dir}, factory, nil, nil, log.NewNopLogger())
+ waitDurations := []time.Duration{
+ 1 * time.Millisecond,
+ 1 * time.Millisecond,
+ }
+
+ ruleManagerFactory := RuleManagerFactory(nil, waitDurations)
+
+ m, err := NewDefaultMultiTenantManager(Config{RulePath: dir}, ruleManagerFactory, nil, nil, log.NewNopLogger())
require.NoError(t, err)

const user = "testUser"
@@ -97,11 +105,118 @@ func TestSyncRuleGroups(t *testing.T) {
})
}

+ func TestSlowRuleGroupSyncDoesNotSlowdownListRules(t *testing.T) {
+ dir := t.TempDir()
+ const user = "testUser"
+ userRules := map[string]rulespb.RuleGroupList{
+ user: {
+ &rulespb.RuleGroupDesc{
+ Name: "group1",
+ Namespace: "ns",
+ Interval: 1 * time.Minute,
+ User: user,
+ },
+ },
+ }
+
+ groupsToReturn := [][]*promRules.Group{
+ {
+ promRules.NewGroup(promRules.GroupOptions{
+ Name: "group1",
+ File: "ns",
+ Interval: 60,
+ Limit: 0,
+ Opts: &promRules.ManagerOptions{},
+ }),
+ },
+ {
+ promRules.NewGroup(promRules.GroupOptions{
+ Name: "group1",
+ File: "ns",
+ Interval: 60,
+ Limit: 0,
+ Opts: &promRules.ManagerOptions{},
+ }),
+ promRules.NewGroup(promRules.GroupOptions{
+ Name: "group2",
+ File: "ns",
+ Interval: 60,
+ Limit: 0,
+ Opts: &promRules.ManagerOptions{},
+ }),
+ },
+ }
+
+ waitDurations := []time.Duration{
+ 5 * time.Millisecond,
+ 1 * time.Second,
+ }
+
+ ruleManagerFactory := RuleManagerFactory(groupsToReturn, waitDurations)
+ m, err := NewDefaultMultiTenantManager(Config{RulePath: dir}, ruleManagerFactory, nil, prometheus.NewRegistry(), log.NewNopLogger())
+ require.NoError(t, err)
+
+ m.SyncRuleGroups(context.Background(), userRules)
+ mgr := getManager(m, user)
+ require.NotNil(t, mgr)
+
+ test.Poll(t, 1*time.Second, true, func() interface{} {
+ return mgr.(*mockRulesManager).running.Load()
+ })
+ groups := m.GetRules(user)
+ require.Len(t, groups, len(groupsToReturn[0]), "expected %d but got %d", len(groupsToReturn[0]), len(groups))
+
+ // update rules and call list rules concurrently
+ userRules = map[string]rulespb.RuleGroupList{
+ user: {
+ &rulespb.RuleGroupDesc{
+ Name: "group1",
+ Namespace: "ns",
+ Interval: 1 * time.Minute,
+ User: user,
+ },
+ &rulespb.RuleGroupDesc{
+ Name: "group2",
+ Namespace: "ns",
+ Interval: 1 * time.Minute,
+ User: user,
+ },
+ },
+ }
+ go m.SyncRuleGroups(context.Background(), userRules)
+
+ groups = m.GetRules(user)
+
+ require.Len(t, groups, len(groupsToReturn[0]), "expected %d but got %d", len(groupsToReturn[0]), len(groups))
+ test.Poll(t, 5*time.Second, len(groupsToReturn[1]), func() interface{} {
+ groups = m.GetRules(user)
+ return len(groups)
+ })
+
+ test.Poll(t, 1*time.Second, true, func() interface{} {
+ return mgr.(*mockRulesManager).running.Load()
+ })
+
+ m.Stop()
+
+ test.Poll(t, 1*time.Second, false, func() interface{} {
+ return mgr.(*mockRulesManager).running.Load()
+ })
+ }

func TestSyncRuleGroupsCleanUpPerUserMetrics(t *testing.T) {
dir := t.TempDir()
reg := prometheus.NewPedanticRegistry()
evalMetrics := NewRuleEvalMetrics(Config{RulePath: dir, EnableQueryStats: true}, reg)
- m, err := NewDefaultMultiTenantManager(Config{RulePath: dir}, factory, evalMetrics, reg, log.NewNopLogger())
+
+ waitDurations := []time.Duration{
+ 1 * time.Millisecond,
+ 1 * time.Millisecond,
+ }
+
+ ruleManagerFactory := RuleManagerFactory(nil, waitDurations)
+
+ m, err := NewDefaultMultiTenantManager(Config{RulePath: dir}, ruleManagerFactory, evalMetrics, reg, log.NewNopLogger())
require.NoError(t, err)

const user = "testUser"
@@ -139,19 +254,52 @@ func TestSyncRuleGroupsCleanUpPerUserMetrics(t *testing.T) {
}

func getManager(m *DefaultMultiTenantManager, user string) RulesManager {
- m.userManagerMtx.Lock()
- defer m.userManagerMtx.Unlock()
+ m.userManagerMtx.RLock()
+ defer m.userManagerMtx.RUnlock()

return m.userManagers[user]
}

- func factory(_ context.Context, _ string, _ *notifier.Manager, _ log.Logger, _ prometheus.Registerer) RulesManager {
- return &mockRulesManager{done: make(chan struct{})}
+ func RuleManagerFactory(groupsToReturn [][]*promRules.Group, waitDurations []time.Duration) ManagerFactory {
+ return func(_ context.Context, _ string, _ *notifier.Manager, _ log.Logger, _ prometheus.Registerer) RulesManager {
+ return &mockRulesManager{
+ done: make(chan struct{}),
+ groupsToReturn: groupsToReturn,
+ waitDurations: waitDurations,
+ iteration: -1,
+ }
+ }
}

type mockRulesManager struct {
running atomic.Bool
done chan struct{}
mtx sync.Mutex
groupsToReturn [][]*promRules.Group
iteration int
waitDurations []time.Duration
running atomic.Bool
done chan struct{}
}

+ func (m *mockRulesManager) Update(_ time.Duration, _ []string, _ labels.Labels, _ string, _ promRules.GroupEvalIterationFunc) error {
+ m.mtx.Lock()
+ defer m.mtx.Unlock()
+ ticker := time.NewTicker(m.waitDurations[m.iteration+1])
+ select {
+ case <-ticker.C:
+ m.iteration = m.iteration + 1
+ return nil
+ case <-m.done:
+ return nil
+ }
+ }
+
+ func (m *mockRulesManager) RuleGroups() []*promRules.Group {
+ m.mtx.Lock()
+ defer m.mtx.Unlock()
+ if m.iteration < 0 {
+ return nil
+ }
+ return m.groupsToReturn[m.iteration]
+ }

func (m *mockRulesManager) Run() {
@@ -163,11 +311,3 @@ func (m *mockRulesManager) Stop() {
m.running.Store(false)
close(m.done)
}

- func (m *mockRulesManager) Update(_ time.Duration, _ []string, _ labels.Labels, _ string, _ promRules.GroupEvalIterationFunc) error {
- return nil
- }
-
- func (m *mockRulesManager) RuleGroups() []*promRules.Group {
- return nil
- }
