Skip to content

Changing the per labelset config to be more generic #5993

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Jun 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
* [ENHANCEMENT] Distributor/Querier: Clean stale per-ingester metrics after ingester restarts. #5930
* [ENHANCEMENT] Distributor/Ring: Allow disabling detailed ring metrics by ring member. #5931
* [ENHANCEMENT] KV: Etcd Added etcd.ping-without-stream-allowed parameter to disable/enable PermitWithoutStream #5933
* [ENHANCEMENT] Ingester: Add a new `max_series_per_label_set` limit. This limit functions similarly to `max_series_per_metric`, but allowing users to define the maximum number of series per LabelSet. #5950
* [ENHANCEMENT] Ingester: Add a new `limits_per_label_set` limit. This limit functions similarly to `max_series_per_metric`, but allows users to define the maximum number of series per LabelSet. #5950 #5993
* [ENHANCEMENT] Store Gateway: Log gRPC requests together with headers configured in `http_request_headers_to_log`. #5958
* [BUGFIX] Configsdb: Fix endline issue in db password. #5920
* [BUGFIX] Ingester: Fix `user` and `type` labels for the `cortex_ingester_tsdb_head_samples_appended_total` TSDB metric. #5952
Expand Down
15 changes: 9 additions & 6 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -3172,9 +3172,9 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s
# CLI flag: -ingester.max-global-series-per-metric
[max_global_series_per_metric: <int> | default = 0]

# [Experimental] The maximum number of active series per LabelSet, across the
# cluster before replication. Empty list to disable.
[max_series_per_label_set: <list of MaxSeriesPerLabelSet> | default = []]
# [Experimental] Enable limits per LabelSet. Supported limits per labelSet:
# [max_series]
[limits_per_label_set: <list of LimitsPerLabelSet> | default = []]

# The maximum number of active metrics with metadata per user, per ingester. 0
# to disable.
Expand Down Expand Up @@ -5314,11 +5314,14 @@ otel:
[tls_insecure_skip_verify: <boolean> | default = false]
```

### `MaxSeriesPerLabelSet`
### `LimitsPerLabelSet`

```yaml
# The maximum number of active series per LabelSet before replication.
[limit: <int> | default = ]
limits:
# The maximum number of active series per LabelSet, across the cluster before
# replication. Setting the value to 0 enables the monitoring (metrics) but
# does not enforce any limits.
[max_series: <int> | default = ]

