Skip to content

Commit de03d68

Browse files
committed
Implement max_fetched_data_bytes_per_query limit
Signed-off-by: 🌲 Harry 🌊 John 🏔 <johrry@amazon.com>
1 parent 48bc900 commit de03d68

File tree

16 files changed

+316
-52
lines changed

16 files changed

+316
-52
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
* [FEATURE] Added `-api.http-request-headers-to-log` allowing for the addition of HTTP Headers to logs #4803
5656
* [FEATURE] Distributor: Added a new limit `-validation.max-labels-size-bytes` allowing to limit the combined size of labels for each timeseries. #4848
5757
* [FEATURE] Storage/Bucket: Added `-*.s3.bucket-lookup-type` allowing to configure the s3 bucket lookup type. #4794
58+
* [FEATURE] Querier: Added a new limit `-querier.max-fetched-data-bytes-per-query` allowing to limit the maximum size of all data in bytes that a query can fetch from each ingester and storage. #4854
5859
* [BUGFIX] Memberlist: Add join with no retrying when starting service. #4804
5960
* [BUGFIX] Ruler: Fix /ruler/rule_groups returns YAML with extra fields. #4767
6061

docs/configuration/config-file-reference.md

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2673,12 +2673,19 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s
26732673
# CLI flag: -querier.max-fetched-series-per-query
26742674
[max_fetched_series_per_query: <int> | default = 0]
26752675
2676-
# The maximum size of all chunks in bytes that a query can fetch from each
2677-
# ingester and storage. This limit is enforced in the querier and ruler only
2678-
# when running Cortex with blocks storage. 0 to disable.
2676+
# Deprecated (use max-fetched-data-bytes-per-query instead): The maximum size
2677+
# of all chunks in bytes that a query can fetch from each ingester and storage.
2678+
# This limit is enforced in the querier and ruler only when running Cortex with
2679+
# blocks storage. 0 to disable.
26792680
# CLI flag: -querier.max-fetched-chunk-bytes-per-query
26802681
[max_fetched_chunk_bytes_per_query: <int> | default = 0]
26812682
2683+
# The maximum combined size of all data that a query can fetch from each
2684+
# ingester and storage. This limit is enforced in the querier and ruler only
2685+
# when running Cortex with blocks storage. 0 to disable.
2686+
# CLI flag: -querier.max-fetched-data-bytes-per-query
2687+
[max_fetched_data_bytes_per_query: <int> | default = 0]
2688+
26822689
# Limit how long back data (series and metadata) can be queried, up until
26832690
# <lookback> duration ago. This limit is enforced in the query-frontend, querier
26842691
# and ruler. If the requested time range is outside the allowed range, the

pkg/distributor/distributor_test.go

Lines changed: 81 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1032,7 +1032,7 @@ func TestDistributor_QueryStream_ShouldReturnErrorIfMaxChunksPerQueryLimitIsReac
10321032
limits: limits,
10331033
})
10341034

1035-
ctx = limiter.AddQueryLimiterToContext(ctx, limiter.NewQueryLimiter(0, 0, maxChunksLimit))
1035+
ctx = limiter.AddQueryLimiterToContext(ctx, limiter.NewQueryLimiter(0, 0, maxChunksLimit, 0))
10361036

10371037
// Push a number of series below the max chunks limit. Each series has 1 sample,
10381038
// so expect 1 chunk per series when querying back.
@@ -1077,7 +1077,7 @@ func TestDistributor_QueryStream_ShouldReturnErrorIfMaxSeriesPerQueryLimitIsReac
10771077
ctx := user.InjectOrgID(context.Background(), "user")
10781078
limits := &validation.Limits{}
10791079
flagext.DefaultValues(limits)
1080-
ctx = limiter.AddQueryLimiterToContext(ctx, limiter.NewQueryLimiter(maxSeriesLimit, 0, 0))
1080+
ctx = limiter.AddQueryLimiterToContext(ctx, limiter.NewQueryLimiter(maxSeriesLimit, 0, 0, 0))
10811081

