Skip to content

Commit e3f869b

Browse files
committed
Add retries for instant query
Signed-off-by: 🌲 Harry 🌊 John 🏔 <johrry@amazon.com>
1 parent b49cf5a commit e3f869b

19 files changed

+360
-55
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
* [FEATURE] Ruler: Support for filtering rules in the API. #5417
3636
* [FEATURE] Compactor: Add `-compactor.ring.tokens-file-path` to store generated tokens locally. #5432
3737
* [FEATURE] Query Frontend: Add `-frontend.retry-on-too-many-outstanding-requests` to re-enqueue 429 requests if there are multiple query-schedulers available. #5496
38+
* [FEATURE] Store Gateway: Add `-blocks-storage.bucket-store.max-inflight-requests`for store gateways to reject further requests upon reaching the limit. #5553
3839
* [ENHANCEMENT] Distributor/Ingester: Add span on push path #5319
3940
* [ENHANCEMENT] Support object storage backends for runtime configuration file. #5292
4041
* [ENHANCEMENT] Query Frontend: Reject subquery with too small step size. #5323
@@ -59,6 +60,8 @@
5960
* [ENHANCEMENT] Store Gateway: add metric `cortex_bucket_store_chunk_refetches_total` for number of chunk refetches. #5532
6061
* [ENHANCEMENT] BasicLifeCycler: allow final-sleep during shutdown #5517
6162
* [ENHANCEMENT] All: Handling CMK Access Denied errors. #5420 #5542
63+
* [ENHANCEMENT] Querier: Retry store gateway client connection closing gRPC error. #5558
64+
* [ENHANCEMENT] Query Frontend: Add retries for instant query. #5560
6265
* [BUGFIX] Ruler: Validate if rule group can be safely converted back to rule group yaml from protobuf message #5265
6366
* [BUGFIX] Querier: Convert gRPC `ResourceExhausted` status code from store gateway to 422 limit error. #5286
6467
* [BUGFIX] Alertmanager: Route web-ui requests to the alertmanager distributor when sharding is enabled. #5293

docs/blocks-storage/querier.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -499,6 +499,11 @@ blocks_storage:
499499
# CLI flag: -blocks-storage.bucket-store.max-concurrent
500500
[max_concurrent: <int> | default = 100]
501501
502+
# Max number of inflight queries to execute against the long-term storage.
503+
# The limit is shared across all tenants. 0 to disable.
504+
# CLI flag: -blocks-storage.bucket-store.max-inflight-requests
505+
[max_inflight_requests: <int> | default = 0]
506+
502507
# Maximum number of concurrent tenants synching blocks.
503508
# CLI flag: -blocks-storage.bucket-store.tenant-sync-concurrency
504509
[tenant_sync_concurrency: <int> | default = 10]

docs/blocks-storage/store-gateway.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -602,6 +602,11 @@ blocks_storage:
602602
# CLI flag: -blocks-storage.bucket-store.max-concurrent
603603
[max_concurrent: <int> | default = 100]
604604
605+
# Max number of inflight queries to execute against the long-term storage.
606+
# The limit is shared across all tenants. 0 to disable.
607+
# CLI flag: -blocks-storage.bucket-store.max-inflight-requests
608+
[max_inflight_requests: <int> | default = 0]
609+
605610
# Maximum number of concurrent tenants synching blocks.
606611
# CLI flag: -blocks-storage.bucket-store.tenant-sync-concurrency
607612
[tenant_sync_concurrency: <int> | default = 10]

docs/configuration/config-file-reference.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1042,6 +1042,11 @@ bucket_store:
10421042
# CLI flag: -blocks-storage.bucket-store.max-concurrent
10431043
[max_concurrent: <int> | default = 100]
10441044
1045+
# Max number of inflight queries to execute against the long-term storage. The
1046+
# limit is shared across all tenants. 0 to disable.
1047+
# CLI flag: -blocks-storage.bucket-store.max-inflight-requests
1048+
[max_inflight_requests: <int> | default = 0]
1049+
10451050
# Maximum number of concurrent tenants synching blocks.
10461051
# CLI flag: -blocks-storage.bucket-store.tenant-sync-concurrency
10471052
[tenant_sync_concurrency: <int> | default = 10]

pkg/cortex/modules.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -455,6 +455,7 @@ func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err erro
455455
prometheusCodec := queryrange.NewPrometheusCodec(false)
456456
// ShardedPrometheusCodec is same as PrometheusCodec but to be used on the sharded queries (it sum up the stats)
457457
shardedPrometheusCodec := queryrange.NewPrometheusCodec(true)
458+
retryMiddlewareMetrics := queryrange.NewRetryMiddlewareMetrics(prometheus.DefaultRegisterer)
458459

