Skip to content

Commit f1d3d1e

Browse files
authored
Reject small subquery step size in query frontend (#5323)
* check subquery step size in query frontend
  Signed-off-by: Ben Ye <benye@amazon.com>
* more test cases
  Signed-off-by: Ben Ye <benye@amazon.com>
* changelog
  Signed-off-by: Ben Ye <benye@amazon.com>
* try again
  Signed-off-by: Ben Ye <benye@amazon.com>
* update again
  Signed-off-by: Ben Ye <benye@amazon.com>
* fix test
  Signed-off-by: Ben Ye <benye@amazon.com>
* address review comments
  Signed-off-by: Ben Ye <benye@amazon.com>
---------
Signed-off-by: Ben Ye <benye@amazon.com>
1 parent 019ebad commit f1d3d1e

15 files changed

+232
-32
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
* [ENHANCEMENT] Distributor/Ingester: Add span on push path #5319
88
* [FEATURE] Store Gateway: Add `max_downloaded_bytes_per_request` to limit max bytes to download per store gateway request.
99
* [ENHANCEMENT] Support object storage backends for runtime configuration file. #5292
10+
* [ENHANCEMENT] Query Frontend: Reject subquery with too small step size. #5323
1011
* [BUGFIX] Ruler: Validate if rule group can be safely converted back to rule group yaml from protobuf message #5265
1112
* [BUGFIX] Querier: Convert gRPC `ResourceExhausted` status code from store gateway to 422 limit error. #5286
1213
* [BUGFIX] Alertmanager: Route web-ui requests to the alertmanager distributor when sharding is enabled. #5293

integration/query_frontend_test.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"crypto/x509"
88
"crypto/x509/pkix"
99
"fmt"
10+
"net/http"
1011
"os"
1112
"path/filepath"
1213
"strconv"
@@ -33,6 +34,7 @@ type queryFrontendTestConfig struct {
3334
querySchedulerEnabled bool
3435
queryStatsEnabled bool
3536
remoteReadEnabled bool
37+
testSubQueryStepSize bool
3638
setup func(t *testing.T, s *e2e.Scenario) (configFile string, flags map[string]string)
3739
}
3840

@@ -209,6 +211,19 @@ func TestQueryFrontendRemoteRead(t *testing.T) {
209211
})
210212
}
211213