10821082
// Prepare distributors.
10831083
ds, _, _, _ := prepare(t, prepConfig{
@@ -1161,7 +1161,7 @@ func TestDistributor_QueryStream_ShouldReturnErrorIfMaxChunkBytesPerQueryLimitIs
11611161
var maxBytesLimit = (seriesToAdd) * responseChunkSize
11621162

11631163
// Update the limiter with the calculated limits.
1164-
ctx = limiter.AddQueryLimiterToContext(ctx, limiter.NewQueryLimiter(0, maxBytesLimit, 0))
1164+
ctx = limiter.AddQueryLimiterToContext(ctx, limiter.NewQueryLimiter(0, maxBytesLimit, 0, 0))
11651165

11661166
// Push a number of series below the max chunk bytes limit. Subtract one for the series added above.
11671167
writeReq = makeWriteRequest(0, seriesToAdd-1, 0)
@@ -1192,6 +1192,75 @@ func TestDistributor_QueryStream_ShouldReturnErrorIfMaxChunkBytesPerQueryLimitIs
11921192
assert.Equal(t, err, validation.LimitError(fmt.Sprintf(limiter.ErrMaxChunkBytesHit, maxBytesLimit)))
11931193
}
11941194

1195+
func TestDistributor_QueryStream_ShouldReturnErrorIfMaxDataBytesPerQueryLimitIsReached(t *testing.T) {
1196+
const seriesToAdd = 10
1197+
1198+
ctx := user.InjectOrgID(context.Background(), "user")
1199+
limits := &validation.Limits{}
1200+
flagext.DefaultValues(limits)
1201+
1202+
// Prepare distributors.
1203+
// Use replication factor of 2 to always read all the chunks from both ingesters,
1204+
// this guarantees us to always read the same chunks and have a stable test.
1205+
ds, _, _, _ := prepare(t, prepConfig{
1206+
numIngesters: 2,
1207+
happyIngesters: 2,
1208+
numDistributors: 1,
1209+
shardByAllLabels: true,
1210+
limits: limits,
1211+
replicationFactor: 2,
1212+
})
1213+
1214+
allSeriesMatchers := []*labels.Matcher{
1215+
labels.MustNewMatcher(labels.MatchRegexp, model.MetricNameLabel, ".+"),
1216+
}
1217+
// Push a single series so we can measure the label size used to derive the limit for this test.
1218+
writeReq := &cortexpb.WriteRequest{}
1219+
writeReq.Timeseries = append(writeReq.Timeseries,
1220+
makeWriteRequestTimeseries([]cortexpb.LabelAdapter{{Name: model.MetricNameLabel, Value: "another_series"}}, 0, 0),
1221+
)
1222+
writeRes, err := ds[0].Push(ctx, writeReq)
1223+
assert.Equal(t, &cortexpb.WriteResponse{}, writeRes)
1224+
assert.Nil(t, err)
1225+
labelSizeResponse, err := ds[0].QueryStream(ctx, math.MinInt32, math.MaxInt32, allSeriesMatchers...)
1226+
require.NoError(t, err)
1227+
1228+
// Use the resulting label size to calculate the limit as (series to add) * the response label size.
1229+
var responseLabelSize = labelSizeResponse.LabelsSize()
1230+
var maxBytesLimit = (seriesToAdd) * responseLabelSize * 2 // Multiplying by RF because the limit is applied before de-duping.
1231+
1232+
// Update the limiter with the calculated limits.
1233+
ctx = limiter.AddQueryLimiterToContext(ctx, limiter.NewQueryLimiter(0, 0, 0, maxBytesLimit))
1234+
1235+
// Push a number of series below the max data bytes limit. Subtract one for the series added above.
1236+
writeReq = makeWriteRequest(0, seriesToAdd-1, 0)
1237+
writeRes, err = ds[0].Push(ctx, writeReq)
1238+
assert.Equal(t, &cortexpb.WriteResponse{}, writeRes)
1239+
assert.Nil(t, err)
1240+
1241+
// Since the number of data bytes is equal to the limit (but doesn't
1242+
// exceed it), we expect a query running on all series to succeed.
1243+
queryRes, err := ds[0].QueryStream(ctx, math.MinInt32, math.MaxInt32, allSeriesMatchers...)
1244+
require.NoError(t, err)
1245+
assert.Len(t, queryRes.Chunkseries, seriesToAdd)
1246+
1247+
// Push another series to exceed the data bytes limit once we query back all series.
1248+
writeReq = &cortexpb.WriteRequest{}
1249+
writeReq.Timeseries = append(writeReq.Timeseries,
1250+
makeWriteRequestTimeseries([]cortexpb.LabelAdapter{{Name: model.MetricNameLabel, Value: "another_series_1"}}, 0, 0),
1251+
)
1252+
1253+
writeRes, err = ds[0].Push(ctx, writeReq)
1254+
assert.Equal(t, &cortexpb.WriteResponse{}, writeRes)
1255+
assert.Nil(t, err)
1256+
1257+
// Since the aggregated data size exceeds the limit, we expect
1258+
// a query running on all series to fail.
1259+
_, err = ds[0].QueryStream(ctx, math.MinInt32, math.MaxInt32, allSeriesMatchers...)
1260+
require.Error(t, err)
1261+
assert.Equal(t, err, validation.LimitError(fmt.Sprintf(limiter.ErrMaxDataBytesHit, maxBytesLimit)))
1262+
}
1263+
11951264
func TestDistributor_Push_LabelRemoval(t *testing.T) {
11961265
ctx := user.InjectOrgID(context.Background(), "user")
11971266

@@ -1930,7 +1999,7 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) {
19301999
},
19312000
expectedResult: []metric.Metric{},
19322001
expectedIngesters: numIngesters,
1933-
queryLimiter: limiter.NewQueryLimiter(0, 0, 0),
2002+
queryLimiter: limiter.NewQueryLimiter(0, 0, 0, 0),
19342003
expectedErr: nil,
19352004
},
19362005
"should filter metrics by single matcher": {
@@ -1942,7 +2011,7 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) {
19422011
{Metric: util.LabelsToMetric(fixtures[1].lbls)},
19432012
},
19442013
expectedIngesters: numIngesters,
1945-
queryLimiter: limiter.NewQueryLimiter(0, 0, 0),
2014+
queryLimiter: limiter.NewQueryLimiter(0, 0, 0, 0),
19462015
expectedErr: nil,
19472016
},
19482017
"should filter metrics by multiple matchers": {
@@ -1954,7 +2023,7 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) {
19542023
{Metric: util.LabelsToMetric(fixtures[0].lbls)},
19552024
},
19562025
expectedIngesters: numIngesters,
1957-
queryLimiter: limiter.NewQueryLimiter(0, 0, 0),
2026+
queryLimiter: limiter.NewQueryLimiter(0, 0, 0, 0),
19582027
expectedErr: nil,
19592028
},
19602029
"should return all matching metrics even if their FastFingerprint collide": {
@@ -1966,7 +2035,7 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) {
19662035
{Metric: util.LabelsToMetric(fixtures[4].lbls)},
19672036
},
19682037
expectedIngesters: numIngesters,
1969-
queryLimiter: limiter.NewQueryLimiter(0, 0, 0),
2038+
queryLimiter: limiter.NewQueryLimiter(0, 0, 0, 0),
19702039
expectedErr: nil,
19712040
},
19722041
"should query only ingesters belonging to tenant's subring if shuffle sharding is enabled": {
@@ -1980,7 +2049,7 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) {
19802049
{Metric: util.LabelsToMetric(fixtures[1].lbls)},
19812050
},
19822051
expectedIngesters: 3,
1983-
queryLimiter: limiter.NewQueryLimiter(0, 0, 0),
2052+
queryLimiter: limiter.NewQueryLimiter(0, 0, 0, 0),
19842053
expectedErr: nil,
19852054
},
19862055
"should query all ingesters if shuffle sharding is enabled but shard size is 0": {
@@ -1994,7 +2063,7 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) {
19942063
{Metric: util.LabelsToMetric(fixtures[1].lbls)},
19952064
},
19962065
expectedIngesters: numIngesters,
1997-
queryLimiter: limiter.NewQueryLimiter(0, 0, 0),
2066+
queryLimiter: limiter.NewQueryLimiter(0, 0, 0, 0),
19982067
expectedErr: nil,
19992068
},
20002069
"should return err if series limit is exhausted": {
@@ -2005,7 +2074,7 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) {
20052074
},
20062075
expectedResult: nil,
20072076
expectedIngesters: numIngesters,
2008-
queryLimiter: limiter.NewQueryLimiter(1, 0, 0),
2077+
queryLimiter: limiter.NewQueryLimiter(1, 0, 0, 0),
20092078
expectedErr: validation.LimitError(fmt.Sprintf(limiter.ErrMaxSeriesHit, 1)),
20102079
},
20112080
"should not exhaust series limit when only one series is fetched": {
@@ -2016,7 +2085,7 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) {
20162085
{Metric: util.LabelsToMetric(fixtures[2].lbls)},
20172086
},
20182087
expectedIngesters: numIngesters,
2019-
queryLimiter: limiter.NewQueryLimiter(1, 0, 0),
2088+
queryLimiter: limiter.NewQueryLimiter(1, 0, 0, 0),
20202089
expectedErr: nil,
20212090
},
20222091
}
@@ -2116,7 +2185,7 @@ func BenchmarkDistributor_MetricsForLabelMatchers(b *testing.B) {
21162185
matchers: []*labels.Matcher{
21172186
mustNewMatcher(labels.MatchRegexp, model.MetricNameLabel, "foo.+"),
21182187
},
2119-
queryLimiter: limiter.NewQueryLimiter(100, 0, 0),
2188+
queryLimiter: limiter.NewQueryLimiter(100, 0, 0, 0),
21202189
expectedErr: nil,
21212190
},
21222191
}