# LabelSet to which the limit should be applied.
[label_set: <map of string (labelName) to string (labelValue)> | default = []]
Expand Down
2 changes: 1 addition & 1 deletion pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -943,7 +943,7 @@ func (i *Ingester) updateActiveSeries(ctx context.Context) {

userDB.activeSeries.Purge(purgeTime)
i.metrics.activeSeriesPerUser.WithLabelValues(userID).Set(float64(userDB.activeSeries.Active()))
if err := userDB.labelSetCounter.UpdateMetric(ctx, userDB, i.metrics.activeSeriesPerLabelSet); err != nil {
if err := userDB.labelSetCounter.UpdateMetric(ctx, userDB, i.metrics); err != nil {
level.Warn(i.logger).Log("msg", "failed to update per labelSet metrics", "user", userID, "err", err)
}
}
Expand Down
174 changes: 112 additions & 62 deletions pkg/ingester/ingester_test.go

Large diffs are not rendered by default.

18 changes: 9 additions & 9 deletions pkg/ingester/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,20 +107,20 @@ func (l *Limiter) AssertMaxMetricsWithMetadataPerUser(userID string, metrics int

// AssertMaxSeriesPerLabelSet limit has not been reached compared to the current
// number of metrics with metadata in input and returns an error if so.
func (l *Limiter) AssertMaxSeriesPerLabelSet(userID string, metric labels.Labels, f func(validation.MaxSeriesPerLabelSet) (int, error)) error {
m := l.maxSeriesPerLabelSet(userID, metric)
func (l *Limiter) AssertMaxSeriesPerLabelSet(userID string, metric labels.Labels, f func(validation.LimitsPerLabelSet) (int, error)) error {
m := l.limitsPerLabelSets(userID, metric)
for _, limit := range m {
maxFunc := func(string) int {
return limit.Limit
maxSeriesFunc := func(string) int {
return limit.Limits.MaxSeries
}
local := l.maxByLocalAndGlobal(userID, maxFunc, maxFunc)
local := l.maxByLocalAndGlobal(userID, maxSeriesFunc, maxSeriesFunc)
if u, err := f(limit); err != nil {
return err
} else if u >= local {
return errMaxSeriesPerLabelSetLimitExceeded{
id: limit.Id,
localLimit: local,
globalLimit: limit.Limit,
globalLimit: limit.Limits.MaxSeries,
}
}
}
Expand Down Expand Up @@ -189,15 +189,15 @@ func (l *Limiter) formatMaxSeriesPerLabelSetError(err errMaxSeriesPerLabelSetLim
minNonZero(err.globalLimit, err.localLimit), err.id, err.localLimit, err.globalLimit)
}

func (l *Limiter) maxSeriesPerLabelSet(userID string, metric labels.Labels) []validation.MaxSeriesPerLabelSet {
m := l.limits.MaxSeriesPerLabelSet(userID)
func (l *Limiter) limitsPerLabelSets(userID string, metric labels.Labels) []validation.LimitsPerLabelSet {
m := l.limits.LimitsPerLabelSet(userID)

// returning early to not have any overhead
if len(m) == 0 {
return nil
}

r := make([]validation.MaxSeriesPerLabelSet, 0, len(m))
r := make([]validation.LimitsPerLabelSet, 0, len(m))
outer:
for _, lbls := range m {
for _, lbl := range lbls.LabelSet {
Expand Down
86 changes: 86 additions & 0 deletions pkg/ingester/limiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package ingester

import (
"errors"

"math"
"testing"

"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -423,6 +425,90 @@ func TestLimiter_AssertMaxSeriesPerUser(t *testing.T) {
}
}

// TestLimiter_AssertMaxSeriesPerLabelSet verifies that the per-LabelSet
// max_series limit is enforced correctly: a zero limit disables enforcement,
// a series count at/above the computed local limit returns
// errMaxSeriesPerLabelSetLimitExceeded, and a count below it passes.
func TestLimiter_AssertMaxSeriesPerLabelSet(t *testing.T) {
	tests := map[string]struct {
		limits                validation.Limits
		expected              error
		ringReplicationFactor int
		ringIngesterCount     int
		shardByAllLabels      bool
		series                int
	}{
		"both local and global limit are disabled": {
			ringReplicationFactor: 3,
			ringIngesterCount:     10,
			series:                200,
			shardByAllLabels:      true,
			limits: validation.Limits{
				LimitsPerLabelSet: []validation.LimitsPerLabelSet{
					{
						LabelSet: labels.FromMap(map[string]string{"foo": "bar"}),
						Limits: validation.LimitsPerLabelSetEntry{
							// MaxSeries == 0 means "monitor only": no error is
							// expected even though the series count is high.
							MaxSeries: 0,
						},
					},
				},
			},
		},
		"current number of series is above the limit": {
			ringReplicationFactor: 3,
			ringIngesterCount:     10,
			series:                200,
			shardByAllLabels:      true,
			// local limit = global (10) * RF (3) / ingesters (10) = 3.
			expected: errMaxSeriesPerLabelSetLimitExceeded{globalLimit: 10, localLimit: 3},
			limits: validation.Limits{
				LimitsPerLabelSet: []validation.LimitsPerLabelSet{
					{
						LabelSet: labels.FromMap(map[string]string{"foo": "bar"}),
						Limits: validation.LimitsPerLabelSetEntry{
							MaxSeries: 10,
						},
					},
				},
			},
		},
		"current number of series is below the limit and shard by all labels": {
			ringReplicationFactor: 3,
			ringIngesterCount:     10,
			series:                2,
			shardByAllLabels:      true,
			limits: validation.Limits{
				LimitsPerLabelSet: []validation.LimitsPerLabelSet{
					{
						LabelSet: labels.FromMap(map[string]string{"foo": "bar"}),
						Limits: validation.LimitsPerLabelSetEntry{
							MaxSeries: 10,
						},
					},
				},
			},
		},
	}

	for testName, testData := range tests {
		testData := testData

		t.Run(testName, func(t *testing.T) {
			// Mock the ring.
			ring := &ringCountMock{}
			ring.On("HealthyInstancesCount").Return(testData.ringIngesterCount)
			ring.On("ZonesCount").Return(1)

			// Mock limits.
			limits, err := validation.NewOverrides(testData.limits, nil)
			require.NoError(t, err)

			limiter := NewLimiter(limits, ring, util.ShardingStrategyDefault, testData.shardByAllLabels, testData.ringReplicationFactor, false, "")
			actual := limiter.AssertMaxSeriesPerLabelSet("test", labels.FromStrings("foo", "bar"), func(set validation.LimitsPerLabelSet) (int, error) {
				return testData.series, nil
			})

			// testify convention: expected value first, actual second, so a
			// failure message reports the two values in the right roles.
			assert.Equal(t, testData.expected, actual)
		})
	}
}

func TestLimiter_AssertMaxMetricsWithMetadataPerUser(t *testing.T) {
tests := map[string]struct {
maxLocalMetadataPerUser int
Expand Down
18 changes: 12 additions & 6 deletions pkg/ingester/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,9 @@ type ingesterMetrics struct {
memSeriesRemovedTotal *prometheus.CounterVec
memMetadataRemovedTotal *prometheus.CounterVec

activeSeriesPerUser *prometheus.GaugeVec
activeSeriesPerLabelSet *prometheus.GaugeVec
activeSeriesPerUser *prometheus.GaugeVec
limitsPerLabelSet *prometheus.GaugeVec
usagePerLabelSet *prometheus.GaugeVec

// Global limit metrics
maxUsersGauge prometheus.GaugeFunc
Expand Down Expand Up @@ -212,10 +213,15 @@ func newIngesterMetrics(r prometheus.Registerer,
return 0
}),

activeSeriesPerLabelSet: promauto.With(r).NewGaugeVec(prometheus.GaugeOpts{
Name: "cortex_ingester_active_series_per_labelset",
Help: "Number of currently active series per user and labelset.",
}, []string{"user", "labelset"}),
limitsPerLabelSet: promauto.With(r).NewGaugeVec(prometheus.GaugeOpts{
Name: "cortex_ingester_limits_per_labelset",
Help: "Limits per user and labelset.",
}, []string{"user", "limit", "labelset"}),

usagePerLabelSet: promauto.With(r).NewGaugeVec(prometheus.GaugeOpts{
Name: "cortex_ingester_usage_per_labelset",
Help: "Current usage per user and labelset.",
}, []string{"user", "limit", "labelset"}),

// Not registered automatically, but only if activeSeriesEnabled is true.
activeSeriesPerUser: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Expand Down
26 changes: 15 additions & 11 deletions pkg/ingester/user_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"sync"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/tsdb/index"
Expand Down Expand Up @@ -115,7 +114,7 @@ func newLabelSetCounter(limiter *Limiter) *labelSetCounter {
}

func (m *labelSetCounter) canAddSeriesForLabelSet(ctx context.Context, u *userTSDB, metric labels.Labels) error {
return m.limiter.AssertMaxSeriesPerLabelSet(u.userID, metric, func(set validation.MaxSeriesPerLabelSet) (int, error) {
return m.limiter.AssertMaxSeriesPerLabelSet(u.userID, metric, func(set validation.LimitsPerLabelSet) (int, error) {
s := m.shards[util.HashFP(model.Fingerprint(set.Hash))%numMetricCounterShards]
s.RLock()
if r, ok := s.valuesCounter[set.Hash]; ok {
Expand All @@ -129,7 +128,7 @@ func (m *labelSetCounter) canAddSeriesForLabelSet(ctx context.Context, u *userTS
})
}

func (m *labelSetCounter) backFillLimit(ctx context.Context, u *userTSDB, limit validation.MaxSeriesPerLabelSet, s *labelSetCounterShard) (int, error) {
func (m *labelSetCounter) backFillLimit(ctx context.Context, u *userTSDB, limit validation.LimitsPerLabelSet, s *labelSetCounterShard) (int, error) {
ir, err := u.db.Head().Index()
if err != nil {
return 0, err
Expand Down Expand Up @@ -171,7 +170,7 @@ func (m *labelSetCounter) backFillLimit(ctx context.Context, u *userTSDB, limit
}

func (m *labelSetCounter) increaseSeriesLabelSet(u *userTSDB, metric labels.Labels) {
limits := m.limiter.maxSeriesPerLabelSet(u.userID, metric)
limits := m.limiter.limitsPerLabelSets(u.userID, metric)
for _, l := range limits {
s := m.shards[util.HashFP(model.Fingerprint(l.Hash))%numMetricCounterShards]
s.Lock()
Expand All @@ -188,7 +187,7 @@ func (m *labelSetCounter) increaseSeriesLabelSet(u *userTSDB, metric labels.Labe
}

func (m *labelSetCounter) decreaseSeriesLabelSet(u *userTSDB, metric labels.Labels) {
limits := m.limiter.maxSeriesPerLabelSet(u.userID, metric)
limits := m.limiter.limitsPerLabelSets(u.userID, metric)
for _, l := range limits {
s := m.shards[util.HashFP(model.Fingerprint(l.Hash))%numMetricCounterShards]
s.Lock()
Expand All @@ -199,23 +198,26 @@ func (m *labelSetCounter) decreaseSeriesLabelSet(u *userTSDB, metric labels.Labe
}
}

func (m *labelSetCounter) UpdateMetric(ctx context.Context, u *userTSDB, vec *prometheus.GaugeVec) error {
currentLbsLimitHash := map[uint64]validation.MaxSeriesPerLabelSet{}
for _, l := range m.limiter.limits.MaxSeriesPerLabelSet(u.userID) {
func (m *labelSetCounter) UpdateMetric(ctx context.Context, u *userTSDB, metrics *ingesterMetrics) error {
currentLbsLimitHash := map[uint64]validation.LimitsPerLabelSet{}
for _, l := range m.limiter.limits.LimitsPerLabelSet(u.userID) {
currentLbsLimitHash[l.Hash] = l
}

for i := 0; i < numMetricCounterShards; i++ {
s := m.shards[i]
s.RLock()
for h, entry := range s.valuesCounter {
lbls := entry.labels.String()
// This limit no longer exists
if _, ok := currentLbsLimitHash[h]; !ok {
vec.DeleteLabelValues(u.userID, entry.labels.String())
metrics.usagePerLabelSet.DeleteLabelValues(u.userID, "max_series", lbls)
metrics.limitsPerLabelSet.DeleteLabelValues(u.userID, "max_series", lbls)
continue
}
metrics.usagePerLabelSet.WithLabelValues(u.userID, "max_series", lbls).Set(float64(entry.count))
metrics.limitsPerLabelSet.WithLabelValues(u.userID, "max_series", lbls).Set(float64(currentLbsLimitHash[h].Limits.MaxSeries))
delete(currentLbsLimitHash, h)
vec.WithLabelValues(u.userID, entry.labels.String()).Set(float64(entry.count))
}
s.RUnlock()
}
Expand All @@ -227,7 +229,9 @@ func (m *labelSetCounter) UpdateMetric(ctx context.Context, u *userTSDB, vec *pr
if err != nil {
return err
}
vec.WithLabelValues(u.userID, l.LabelSet.String()).Set(float64(count))
lbls := l.LabelSet.String()
metrics.usagePerLabelSet.WithLabelValues(u.userID, "max_series", lbls).Set(float64(count))
metrics.limitsPerLabelSet.WithLabelValues(u.userID, "max_series", lbls).Set(float64(l.Limits.MaxSeries))
}

return nil
Expand Down
Loading
Loading