459460
queryRangeMiddlewares, cache, err := queryrange.Middlewares(
460461
t.Cfg.QueryRange,
@@ -466,12 +467,13 @@ func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err erro
466467
queryAnalyzer,
467468
prometheusCodec,
468469
shardedPrometheusCodec,
470+
retryMiddlewareMetrics,
469471
)
470472
if err != nil {
471473
return nil, err
472474
}
473475

474-
instantQueryMiddlewares, err := instantquery.Middlewares(util_log.Logger, t.Overrides, queryAnalyzer)
476+
instantQueryMiddlewares, err := instantquery.Middlewares(util_log.Logger, t.Overrides, retryMiddlewareMetrics, t.Cfg.QueryRange.MaxRetries, queryAnalyzer)
475477
if err != nil {
476478
return nil, err
477479
}

pkg/querier/blocks_store_queryable.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1116,6 +1116,12 @@ func isRetryableError(err error) bool {
11161116
switch status.Code(err) {
11171117
case codes.Unavailable:
11181118
return true
1119+
case codes.ResourceExhausted:
1120+
return errors.Is(err, storegateway.ErrTooManyInflightRequests)
1121+
// Client side connection closing, this error happens during store gateway deployment.
1122+
// https://github.com/grpc/grpc-go/blob/03172006f5d168fc646d87928d85cb9c4a480291/clientconn.go#L67
1123+
case codes.Canceled:
1124+
return strings.Contains(err.Error(), "grpc: the client connection is closing")
11191125
default:
11201126
return false
11211127
}

pkg/querier/blocks_store_queryable_test.go

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import (
3434
"google.golang.org/grpc/status"
3535

3636
"github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex"
37+
"github.com/cortexproject/cortex/pkg/storegateway"
3738
"github.com/cortexproject/cortex/pkg/storegateway/storegatewaypb"
3839
"github.com/cortexproject/cortex/pkg/util"
3940
"github.com/cortexproject/cortex/pkg/util/limiter"
@@ -638,6 +639,35 @@ func TestBlocksStoreQuerier_Select(t *testing.T) {
638639
},
639640
},
640641
},
642+
"multiple store-gateways has the block, but one of them fails to return due to clientconn closing": {
643+
finderResult: bucketindex.Blocks{
644+
{ID: block1},
645+
},
646+
storeSetResponses: []interface{}{
647+
map[BlocksStoreClient][]ulid.ULID{
648+
&storeGatewayClientMock{
649+
remoteAddr: "1.1.1.1",
650+
mockedSeriesErr: status.Error(codes.Canceled, "grpc: the client connection is closing"),
651+
}: {block1},
652+
},
653+
map[BlocksStoreClient][]ulid.ULID{
654+
&storeGatewayClientMock{remoteAddr: "2.2.2.2", mockedSeriesResponses: []*storepb.SeriesResponse{
655+
mockSeriesResponse(labels.Labels{metricNameLabel, series1Label}, minT, 2),
656+
mockHintsResponse(block1),
657+
}}: {block1},
658+
},
659+
},
660+
limits: &blocksStoreLimitsMock{},
661+
queryLimiter: noOpQueryLimiter,
662+
expectedSeries: []seriesResult{
663+
{
664+
lbls: labels.New(metricNameLabel, series1Label),
665+
values: []valueResult{
666+
{t: minT, v: 2},
667+
},
668+
},
669+
},
670+
},
641671
"all store-gateways return PermissionDenied": {
642672
finderResult: bucketindex.Blocks{
643673
{ID: block1},
@@ -708,6 +738,56 @@ func TestBlocksStoreQuerier_Select(t *testing.T) {
708738
},
709739
},
710740
},
741+
"multiple store-gateways has the block, but one of them had too many inflight requests": {
742+
finderResult: bucketindex.Blocks{
743+
{ID: block1},
744+
},
745+
storeSetResponses: []interface{}{
746+
map[BlocksStoreClient][]ulid.ULID{
747+
&storeGatewayClientMock{
748+
remoteAddr: "1.1.1.1",
749+
mockedSeriesErr: storegateway.ErrTooManyInflightRequests,
750+
}: {block1},
751+
},
752+
map[BlocksStoreClient][]ulid.ULID{
753+
&storeGatewayClientMock{remoteAddr: "2.2.2.2", mockedSeriesResponses: []*storepb.SeriesResponse{
754+
mockSeriesResponse(labels.Labels{metricNameLabel, series1Label}, minT, 2),
755+
mockHintsResponse(block1),
756+
}}: {block1},
757+
},
758+
},
759+
limits: &blocksStoreLimitsMock{},
760+
queryLimiter: noOpQueryLimiter,
761+
expectedSeries: []seriesResult{
762+
{
763+
lbls: labels.New(metricNameLabel, series1Label),
764+
values: []valueResult{
765+
{t: minT, v: 2},
766+
},
767+
},
768+
},
769+
},
770+
"store gateway returns resource exhausted error other than max inflight request": {
771+
finderResult: bucketindex.Blocks{
772+
{ID: block1},
773+
},
774+
storeSetResponses: []interface{}{
775+
map[BlocksStoreClient][]ulid.ULID{
776+
&storeGatewayClientMock{
777+
remoteAddr: "1.1.1.1",
778+
mockedSeriesErr: status.Error(codes.ResourceExhausted, "some other resource"),
779+
}: {block1},
780+
},
781+
map[BlocksStoreClient][]ulid.ULID{
782+
&storeGatewayClientMock{remoteAddr: "2.2.2.2", mockedSeriesResponses: []*storepb.SeriesResponse{
783+
mockSeriesResponse(labels.Labels{metricNameLabel, series1Label}, minT, 2),
784+
mockHintsResponse(block1),
785+
}}: {block1},
786+
},
787+
},
788+
limits: &blocksStoreLimitsMock{},
789+
expectedErr: errors.Wrapf(status.Error(codes.ResourceExhausted, "some other resource"), "failed to fetch series from 1.1.1.1"),
790+
},
711791
}
712792