pkg/distributor/query.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -335,6 +335,10 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSet ri
335335
return nil, validation.LimitError(chunkBytesLimitErr.Error())
336336
}
337337

338+
if labelBytesLimitErr := queryLimiter.AddDataBytes(resp.ChunksSize() + resp.LabelsSize()); labelBytesLimitErr != nil {
339+
return nil, validation.LimitError(labelBytesLimitErr.Error())
340+
}
341+
338342
for _, series := range resp.Timeseries {
339343
if limitErr := queryLimiter.AddSeries(series.Labels); limitErr != nil {
340344
return nil, validation.LimitError(limitErr.Error())
@@ -392,6 +396,7 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSet ri
392396

393397
reqStats.AddFetchedSeries(uint64(len(resp.Chunkseries) + len(resp.Timeseries)))
394398
reqStats.AddFetchedChunkBytes(uint64(resp.ChunksSize()))
399+
reqStats.AddFetchedDataBytes(uint64(resp.LabelsSize() + resp.ChunksSize()))
395400

396401
return resp, nil
397402
}

pkg/frontend/transport/handler.go

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -60,10 +60,11 @@ type Handler struct {
6060
roundTripper http.RoundTripper
6161

6262
// Metrics.
63-
querySeconds *prometheus.CounterVec
64-
querySeries *prometheus.CounterVec
65-
queryBytes *prometheus.CounterVec
66-
activeUsers *util.ActiveUsersCleanupService
63+
querySeconds *prometheus.CounterVec
64+
querySeries *prometheus.CounterVec
65+
queryBytes *prometheus.CounterVec
66+
queryDataBytes *prometheus.CounterVec
67+
activeUsers *util.ActiveUsersCleanupService
6768
}
6869

6970
// NewHandler creates a new frontend handler.
@@ -90,10 +91,16 @@ func NewHandler(cfg HandlerConfig, roundTripper http.RoundTripper, log log.Logge
9091
Help: "Size of all chunks fetched to execute a query in bytes.",
9192
}, []string{"user"})
9293