214+
func TestQueryFrontendSubQueryStepSize(t *testing.T) {
215+
runQueryFrontendTest(t, queryFrontendTestConfig{
216+
testSubQueryStepSize: true,
217+
setup: func(t *testing.T, s *e2e.Scenario) (configFile string, flags map[string]string) {
218+
require.NoError(t, writeFileToSharedDir(s, cortexConfigFile, []byte(BlocksStorageConfig)))
219+
220+
minio := e2edb.NewMinio(9000, BlocksStorageFlags()["-blocks-storage.s3.bucket-name"])
221+
require.NoError(t, s.StartAndWaitReady(minio))
222+
return cortexConfigFile, flags
223+
},
224+
})
225+
}
226+
212227
func runQueryFrontendTest(t *testing.T, cfg queryFrontendTestConfig) {
213228
const numUsers = 10
214229
const numQueriesPerUser = 10
@@ -334,6 +349,12 @@ func runQueryFrontendTest(t *testing.T, cfg queryFrontendTestConfig) {
334349
require.True(t, len(res.Results[0].Timeseries[0].Labels) > 0)
335350
}
336351

352+
// No need to repeat the test on subquery step size.
353+
if userID == 0 && cfg.testSubQueryStepSize {
354+
resp, _, _ := c.QueryRaw(`up[30d:1m]`, now)
355+
require.Equal(t, http.StatusBadRequest, resp.StatusCode)
356+
}
357+
337358
// In this test we do ensure that the /series start/end time is ignored and Cortex
338359
// always returns series in ingesters memory. No need to repeat it for each user.
339360
if userID == 0 {
@@ -386,6 +407,10 @@ func runQueryFrontendTest(t *testing.T, cfg queryFrontendTestConfig) {
386407
extra++
387408
}
388409

410+
if cfg.testSubQueryStepSize {
411+
extra++
412+
}
413+
389414
require.NoError(t, queryFrontend.WaitSumMetrics(e2e.Equals(numUsers*numQueriesPerUser+extra), "cortex_query_frontend_queries_total"))
390415

391416
// The number of received request is greater than the query requests because include

pkg/cortex/modules.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -450,6 +450,12 @@ func (t *Cortex) initDeleteRequestsStore() (serv services.Service, err error) {
450450
// to optimize Prometheus query requests.
451451
func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err error) {
452452
queryAnalyzer := querysharding.NewQueryAnalyzer()
453+
defaultSubQueryInterval := t.Cfg.Querier.DefaultEvaluationInterval
454+
// prometheusCodec encodes and decodes Prometheus query range requests and responses.
455+
prometheusCodec := queryrange.NewPrometheusCodec(false, defaultSubQueryInterval)
456+
// shardedPrometheusCodec is the same as prometheusCodec but is used for sharded queries (it sums up the stats).
457+
shardedPrometheusCodec := queryrange.NewPrometheusCodec(true, defaultSubQueryInterval)
458+
453459
queryRangeMiddlewares, cache, err := queryrange.Middlewares(
454460
t.Cfg.QueryRange,
455461
util_log.Logger,
@@ -458,6 +464,8 @@ func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err erro
458464
prometheus.DefaultRegisterer,
459465
t.TombstonesLoader,
460466
queryAnalyzer,
467+
prometheusCodec,
468+
shardedPrometheusCodec,
461469
)
462470
if err != nil {
463471
return nil, err
@@ -473,10 +481,11 @@ func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err erro
473481
t.Cfg.QueryRange.ForwardHeaders,
474482
queryRangeMiddlewares,
475483
instantQueryMiddlewares,
476-
queryrange.PrometheusCodec,
484+
prometheusCodec,
477485
instantquery.InstantQueryCodec,
478486
t.Overrides,
479487
queryAnalyzer,
488+
defaultSubQueryInterval,
480489
)
481490

482491
return services.NewIdleService(nil, func(_ error) error {

pkg/querier/tripperware/instantquery/instant_query.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,10 @@ import (
1919
"github.com/prometheus/common/model"
2020
"github.com/prometheus/prometheus/model/labels"
2121
"github.com/prometheus/prometheus/model/timestamp"
22+
promqlparser "github.com/prometheus/prometheus/promql/parser"
2223
"github.com/weaveworks/common/httpgrpc"
2324
"google.golang.org/grpc/status"
2425

25-
promqlparser "github.com/prometheus/prometheus/promql/parser"
26-
2726
"github.com/cortexproject/cortex/pkg/cortexpb"
2827
"github.com/cortexproject/cortex/pkg/querier/tripperware"
2928
"github.com/cortexproject/cortex/pkg/querier/tripperware/queryrange"
@@ -109,7 +108,8 @@ func (r *PrometheusRequest) WithStats(stats string) tripperware.Request {
109108

110109
type instantQueryCodec struct {
111110
tripperware.Codec
112-
now func() time.Time
111+
now func() time.Time
112+
noStepSubQueryInterval time.Duration
113113
}
114114

115115
func newInstantQueryCodec() instantQueryCodec {
@@ -139,6 +139,10 @@ func (c instantQueryCodec) DecodeRequest(_ context.Context, r *http.Request, for
139139
}
140140

141141
result.Query = r.FormValue("query")
142+
if err := tripperware.SubQueryStepSizeCheck(result.Query, c.noStepSubQueryInterval, tripperware.MaxStep); err != nil {
143+
return nil, err
144+
}
145+
142146
result.Stats = r.FormValue("stats")
143147
result.Path = r.URL.Path
144148

pkg/querier/tripperware/instantquery/instant_query_test.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,7 @@ import (
2222
func TestRequest(t *testing.T) {
2323
t.Parallel()
2424
now := time.Now()
25-
codec := instantQueryCodec{now: func() time.Time {
26-
return now
27-
}}
25+
codec := InstantQueryCodec
2826

2927
for _, tc := range []struct {
3028
url string

pkg/querier/tripperware/instantquery/shard_by_query_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,13 @@ package instantquery
22

33
import (
44
"testing"
5+
"time"
56

67
"github.com/cortexproject/cortex/pkg/querier/tripperware"
78
"github.com/cortexproject/cortex/pkg/querier/tripperware/queryrange"
89
)
910

1011
func Test_shardQuery(t *testing.T) {
1112
t.Parallel()
12-
tripperware.TestQueryShardQuery(t, InstantQueryCodec, queryrange.ShardedPrometheusCodec)
13+
tripperware.TestQueryShardQuery(t, InstantQueryCodec, queryrange.NewPrometheusCodec(true, time.Minute))
1314
}

pkg/querier/tripperware/queryrange/query_range.go

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -40,17 +40,21 @@ var (
4040
errNegativeStep = httpgrpc.Errorf(http.StatusBadRequest, "zero or negative query resolution step widths are not accepted. Try a positive integer")
4141
errStepTooSmall = httpgrpc.Errorf(http.StatusBadRequest, "exceeded maximum resolution of 11,000 points per timeseries. Try decreasing the query resolution (?step=XX)")
4242

43-
// PrometheusCodec is a codec to encode and decode Prometheus query range requests and responses.
44-
PrometheusCodec tripperware.Codec = &prometheusCodec{sharded: false}
45-
// ShardedPrometheusCodec is same as PrometheusCodec but to be used on the sharded queries (it sum up the stats)
46-
ShardedPrometheusCodec tripperware.Codec = &prometheusCodec{sharded: true}
47-
4843
// Name of the cache control header.
4944
cacheControlHeader = "Cache-Control"
5045
)
5146

5247
type prometheusCodec struct {
5348
sharded bool
49+
50+
noStepSubQueryInterval time.Duration
51+
}
52+
53+
// NewPrometheusCodec returns a query range codec.
//
// sharded is stored on the codec unchanged. noStepSubQueryInterval is the
// value DecodeRequest passes to tripperware.SubQueryStepSizeCheck, together
// with tripperware.MaxStep, when it validates the request's query.
func NewPrometheusCodec(sharded bool, noStepSubQueryInterval time.Duration) *prometheusCodec { //nolint:revive
54+
return &prometheusCodec{
55+
sharded: sharded,
56+
noStepSubQueryInterval: noStepSubQueryInterval,
57+
}
5458
}
5559

5660
// WithStartEnd clones the current `PrometheusRequest` with a new `start` and `end` timestamp.
@@ -166,7 +170,7 @@ func (c prometheusCodec) MergeResponse(ctx context.Context, _ tripperware.Reques
166170
return &response, nil
167171
}
168172

169-
func (prometheusCodec) DecodeRequest(_ context.Context, r *http.Request, forwardHeaders []string) (tripperware.Request, error) {
173+
func (c prometheusCodec) DecodeRequest(_ context.Context, r *http.Request, forwardHeaders []string) (tripperware.Request, error) {
170174
var result PrometheusRequest
171175
var err error
172176
result.Start, err = util.ParseTime(r.FormValue("start"))
@@ -199,6 +203,10 @@ func (prometheusCodec) DecodeRequest(_ context.Context, r *http.Request, forward
199203
}
200204

201205
result.Query = r.FormValue("query")
206+
if err := tripperware.SubQueryStepSizeCheck(result.Query, c.noStepSubQueryInterval, tripperware.MaxStep); err != nil {
207+
return nil, err
208+
}
209+
202210
result.Stats = r.FormValue("stats")
203211
result.Path = r.URL.Path
204212

pkg/querier/tripperware/queryrange/query_range_middlewares.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,8 @@ func Middlewares(
7878
registerer prometheus.Registerer,
7979
cacheGenNumberLoader CacheGenNumberLoader,
8080
queryAnalyzer querysharding.Analyzer,
81+
prometheusCodec tripperware.Codec,
82+
shardedPrometheusCodec tripperware.Codec,
8183
) ([]tripperware.Middleware, cache.Cache, error) {
8284
// Metric used to keep track of each middleware execution duration.
8385
metrics := tripperware.NewInstrumentMiddlewareMetrics(registerer)
@@ -88,7 +90,7 @@ func Middlewares(
8890
}
8991
if cfg.SplitQueriesByInterval != 0 {
9092
staticIntervalFn := func(_ tripperware.Request) time.Duration { return cfg.SplitQueriesByInterval }
91-
queryRangeMiddleware = append(queryRangeMiddleware, tripperware.InstrumentMiddleware("split_by_interval", metrics), SplitByIntervalMiddleware(staticIntervalFn, limits, PrometheusCodec, registerer))
93+
queryRangeMiddleware = append(queryRangeMiddleware, tripperware.InstrumentMiddleware("split_by_interval", metrics), SplitByIntervalMiddleware(staticIntervalFn, limits, prometheusCodec, registerer))
9294
}
9395

9496
var c cache.Cache
@@ -99,7 +101,7 @@ func Middlewares(
99101
}
100102
return false
101103
}
102-
queryCacheMiddleware, cache, err := NewResultsCacheMiddleware(log, cfg.ResultsCacheConfig, constSplitter(cfg.SplitQueriesByInterval), limits, PrometheusCodec, cacheExtractor, cacheGenNumberLoader, shouldCache, registerer)
104+
queryCacheMiddleware, cache, err := NewResultsCacheMiddleware(log, cfg.ResultsCacheConfig, constSplitter(cfg.SplitQueriesByInterval), limits, prometheusCodec, cacheExtractor, cacheGenNumberLoader, shouldCache, registerer)
103105
if err != nil {
104106
return nil, nil, err
105107
}
@@ -111,7 +113,7 @@ func Middlewares(
111113
queryRangeMiddleware = append(queryRangeMiddleware, tripperware.InstrumentMiddleware("retry", metrics), NewRetryMiddleware(log, cfg.MaxRetries, NewRetryMiddlewareMetrics(registerer)))
112114
}
113115

114-
queryRangeMiddleware = append(queryRangeMiddleware, tripperware.InstrumentMiddleware("shardBy", metrics), tripperware.ShardByMiddleware(log, limits, ShardedPrometheusCodec, queryAnalyzer))
116+
queryRangeMiddleware = append(queryRangeMiddleware, tripperware.InstrumentMiddleware("shardBy", metrics), tripperware.ShardByMiddleware(log, limits, shardedPrometheusCodec, queryAnalyzer))
115117

116118
return queryRangeMiddleware, c, nil
117119
}

pkg/querier/tripperware/queryrange/query_range_middlewares_test.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"net/url"
99
"strconv"
1010
"testing"
11+
"time"
1112

1213
"github.com/go-kit/log"
1314
"github.com/stretchr/testify/require"
@@ -18,6 +19,11 @@ import (
1819
"github.com/cortexproject/cortex/pkg/querier/tripperware"
1920
)
2021

22+
var (
23+
PrometheusCodec = NewPrometheusCodec(false, time.Minute)
24+
ShardedPrometheusCodec = NewPrometheusCodec(true, time.Minute) // sharded=true, matching the package-level ShardedPrometheusCodec this test variable replaces; with false it would duplicate PrometheusCodec
25+
)
26+
2127
func TestRoundTrip(t *testing.T) {
2228
t.Parallel()
2329
s := httptest.NewServer(
@@ -53,6 +59,8 @@ func TestRoundTrip(t *testing.T) {
5359
nil,
5460
nil,
5561
qa,
62+
PrometheusCodec,
63+
ShardedPrometheusCodec,
5664
)
5765
require.NoError(t, err)
5866

@@ -65,6 +73,7 @@ func TestRoundTrip(t *testing.T) {
6573
nil,
6674
nil,
6775
qa,
76+
time.Minute,
6877
)
6978

7079
for i, tc := range []struct {

pkg/querier/tripperware/queryrange/query_range_test.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import (
55
"compress/gzip"
66
"context"
77
"fmt"
8-
io "io"
8+
"io"
99
"net/http"
1010
"strconv"
1111
"testing"
@@ -61,6 +61,10 @@ func TestRequest(t *testing.T) {
6161
url: "api/v1/query_range?start=0&end=11001&step=1",
6262
expectedErr: errStepTooSmall,
6363
},
64+
{
65+
url: "/api/v1/query?query=up%5B30d%3A%5D&start=123&end=456&step=10",
66+
expectedErr: httpgrpc.Errorf(http.StatusBadRequest, tripperware.ErrSubQueryStepTooSmall, 11000),
67+
},
6468
} {
6569
tc := tc
6670
t.Run(tc.url, func(t *testing.T) {

pkg/querier/tripperware/queryrange/step_align.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import (
77
)
88

99
// StepAlignMiddleware aligns the start and end of request to the step to
10-
// improved the cacheability of the query results.
10+
// improve the cacheability of the query results.
1111
var StepAlignMiddleware = tripperware.MiddlewareFunc(func(next tripperware.Handler) tripperware.Handler {
1212
return stepAlign{
1313
next: next,

pkg/querier/tripperware/roundtrip.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ func NewQueryTripperware(
102102
instantQueryCodec Codec,
103103
limits Limits,
104104
queryAnalyzer querysharding.Analyzer,
105+
defaultSubQueryInterval time.Duration,
105106
) Tripperware {
106107
// Per tenant query metrics.
107108
queriesPerTenant := promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
@@ -144,13 +145,18 @@ func NewQueryTripperware(
144145
if isQueryRange {
145146
return queryrange.RoundTrip(r)
146147
} else if isQuery {
148+
// Read the query once: it is needed by the subquery step size check here and by the shardability analysis further down.
149+
query := r.FormValue("query")
150+
// Check subquery step size.
151+
if err := SubQueryStepSizeCheck(query, defaultSubQueryInterval, MaxStep); err != nil {
152+
return nil, err
153+
}
154+
147155
// If vertical sharding is not enabled for the tenant, use downstream roundtripper.
148156
numShards := validation.SmallestPositiveIntPerTenant(tenantIDs, limits.QueryVerticalShardSize)
149157
if numShards <= 1 {
150158
return next.RoundTrip(r)
151159
}
152-
// If the given query is not shardable, use downstream roundtripper.
153-
query := r.FormValue("query")
154160
analysis, err := queryAnalyzer.Analyze(query)
155161
if err != nil || !analysis.IsShardable() {
156162
return next.RoundTrip(r)

0 commit comments

Comments
 (0)