Skip to content

Commit 24be605

Browse files
committed
Ruler: Add support for per-user external labels
Signed-off-by: Xiaochao Dong (@damnever) <the.xcdong@gmail.com>
1 parent ee62ab2 commit 24be605

File tree

12 files changed

+171
-20
lines changed

12 files changed

+171
-20
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
* [FEATURE] Ruler: Minimize chances of missed rule group evaluations that can occur due to OOM kills, bad underlying nodes, or due to an unhealthy ruler that appears in the ring as healthy. This feature is enabled via `-ruler.enable-ha-evaluation` flag. #6129
1616
* [FEATURE] Store Gateway: Add an in-memory chunk cache. #6245
1717
* [FEATURE] Chunk Cache: Support multi level cache and add metrics. #6249
18+
* [FEATURE] Ruler: Add support for per-user external labels #6340
1819
* [ENHANCEMENT] S3 Bucket Client: Add a list objects version configs to configure list api object version. #6280
1920
* [ENHANCEMENT] OpenStack Swift: Add application credential configs for Openstack swift object storage backend. #6255
2021
* [ENHANCEMENT] Query Frontend: Add new query stats metrics `cortex_query_samples_scanned_total` and `cortex_query_peak_samples` to track scannedSamples and peakSample per user. #6228

docs/configuration/config-file-reference.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3604,6 +3604,9 @@ query_rejection:
36043604

36053605
# list of rule groups to disable
36063606
[disabled_rule_groups: <list of DisabledRuleGroup> | default = []]
3607+
3608+
# external labels for alerting rules
3609+
[external_labels: <map of string (labelName) to string (labelValue)> | default = []]
36073610
```
36083611
36093612
### `memberlist_config`

pkg/cortex/modules.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,8 @@ func (t *Cortex) initRuntimeConfig() (services.Service, error) {
154154
// no need to initialize module if load path is empty
155155
return nil, nil
156156
}
157-
t.Cfg.RuntimeConfig.Loader = loadRuntimeConfig
157+
runtimeConfigLoader := runtimeConfigLoader{cfg: t.Cfg}
158+
t.Cfg.RuntimeConfig.Loader = runtimeConfigLoader.load
158159

159160
// make sure to set default limits before we start loading configuration into memory
160161
validation.SetDefaultLimitsForYAMLUnmarshalling(t.Cfg.LimitsConfig)
@@ -611,14 +612,14 @@ func (t *Cortex) initRuler() (serv services.Service, err error) {
611612
}
612613

613614
managerFactory := ruler.DefaultTenantManagerFactory(t.Cfg.Ruler, t.Cfg.ExternalPusher, t.Cfg.ExternalQueryable, queryEngine, t.Overrides, metrics, prometheus.DefaultRegisterer)
614-
manager, err = ruler.NewDefaultMultiTenantManager(t.Cfg.Ruler, managerFactory, metrics, prometheus.DefaultRegisterer, util_log.Logger)
615+
manager, err = ruler.NewDefaultMultiTenantManager(t.Cfg.Ruler, t.Overrides, managerFactory, metrics, prometheus.DefaultRegisterer, util_log.Logger)
615616
} else {
616617
rulerRegisterer := prometheus.WrapRegistererWith(prometheus.Labels{"engine": "ruler"}, prometheus.DefaultRegisterer)
617618
// TODO: Consider wrapping logger to differentiate from querier module logger
618619
queryable, _, engine := querier.New(t.Cfg.Querier, t.Overrides, t.Distributor, t.StoreQueryables, rulerRegisterer, util_log.Logger)
619620

620621
managerFactory := ruler.DefaultTenantManagerFactory(t.Cfg.Ruler, t.Distributor, queryable, engine, t.Overrides, metrics, prometheus.DefaultRegisterer)
621-
manager, err = ruler.NewDefaultMultiTenantManager(t.Cfg.Ruler, managerFactory, metrics, prometheus.DefaultRegisterer, util_log.Logger)
622+
manager, err = ruler.NewDefaultMultiTenantManager(t.Cfg.Ruler, t.Overrides, managerFactory, metrics, prometheus.DefaultRegisterer, util_log.Logger)
622623
}
623624

624625
if err != nil {

pkg/cortex/runtime_config.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,11 @@ func (l *runtimeConfigTenantLimits) AllByUserID() map[string]*validation.Limits
5858
return nil
5959
}
6060

61-
func loadRuntimeConfig(r io.Reader) (interface{}, error) {
61+
type runtimeConfigLoader struct {
62+
cfg Config
63+
}
64+
65+
func (l runtimeConfigLoader) load(r io.Reader) (interface{}, error) {
6266
var overrides = &RuntimeConfigValues{}
6367

6468
decoder := yaml.NewDecoder(r)
@@ -74,6 +78,12 @@ func loadRuntimeConfig(r io.Reader) (interface{}, error) {
7478
return nil, errMultipleDocuments
7579
}
7680

81+
for _, ul := range overrides.TenantLimits {
82+
if err := ul.Validate(l.cfg.Distributor.ShardByAllLabels); err != nil {
83+
return nil, err
84+
}
85+
}
86+
7787
return overrides, nil
7888
}
7989

pkg/cortex/runtime_config_test.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"github.com/stretchr/testify/assert"
88
"github.com/stretchr/testify/require"
99

10+
"github.com/cortexproject/cortex/pkg/distributor"
1011
"github.com/cortexproject/cortex/pkg/util/validation"
1112
)
1213

@@ -28,7 +29,8 @@ overrides:
2829
'1235': *id001
2930
'1236': *id001
3031
`)
31-
runtimeCfg, err := loadRuntimeConfig(yamlFile)
32+
loader := runtimeConfigLoader{cfg: Config{Distributor: distributor.Config{ShardByAllLabels: true}}}
33+
runtimeCfg, err := loader.load(yamlFile)
3234
require.NoError(t, err)
3335