713793
for testName, testData := range tests {

pkg/querier/tripperware/instantquery/instant_query_middlewares.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,21 @@ import (
55
"github.com/thanos-io/thanos/pkg/querysharding"
66

77
"github.com/cortexproject/cortex/pkg/querier/tripperware"
8+
"github.com/cortexproject/cortex/pkg/querier/tripperware/queryrange"
89
)
910

1011
func Middlewares(
1112
log log.Logger,
1213
limits tripperware.Limits,
14+
retryMiddlewareMetrics *queryrange.RetryMiddlewareMetrics,
15+
maxRetries int,
1316
queryAnalyzer querysharding.Analyzer,
1417
) ([]tripperware.Middleware, error) {
1518
var m []tripperware.Middleware
1619

20+
if maxRetries > 0 {
21+
m = append(m, queryrange.NewRetryMiddleware(log, maxRetries, retryMiddlewareMetrics))
22+
}
1723
m = append(m, tripperware.ShardByMiddleware(log, limits, InstantQueryCodec, queryAnalyzer))
1824
return m, nil
1925
}
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
package instantquery
2+
3+
import (
4+
"context"
5+
"io"
6+
"net/http"
7+
"net/http/httptest"
8+
"net/url"
9+
"strconv"
10+
"testing"
11+
"time"
12+
13+
"github.com/go-kit/log"
14+
"github.com/stretchr/testify/require"
15+
"github.com/thanos-io/thanos/pkg/querysharding"
16+
"github.com/weaveworks/common/middleware"
17+
"github.com/weaveworks/common/user"
18+
"go.uber.org/atomic"
19+
20+
"github.com/cortexproject/cortex/pkg/querier/tripperware"
21+
)
22+
23+
var (
24+
query = "/api/v1/query?time=1536716898&query=sum by (label) (up)&stats=all"
25+
responseBody = `{"status":"success","data":{"resultType":"vector","result":[]}}`
26+
)
27+
28+
func TestRoundTrip(t *testing.T) {
29+
t.Parallel()
30+
var try atomic.Int32
31+
s := httptest.NewServer(
32+
middleware.AuthenticateUser.Wrap(
33+
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
34+
var err error
35+
if try.Inc() > 2 {
36+
_, err = w.Write([]byte(responseBody))
37+
} else {
38+
http.Error(w, `{"status":"error"}`, http.StatusInternalServerError)
39+
}
40+
if err != nil {
41+
t.Fatal(err)
42+
}
43+
}),
44+
),
45+
)
46+
defer s.Close()
47+
48+
u, err := url.Parse(s.URL)
49+
require.NoError(t, err)
50+
51+
downstream := singleHostRoundTripper{
52+
host: u.Host,
53+
next: http.DefaultTransport,
54+
}
55+
limits := tripperware.MockLimits{
56+
ShardSize: 2,
57+
}
58+
qa := querysharding.NewQueryAnalyzer()
59+
instantQueryMiddlewares, err := Middlewares(
60+
log.NewNopLogger(),
61+
limits,
62+
nil,
63+
3,
64+
qa)
65+
require.NoError(t, err)
66+
67+
tw := tripperware.NewQueryTripperware(
68+
log.NewNopLogger(),
69+
nil,
70+
nil,
71+
nil,
72+
instantQueryMiddlewares,
73+
nil,
74+
InstantQueryCodec,
75+
limits,
76+
qa,
77+
time.Minute,
78+
)
79+
80+
for i, tc := range []struct {
81+
path, expectedBody string
82+
}{
83+
{query, responseBody},
84+
} {
85+
t.Run(strconv.Itoa(i), func(t *testing.T) {
86+
//parallel testing causes data race
87+
req, err := http.NewRequest("GET", tc.path, http.NoBody)
88+
require.NoError(t, err)
89+
90+
// query-frontend doesn't actually authenticate requests, we rely on
91+
// the queriers to do this. Hence we ensure the request doesn't have a
92+
// org ID in the ctx, but does have the header.
93+
ctx := user.InjectOrgID(context.Background(), "1")
94+
req = req.WithContext(ctx)
95+
err = user.InjectOrgIDIntoHTTPRequest(ctx, req)
96+
require.NoError(t, err)
97+
98+
resp, err := tw(downstream).RoundTrip(req)
99+
require.NoError(t, err)
100+
require.Equal(t, 200, resp.StatusCode)
101+
102+
bs, err := io.ReadAll(resp.Body)
103+
require.NoError(t, err)
104+
require.Equal(t, tc.expectedBody, string(bs))
105+
})
106+
}
107+
}
108+
109+
type singleHostRoundTripper struct {
110+
host string
111+
next http.RoundTripper
112+
}
113+
114+
func (s singleHostRoundTripper) RoundTrip(r *http.Request) (*http.Response, error) {
115+
r.URL.Scheme = "http"
116+
r.URL.Host = s.host
117+
return s.next.RoundTrip(r)
118+
}