94+
h.queryDataBytes = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
95+
Name: "cortex_query_fetched_data_bytes_total",
96+
Help: "Size of all data fetched to execute a query in bytes.",
97+
}, []string{"user"})
98+
9399
h.activeUsers = util.NewActiveUsersCleanupWithDefaultValues(func(user string) {
94100
h.querySeconds.DeleteLabelValues(user)
95101
h.querySeries.DeleteLabelValues(user)
96102
h.queryBytes.DeleteLabelValues(user)
103+
h.queryDataBytes.DeleteLabelValues(user)
97104
})
98105
// If cleaner stops or fail, we will simply not clean the metrics for inactive users.
99106
_ = h.activeUsers.StartAsync(context.Background())
@@ -186,11 +193,13 @@ func (f *Handler) reportQueryStats(r *http.Request, queryString url.Values, quer
186193
wallTime := stats.LoadWallTime()
187194
numSeries := stats.LoadFetchedSeries()
188195
numBytes := stats.LoadFetchedChunkBytes()
196+
numDataBytes := stats.LoadFetchedDataBytes()
189197

190198
// Track stats.
191199
f.querySeconds.WithLabelValues(userID).Add(wallTime.Seconds())
192200
f.querySeries.WithLabelValues(userID).Add(float64(numSeries))
193201
f.queryBytes.WithLabelValues(userID).Add(float64(numBytes))
202+
f.queryDataBytes.WithLabelValues(userID).Add(float64(numDataBytes))
194203
f.activeUsers.UpdateUserTimestamp(userID, time.Now())
195204

196205
// Log stats.
@@ -203,6 +212,7 @@ func (f *Handler) reportQueryStats(r *http.Request, queryString url.Values, quer
203212
"query_wall_time_seconds", wallTime.Seconds(),
204213
"fetched_series_count", numSeries,
205214
"fetched_chunks_bytes", numBytes,
215+
"fetched_data_bytes", numDataBytes,
206216
}, formatQueryString(queryString)...)
207217

