Skip to content

Commit 6ca8c88

Browse files
juliusvtomwilkie
authored andcommitted
Limit series per metric (cortexproject#276)
* Limit series per metric Fixes cortexproject#47 * Fix error wrapping for series-per-metric exceeded * Move num-metrics-per-series handling to userState * Review feeback
1 parent 41c934d commit 6ca8c88

File tree

5 files changed

+164
-20
lines changed

5 files changed

+164
-20
lines changed

distributor/http_server.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,15 +24,20 @@ func (d *Distributor) PushHandler(w http.ResponseWriter, r *http.Request) {
2424

2525
_, err := d.Push(ctx, &req)
2626
if err != nil {
27-
if grpc.Code(err) == codes.ResourceExhausted && grpc.ErrorDesc(err) == util.ErrUserSeriesLimitExceeded.Error() {
28-
err = util.ErrUserSeriesLimitExceeded
27+
if grpc.Code(err) == codes.ResourceExhausted {
28+
switch grpc.ErrorDesc(err) {
29+
case util.ErrUserSeriesLimitExceeded.Error():
30+
err = util.ErrUserSeriesLimitExceeded
31+
case util.ErrMetricSeriesLimitExceeded.Error():
32+
err = util.ErrMetricSeriesLimitExceeded
33+
}
2934
}
3035

3136
var code int
3237
switch err {
3338
case errIngestionRateLimitExceeded:
3439
code = http.StatusTooManyRequests
35-
case util.ErrUserSeriesLimitExceeded:
40+
case util.ErrUserSeriesLimitExceeded, util.ErrMetricSeriesLimitExceeded:
3641
code = http.StatusInsufficientStorage
3742
default:
3843
code = http.StatusInternalServerError

ingester/ingester.go

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ const (
3838
DefaultConcurrentFlush = 50
3939
// DefaultMaxSeriesPerUser is the maximum number of series allowed per user.
4040
DefaultMaxSeriesPerUser = 5000000
41+
// DefaultMaxSeriesPerMetric is the maximum number of series in one metric (of a single user).
42+
DefaultMaxSeriesPerMetric = 50000
4143

4244
minReadyDuration = 1 * time.Minute
4345
)
@@ -118,6 +120,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
118120
f.StringVar(&cfg.ChunkEncoding, "ingester.chunk-encoding", "1", "Encoding version to use for chunks.")
119121
f.DurationVar(&cfg.UserStatesConfig.RateUpdatePeriod, "ingester.rate-update-period", 15*time.Second, "Period with which to update the per-user ingestion rates.")
120122
f.IntVar(&cfg.UserStatesConfig.MaxSeriesPerUser, "ingester.max-series-per-user", DefaultMaxSeriesPerUser, "Maximum number of active series per user.")
123+
f.IntVar(&cfg.UserStatesConfig.MaxSeriesPerMetric, "ingester.max-series-per-metric", DefaultMaxSeriesPerMetric, "Maximum number of active series per metric name.")
121124
}
122125

123126
type flushOp struct {
@@ -149,11 +152,14 @@ func New(cfg Config, chunkStore cortex_chunk.Store, ring *ring.Ring) (*Ingester,
149152
if cfg.ChunkEncoding == "" {
150153
cfg.ChunkEncoding = "1"
151154
}
155+
if cfg.UserStatesConfig.RateUpdatePeriod == 0 {
156+
cfg.UserStatesConfig.RateUpdatePeriod = 15 * time.Second
157+
}
152158
if cfg.UserStatesConfig.MaxSeriesPerUser <= 0 {
153159
cfg.UserStatesConfig.MaxSeriesPerUser = DefaultMaxSeriesPerUser
154160
}
155-
if cfg.UserStatesConfig.RateUpdatePeriod == 0 {
156-
cfg.UserStatesConfig.RateUpdatePeriod = 15 * time.Second
161+
if cfg.UserStatesConfig.MaxSeriesPerMetric <= 0 {
162+
cfg.UserStatesConfig.MaxSeriesPerMetric = DefaultMaxSeriesPerMetric
157163
}
158164

159165
if err := chunk.DefaultEncoding.Set(cfg.ChunkEncoding); err != nil {
@@ -249,7 +255,7 @@ func (i *Ingester) Push(ctx context.Context, req *remote.WriteRequest) (*cortex.
249255
var lastPartialErr error
250256
for _, sample := range util.FromWriteRequest(req) {
251257
if err := i.append(ctx, sample); err != nil {
252-
if err == util.ErrUserSeriesLimitExceeded {
258+
if err == util.ErrUserSeriesLimitExceeded || err == util.ErrMetricSeriesLimitExceeded {
253259
lastPartialErr = grpc.Errorf(codes.ResourceExhausted, err.Error())
254260
continue
255261
}
@@ -594,8 +600,7 @@ func (i *Ingester) flushUserSeries(userID string, fp model.Fingerprint, immediat
594600
series.chunkDescs = series.chunkDescs[len(chunks):]
595601
i.memoryChunks.Sub(float64(len(chunks)))
596602
if len(series.chunkDescs) == 0 {
597-
userState.fpToSeries.del(fp)
598-
userState.index.delete(series.metric, fp)
603+
userState.removeSeries(fp, series.metric)
599604
}
600605
userState.fpLocker.Unlock(fp)
601606
return nil

ingester/ingester_test.go

Lines changed: 88 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ func TestIngesterAppend(t *testing.T) {
149149
}
150150
}
151151

152-
func TestIngesterSeriesLimitExceeded(t *testing.T) {
152+
func TestIngesterUserSeriesLimitExceeded(t *testing.T) {
153153
cfg := Config{
154154
FlushCheckPeriod: 99999 * time.Hour,
155155
MaxChunkIdle: 99999 * time.Hour,
@@ -192,7 +192,93 @@ func TestIngesterSeriesLimitExceeded(t *testing.T) {
192192
// Append to two series, expect series-exceeded error.
193193
_, err = ing.Push(ctx, util.ToWriteRequest([]*model.Sample{sample2, sample3}))
194194
if grpc.ErrorDesc(err) != util.ErrUserSeriesLimitExceeded.Error() {
195-
t.Fatalf("expected series-exceeded error, got %v", err)
195+
t.Fatalf("expected error about exceeding metrics per user, got %v", err)
196+
}
197+
198+
// Read samples back via ingester queries.
199+
matcher, err := metric.NewLabelMatcher(metric.Equal, model.MetricNameLabel, "testmetric")
200+
if err != nil {
201+
t.Fatal(err)
202+
}
203+
204+
req, err := util.ToQueryRequest(model.Earliest, model.Latest, []*metric.LabelMatcher{matcher})
205+
if err != nil {
206+
t.Fatal(err)
207+
}
208+
209+
resp, err := ing.Query(ctx, req)
210+
if err != nil {
211+
t.Fatal(err)
212+
}
213+
214+
res := util.FromQueryResponse(resp)
215+
sort.Sort(res)
216+
217+
expected := model.Matrix{
218+
{
219+
Metric: sample1.Metric,
220+
Values: []model.SamplePair{
221+
{
222+
Timestamp: sample1.Timestamp,
223+
Value: sample1.Value,
224+
},
225+
{
226+
Timestamp: sample2.Timestamp,
227+
Value: sample2.Value,
228+
},
229+
},
230+
},
231+
}
232+
233+
if !reflect.DeepEqual(res, expected) {
234+
t.Fatalf("unexpected query result\n\nwant:\n\n%v\n\ngot:\n\n%v\n\n", expected, res)
235+
}
236+
}
237+
238+
func TestIngesterMetricSeriesLimitExceeded(t *testing.T) {
239+
cfg := Config{
240+
FlushCheckPeriod: 99999 * time.Hour,
241+
MaxChunkIdle: 99999 * time.Hour,
242+
UserStatesConfig: UserStatesConfig{
243+
MaxSeriesPerMetric: 1,
244+
},
245+
}
246+
store := &testStore{
247+
chunks: map[string][]chunk.Chunk{},
248+
}
249+
ing, err := New(cfg, store, nil)
250+
if err != nil {
251+
t.Fatal(err)
252+
}
253+
254+
userID := "1"
255+
sample1 := &model.Sample{
256+
Metric: model.Metric{model.MetricNameLabel: "testmetric", "foo": "bar"},
257+
Timestamp: 0,
258+
Value: 1,
259+
}
260+
sample2 := &model.Sample{
261+
Metric: model.Metric{model.MetricNameLabel: "testmetric", "foo": "bar"},
262+
Timestamp: 1,
263+
Value: 2,
264+
}
265+
sample3 := &model.Sample{
266+
Metric: model.Metric{model.MetricNameLabel: "testmetric", "foo": "biz"},
267+
Timestamp: 1,
268+
Value: 3,
269+
}
270+
271+
// Append only one series first, expect no error.
272+
ctx := user.WithID(context.Background(), userID)
273+
_, err = ing.Push(ctx, util.ToWriteRequest([]*model.Sample{sample1}))
274+
if err != nil {
275+
t.Fatal(err)
276+
}
277+
278+
// Append to two series, expect series-exceeded error.
279+
_, err = ing.Push(ctx, util.ToWriteRequest([]*model.Sample{sample2, sample3}))
280+
if grpc.ErrorDesc(err) != util.ErrMetricSeriesLimitExceeded.Error() {
281+
t.Fatalf("expected error about exceeding series per metric, got %v", err)
196282
}
197283

198284
// Read samples back via ingester queries.

ingester/user_state.go

Lines changed: 53 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,16 @@ type userState struct {
2626
mapper *fpMapper
2727
index *invertedIndex
2828
ingestedSamples *ewmaRate
29+
30+
seriesInMetricMtx sync.Mutex
31+
seriesInMetric map[model.LabelValue]int
2932
}
3033

3134
// UserStatesConfig configures userStates properties.
3235
type UserStatesConfig struct {
33-
RateUpdatePeriod time.Duration
34-
MaxSeriesPerUser int
36+
RateUpdatePeriod time.Duration
37+
MaxSeriesPerUser int
38+
MaxSeriesPerMetric int
3539
}
3640

3741
func newUserStates(cfg *UserStatesConfig) *userStates {
@@ -130,7 +134,7 @@ func (us *userStates) getOrCreateSeries(ctx context.Context, metric model.Metric
130134
us.mtx.RLock()
131135
state, ok = us.states[userID]
132136
if ok {
133-
fp, series, err = state.unlockedGet(metric, us.cfg.MaxSeriesPerUser)
137+
fp, series, err = state.unlockedGet(metric, us.cfg)
134138
if err != nil {
135139
us.mtx.RUnlock()
136140
return nil, fp, nil, err
@@ -144,7 +148,7 @@ func (us *userStates) getOrCreateSeries(ctx context.Context, metric model.Metric
144148
us.mtx.Lock()
145149
defer us.mtx.Unlock()
146150
state = us.unlockedGetOrCreate(userID)
147-
fp, series, err = state.unlockedGet(metric, us.cfg.MaxSeriesPerUser)
151+
fp, series, err = state.unlockedGet(metric, us.cfg)
148152
return state, fp, series, err
149153
}
150154

@@ -157,14 +161,15 @@ func (us *userStates) unlockedGetOrCreate(userID string) *userState {
157161
fpLocker: newFingerprintLocker(16),
158162
index: newInvertedIndex(),
159163
ingestedSamples: newEWMARate(0.2, us.cfg.RateUpdatePeriod),
164+
seriesInMetric: map[model.LabelValue]int{},
160165
}
161166
state.mapper = newFPMapper(state.fpToSeries)
162167
us.states[userID] = state
163168
}
164169
return state
165170
}
166171

167-
func (u *userState) unlockedGet(metric model.Metric, maxSeries int) (model.Fingerprint, *memorySeries, error) {
172+
func (u *userState) unlockedGet(metric model.Metric, cfg *UserStatesConfig) (model.Fingerprint, *memorySeries, error) {
168173
rawFP := metric.FastFingerprint()
169174
u.fpLocker.Lock(rawFP)
170175
fp := u.mapper.mapFP(rawFP, metric)
@@ -183,17 +188,59 @@ func (u *userState) unlockedGet(metric model.Metric, maxSeries int) (model.Finge
183188
// all proceed to add a new series. This is likely not worth addressing,
184189
// as this should happen rarely (all samples from one push are added
185190
// serially), and the overshoot in allowed series would be minimal.
186-
if u.fpToSeries.length() >= maxSeries {
191+
if u.fpToSeries.length() >= cfg.MaxSeriesPerUser {
187192
u.fpLocker.Unlock(fp)
188193
return fp, nil, util.ErrUserSeriesLimitExceeded
189194
}
190195

196+
metricName, err := util.ExtractMetricNameFromMetric(metric)
197+
if err != nil {
198+
u.fpLocker.Unlock(fp)
199+
return fp, nil, err
200+
}
201+
202+
if !u.canAddSeriesFor(metricName, cfg) {
203+
u.fpLocker.Unlock(fp)
204+
return fp, nil, util.ErrMetricSeriesLimitExceeded
205+
}
206+
191207
series = newMemorySeries(metric)
192208
u.fpToSeries.put(fp, series)
193209
u.index.add(metric, fp)
194210
return fp, series, nil
195211
}
196212

213+
func (u *userState) canAddSeriesFor(metric model.LabelValue, cfg *UserStatesConfig) bool {
214+
u.seriesInMetricMtx.Lock()
215+
defer u.seriesInMetricMtx.Unlock()
216+
217+
if u.seriesInMetric[metric] >= cfg.MaxSeriesPerMetric {
218+
return false
219+
}
220+
u.seriesInMetric[metric]++
221+
return true
222+
}
223+
224+
func (u *userState) removeSeries(fp model.Fingerprint, metric model.Metric) {
225+
u.fpToSeries.del(fp)
226+
u.index.delete(metric, fp)
227+
228+
metricName, err := util.ExtractMetricNameFromMetric(metric)
229+
if err != nil {
230+
// Series without a metric name should never be able to make it into
231+
// the ingester's memory storage.
232+
panic(err)
233+
}
234+
235+
u.seriesInMetricMtx.Lock()
236+
defer u.seriesInMetricMtx.Unlock()
237+
238+
u.seriesInMetric[metricName]--
239+
if u.seriesInMetric[metricName] == 0 {
240+
delete(u.seriesInMetric, metricName)
241+
}
242+
}
243+
197244
// forSeriesMatching passes all series matching the given matchers to the provided callback.
198245
// Deals with locking and the quirks of zero-length matcher values.
199246
func (u *userState) forSeriesMatching(allMatchers []*metric.LabelMatcher, callback func(model.Fingerprint, *memorySeries) error) error {

util/error.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,9 @@ func (e Error) Error() string { return string(e) }
77

88
// Errors returned by Cortex components.
99
const (
10-
ErrMissingMetricName = Error("sample missing metric name")
11-
ErrInvalidMetricName = Error("sample invalid metric name")
12-
ErrInvalidLabel = Error("sample invalid label")
13-
ErrUserSeriesLimitExceeded = Error("per-user series limit exceeded")
10+
ErrMissingMetricName = Error("sample missing metric name")
11+
ErrInvalidMetricName = Error("sample invalid metric name")
12+
ErrInvalidLabel = Error("sample invalid label")
13+
ErrUserSeriesLimitExceeded = Error("per-user series limit exceeded")
14+
ErrMetricSeriesLimitExceeded = Error("per-metric series limit exceeded")
1415
)

0 commit comments

Comments
 (0)