pkg/querier/tripperware/queryrange/limits_test.go

Lines changed: 2 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ func TestLimitsMiddleware_MaxQueryLookback(t *testing.T) {
7575
End: util.TimeToMillis(testData.reqEndTime),
7676
}
7777

78-
limits := mockLimits{maxQueryLookback: testData.maxQueryLookback}
78+
limits := tripperware.MockLimits{QueryLookback: testData.maxQueryLookback}
7979
middleware := NewLimitsMiddleware(limits)
8080

8181
innerRes := NewEmptyPrometheusResponse()
@@ -163,7 +163,7 @@ func TestLimitsMiddleware_MaxQueryLength(t *testing.T) {
163163
End: util.TimeToMillis(testData.reqEndTime),
164164
}
165165

166-
limits := mockLimits{maxQueryLength: testData.maxQueryLength}
166+
limits := tripperware.MockLimits{QueryLength: testData.maxQueryLength}
167167
middleware := NewLimitsMiddleware(limits)
168168

169169
innerRes := NewEmptyPrometheusResponse()
@@ -193,32 +193,6 @@ func TestLimitsMiddleware_MaxQueryLength(t *testing.T) {
193193
}
194194
}
195195

196-
type mockLimits struct {
197-
maxQueryLookback time.Duration
198-
maxQueryLength time.Duration
199-
maxCacheFreshness time.Duration
200-
}
201-
202-
func (m mockLimits) MaxQueryLookback(string) time.Duration {
203-
return m.maxQueryLookback
204-
}
205-
206-
func (m mockLimits) MaxQueryLength(string) time.Duration {
207-
return m.maxQueryLength
208-
}
209-
210-
func (mockLimits) MaxQueryParallelism(string) int {
211-
return 14 // Flag default.
212-
}
213-
214-
func (m mockLimits) MaxCacheFreshness(string) time.Duration {
215-
return m.maxCacheFreshness
216-
}
217-
218-
func (m mockLimits) QueryVerticalShardSize(userID string) int {
219-
return 0
220-
}
221-
222196
type mockHandler struct {
223197
mock.Mock
224198
}

pkg/querier/tripperware/queryrange/query_range_middlewares.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ func Middlewares(
8080
queryAnalyzer querysharding.Analyzer,
8181
prometheusCodec tripperware.Codec,
8282
shardedPrometheusCodec tripperware.Codec,
83+
retryMiddlewareMetrics *RetryMiddlewareMetrics,
8384
) ([]tripperware.Middleware, cache.Cache, error) {
8485
// Metric used to keep track of each middleware execution duration.
8586
metrics := tripperware.NewInstrumentMiddlewareMetrics(registerer)
@@ -110,7 +111,7 @@ func Middlewares(
110111
}
111112

112113
if cfg.MaxRetries > 0 {
113-
queryRangeMiddleware = append(queryRangeMiddleware, tripperware.InstrumentMiddleware("retry", metrics), NewRetryMiddleware(log, cfg.MaxRetries, NewRetryMiddlewareMetrics(registerer)))
114+
queryRangeMiddleware = append(queryRangeMiddleware, tripperware.InstrumentMiddleware("retry", metrics), NewRetryMiddleware(log, cfg.MaxRetries, retryMiddlewareMetrics))
114115
}
115116

116117
queryRangeMiddleware = append(queryRangeMiddleware, tripperware.InstrumentMiddleware("shardBy", metrics), tripperware.ShardByMiddleware(log, limits, shardedPrometheusCodec, queryAnalyzer))

0 commit comments

Comments
 (0)