208218
level.Info(util_log.WithContext(r.Context(), f.log)).Log(logMessage...)

pkg/ingester/client/custom.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,3 +27,24 @@ func (m *QueryStreamResponse) ChunksSize() int {
2727
}
2828
return size
2929
}
30+
31+
// LabelsSize returns the size of all labels in the response.
32+
func (m *QueryStreamResponse) LabelsSize() int {
33+
if len(m.Timeseries) == 0 && len(m.Chunkseries) == 0 {
34+
return 0
35+
}
36+
37+
size := 0
38+
for _, entry := range m.Chunkseries {
39+
for _, label := range entry.Labels {
40+
size += label.Size()
41+
}
42+
}
43+
44+
for _, entry := range m.Timeseries {
45+
for _, label := range entry.Labels {
46+
size += label.Size()
47+
}
48+
}
49+
return size
50+
}

pkg/querier/blocks_store_queryable.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -652,12 +652,16 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(
652652
}
653653
}
654654
chunksSize := countChunkBytes(s)
655+
labelsSize := countLabelBytes(s)
655656
if chunkBytesLimitErr := queryLimiter.AddChunkBytes(chunksSize); chunkBytesLimitErr != nil {
656657
return validation.LimitError(chunkBytesLimitErr.Error())
657658
}
658659
if chunkLimitErr := queryLimiter.AddChunks(len(s.Chunks)); chunkLimitErr != nil {
659660
return validation.LimitError(chunkLimitErr.Error())
660661
}
662+
if dataBytesLimitErr := queryLimiter.AddDataBytes(labelsSize + chunksSize); dataBytesLimitErr != nil {
663+
return validation.LimitError(dataBytesLimitErr.Error())
664+
}
661665
}
662666

663667
if w := resp.GetWarning(); w != "" {
@@ -681,14 +685,18 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(
681685

682686
numSeries := len(mySeries)
683687
chunkBytes := countChunkBytes(mySeries...)
688+
labelBytes := countLabelBytes(mySeries...)
689+
dataBytes := labelBytes + chunkBytes
684690

685691
reqStats.AddFetchedSeries(uint64(numSeries))
686692
reqStats.AddFetchedChunkBytes(uint64(chunkBytes))
693+
reqStats.AddFetchedDataBytes(uint64(dataBytes))
687694

688695
level.Debug(spanLog).Log("msg", "received series from store-gateway",
689696
"instance", c.RemoteAddress(),
690697
"fetched series", numSeries,
691698
"fetched chunk bytes", chunkBytes,
699+
"fetched data bytes", dataBytes,
692700
"requested blocks", strings.Join(convertULIDsToString(blockIDs), " "),
693701
"queried blocks", strings.Join(convertULIDsToString(myQueriedBlocks), " "))
694702

@@ -993,6 +1001,17 @@ func countChunkBytes(series ...*storepb.Series) (count int) {
9931001
return count
9941002
}
9951003

1004+
// countLabelBytes returns the size of the labels making up the provided series in bytes
1005+
func countLabelBytes(series ...*storepb.Series) (count int) {
1006+
for _, s := range series {
1007+
for _, l := range s.Labels {
1008+
count += l.Size()
1009+
}
1010+
}
1011+
1012+
return count
1013+
}
1014+
9961015
// only retry connection issues
9971016
func isRetryableError(err error) bool {
9981017
switch status.Code(err) {

0 commit comments

Comments
 (0)