3436
limits := validation.Limits{
@@ -51,7 +53,7 @@ func TestLoadRuntimeConfig_ShouldLoadEmptyFile(t *testing.T) {
5153
yamlFile := strings.NewReader(`
5254
# This is an empty YAML.
5355
`)
54-
actual, err := loadRuntimeConfig(yamlFile)
56+
actual, err := runtimeConfigLoader{}.load(yamlFile)
5557
require.NoError(t, err)
5658
assert.Equal(t, &RuntimeConfigValues{}, actual)
5759
}
@@ -60,7 +62,7 @@ func TestLoadRuntimeConfig_MissingPointerFieldsAreNil(t *testing.T) {
6062
yamlFile := strings.NewReader(`
6163
# This is an empty YAML.
6264
`)
63-
actual, err := loadRuntimeConfig(yamlFile)
65+
actual, err := runtimeConfigLoader{}.load(yamlFile)
6466
require.NoError(t, err)
6567

6668
actualCfg, ok := actual.(*RuntimeConfigValues)
@@ -102,7 +104,7 @@ overrides:
102104
}
103105

104106
for _, tc := range cases {
105-
actual, err := loadRuntimeConfig(strings.NewReader(tc))
107+
actual, err := runtimeConfigLoader{}.load(strings.NewReader(tc))
106108
assert.Equal(t, errMultipleDocuments, err)
107109
assert.Nil(t, actual)
108110
}

pkg/ruler/compat.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,7 @@ type RulesLimits interface {
153153
RulerMaxRulesPerRuleGroup(userID string) int
154154
RulerQueryOffset(userID string) time.Duration
155155
DisabledRuleGroups(userID string) validation.DisabledRuleGroups
156+
ExternalLabels(userID string) labels.Labels
156157
}
157158

158159
// EngineQueryFunc returns a new engine query function validating max queryLength.

pkg/ruler/external_labels.go

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
package ruler
2+
3+
import (
4+
"sync"
5+
6+
"github.com/prometheus/prometheus/model/labels"
7+
)
8+
9+
// userExternalLabels checks and merges per-user external labels with global external labels.
10+
type userExternalLabels struct {
11+
global labels.Labels
12+
limits RulesLimits
13+
builder *labels.Builder
14+
15+
mtx sync.Mutex
16+
users map[string]labels.Labels
17+
}
18+
19+
func newUserExternalLabels(global labels.Labels, limits RulesLimits) *userExternalLabels {
20+
return &userExternalLabels{
21+
global: global,
22+
limits: limits,
23+
builder: labels.NewBuilder(nil),
24+
25+
mtx: sync.Mutex{},
26+
users: map[string]labels.Labels{},
27+
}
28+
}
29+
30+
func (e *userExternalLabels) get(userID string) (labels.Labels, bool) {
31+
e.mtx.Lock()
32+
defer e.mtx.Unlock()
33+
lset, ok := e.users[userID]
34+
return lset, ok
35+
}
36+
37+
func (e *userExternalLabels) update(userID string) (labels.Labels, bool) {
38+
lset := e.limits.ExternalLabels(userID)
39+
40+
e.mtx.Lock()
41+
defer e.mtx.Unlock()
42+
43+
e.builder.Reset(e.global)
44+
for _, l := range lset {
45+
e.builder.Set(l.Name, l.Value)
46+
}
47+
lset = e.builder.Labels()
48+
49+
if !labels.Equal(e.users[userID], lset) {
50+
e.users[userID] = lset
51+
return lset, true
52+
}
53+
return lset, false
54+
}
55+
56+
func (e *userExternalLabels) remove(user string) {
57+
e.mtx.Lock()
58+
defer e.mtx.Unlock()
59+
delete(e.users, user)
60+
}
61+
62+
func (e *userExternalLabels) cleanup() {
63+
e.mtx.Lock()
64+
defer e.mtx.Unlock()
65+
for user := range e.users {
66+
delete(e.users, user)
67+
}
68+
}

pkg/ruler/manager.go

Lines changed: 33 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"github.com/prometheus/client_golang/prometheus/promauto"
1818
"github.com/prometheus/prometheus/config"
1919
"github.com/prometheus/prometheus/discovery"
20+
"github.com/prometheus/prometheus/model/labels"
2021
"github.com/prometheus/prometheus/model/rulefmt"
2122
"github.com/prometheus/prometheus/notifier"
2223
promRules "github.com/prometheus/prometheus/rules"
@@ -47,6 +48,9 @@ type DefaultMultiTenantManager struct {
4748
notifiers map[string]*rulerNotifier
4849
notifiersDiscoveryMetrics map[string]discovery.DiscovererMetrics
4950

51+
// Per-user externalLabels.
52+
userExternalLabels *userExternalLabels
53+
5054
// rules backup
5155
rulesBackupManager *rulesBackupManager
5256

@@ -62,7 +66,7 @@ type DefaultMultiTenantManager struct {
6266
syncRuleMtx sync.Mutex
6367
}
6468

65-
func NewDefaultMultiTenantManager(cfg Config, managerFactory ManagerFactory, evalMetrics *RuleEvalMetrics, reg prometheus.Registerer, logger log.Logger) (*DefaultMultiTenantManager, error) {
69+
func NewDefaultMultiTenantManager(cfg Config, limits RulesLimits, managerFactory ManagerFactory, evalMetrics *RuleEvalMetrics, reg prometheus.Registerer, logger log.Logger) (*DefaultMultiTenantManager, error) {
6670
ncfg, err := buildNotifierConfig(&cfg)
6771
if err != nil {
6872
return nil, err
@@ -92,6 +96,7 @@ func NewDefaultMultiTenantManager(cfg Config, managerFactory ManagerFactory, eva
9296
frontendPool: newFrontendPool(cfg, logger, reg),
9397
ruleEvalMetrics: evalMetrics,
9498
notifiers: map[string]*rulerNotifier{},
99+
userExternalLabels: newUserExternalLabels(cfg.ExternalLabels, limits),
95100
notifiersDiscoveryMetrics: notifiersDiscoveryMetrics,
96101
mapper: newMapper(cfg.RulePath, logger),
97102
userManagers: map[string]RulesManager{},
@@ -146,6 +151,7 @@ func (r *DefaultMultiTenantManager) SyncRuleGroups(ctx context.Context, ruleGrou
146151

147152
r.removeNotifier(userID)
148153
r.mapper.cleanupUser(userID)
154+
r.userExternalLabels.remove(userID)
149155
r.lastReloadSuccessful.DeleteLabelValues(userID)
150156
r.lastReloadSuccessfulTimestamp.DeleteLabelValues(userID)
151157
r.configUpdatesTotal.DeleteLabelValues(userID)
@@ -183,12 +189,13 @@ func (r *DefaultMultiTenantManager) BackUpRuleGroups(ctx context.Context, ruleGr
183189
func (r *DefaultMultiTenantManager) syncRulesToManager(ctx context.Context, user string, groups rulespb.RuleGroupList) {
184190
// Map the files to disk and return the file names to be passed to the users manager if they
185191
// have been updated
186-
update, files, err := r.mapper.MapRules(user, groups.Formatted())
192+
rulesUpdated, files, err := r.mapper.MapRules(user, groups.Formatted())
187193
if err != nil {
188194
r.lastReloadSuccessful.WithLabelValues(user).Set(0)
189195
level.Error(r.logger).Log("msg", "unable to map rule files", "user", user, "err", err)
190196
return
191197
}
198+
externalLabels, externalLabelsUpdated := r.userExternalLabels.update(user)
192199

193200
existing := true
194201
manager := r.getRulesManager(user, ctx)
@@ -201,19 +208,26 @@ func (r *DefaultMultiTenantManager) syncRulesToManager(ctx context.Context, user
201208
return
202209
}
203210

204-
if !existing || update {
211+
if !existing || rulesUpdated || externalLabelsUpdated {
205212
level.Debug(r.logger).Log("msg", "updating rules", "user", user)
206213
r.configUpdatesTotal.WithLabelValues(user).Inc()
207-
if update && existing {
214+
if rulesUpdated && existing {
208215
r.updateRuleCache(user, manager.RuleGroups())
209216
}
210-
err = manager.Update(r.cfg.EvaluationInterval, files, r.cfg.ExternalLabels, r.cfg.ExternalURL.String(), ruleGroupIterationFunc)
217+
err = manager.Update(r.cfg.EvaluationInterval, files, externalLabels, r.cfg.ExternalURL.String(), ruleGroupIterationFunc)
211218
r.deleteRuleCache(user)
212219
if err != nil {
213220
r.lastReloadSuccessful.WithLabelValues(user).Set(0)
214221
level.Error(r.logger).Log("msg", "unable to update rule manager", "user", user, "err", err)
215222
return
216223
}
224+
if externalLabelsUpdated {
225+
if err = r.notifierApplyExternalLabels(user, externalLabels); err != nil {
226+
r.lastReloadSuccessful.WithLabelValues(user).Set(0)
227+
level.Error(r.logger).Log("msg", "unable to update notifier", "user", user, "err", err)
228+
return
229+
}
230+
}
217231

218232
r.lastReloadSuccessful.WithLabelValues(user).Set(1)
219233
r.lastReloadSuccessfulTimestamp.WithLabelValues(user).SetToCurrentTime()
@@ -348,6 +362,19 @@ func (r *DefaultMultiTenantManager) getOrCreateNotifier(userID string, userManag
348362
return n.notifier, nil
349363
}
350364

365+
func (r *DefaultMultiTenantManager) notifierApplyExternalLabels(userID string, externalLabels labels.Labels) error {
366+
r.notifiersMtx.Lock()
367+
defer r.notifiersMtx.Unlock()
368+
369+
n, ok := r.notifiers[userID]
370+
if !ok {
371+
return fmt.Errorf("notifier not found")
372+
}
373+
cfg := *r.notifierCfg // Copy it
374+
cfg.GlobalConfig.ExternalLabels = externalLabels
375+
return n.applyConfig(&cfg)
376+
}
377+
351378
func (r *DefaultMultiTenantManager) getCachedRules(userID string) ([]*promRules.Group, bool) {
352379
r.ruleCacheMtx.RLock()
353380
defer r.ruleCacheMtx.RUnlock()
@@ -402,6 +429,7 @@ func (r *DefaultMultiTenantManager) Stop() {
402429

403430
// cleanup user rules directories
404431
r.mapper.cleanup()
432+
r.userExternalLabels.cleanup()
405433
}
406434

407435
func (*DefaultMultiTenantManager) ValidateRuleGroup(g rulefmt.RuleGroup) []error {

pkg/ruler/manager_test.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,9 @@ func TestSyncRuleGroups(t *testing.T) {
2929
}
3030

3131
ruleManagerFactory := RuleManagerFactory(nil, waitDurations)
32+
limits := ruleLimits{externalLabels: labels.FromStrings("from", "cortex")}
3233

33-
m, err := NewDefaultMultiTenantManager(Config{RulePath: dir}, ruleManagerFactory, nil, nil, log.NewNopLogger())
34+
m, err := NewDefaultMultiTenantManager(Config{RulePath: dir}, limits, ruleManagerFactory, nil, nil, log.NewNopLogger())
3435
require.NoError(t, err)
3536

3637
const user = "testUser"
@@ -61,6 +62,9 @@ func TestSyncRuleGroups(t *testing.T) {
6162
require.NoError(t, err)
6263
require.Equal(t, []string{user}, users)
6364
require.True(t, ok)
65+
lset, ok := m.userExternalLabels.get(user)
66+
require.True(t, ok)
67+
require.Equal(t, limits.externalLabels, lset)
6468
}
6569

6670
// Passing empty map / nil stops all managers.
@@ -79,6 +83,8 @@ func TestSyncRuleGroups(t *testing.T) {
7983
require.NoError(t, err)
8084
require.Equal(t, []string(nil), users)
8185
require.False(t, ok)
86+
_, ok = m.userExternalLabels.get(user)
87+
require.False(t, ok)
8288
}
8389

8490
// Resync same rules as before. Previously this didn't restart the manager.
@@ -154,7 +160,7 @@ func TestSlowRuleGroupSyncDoesNotSlowdownListRules(t *testing.T) {
154160
}
155161

156162
ruleManagerFactory := RuleManagerFactory(groupsToReturn, waitDurations)
157-
m, err := NewDefaultMultiTenantManager(Config{RulePath: dir}, ruleManagerFactory, nil, prometheus.NewRegistry(), log.NewNopLogger())
163+
m, err := NewDefaultMultiTenantManager(Config{RulePath: dir}, ruleLimits{}, ruleManagerFactory, nil, prometheus.NewRegistry(), log.NewNopLogger())
158164
require.NoError(t, err)
159165

160166
m.SyncRuleGroups(context.Background(), userRules)
@@ -217,7 +223,7 @@ func TestSyncRuleGroupsCleanUpPerUserMetrics(t *testing.T) {
217223

218224
ruleManagerFactory := RuleManagerFactory(nil, waitDurations)
219225

220-
m, err := NewDefaultMultiTenantManager(Config{RulePath: dir}, ruleManagerFactory, evalMetrics, reg, log.NewNopLogger())
226+
m, err := NewDefaultMultiTenantManager(Config{RulePath: dir}, ruleLimits{}, ruleManagerFactory, evalMetrics, reg, log.NewNopLogger())
221227
require.NoError(t, err)
222228

223229
const user = "testUser"
@@ -265,7 +271,7 @@ func TestBackupRules(t *testing.T) {
265271
ruleManagerFactory := RuleManagerFactory(nil, waitDurations)
266272
config := Config{RulePath: dir}
267273
config.Ring.ReplicationFactor = 3
268-
m, err := NewDefaultMultiTenantManager(config, ruleManagerFactory, evalMetrics, reg, log.NewNopLogger())
274+
m, err := NewDefaultMultiTenantManager(config, ruleLimits{}, ruleManagerFactory, evalMetrics, reg, log.NewNopLogger())
269275
require.NoError(t, err)
270276

271277
const user1 = "testUser"

0 commit comments

Comments
 (0)