Skip to content

Add retries for instant query #5560

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
* [ENHANCEMENT] BasicLifeCycler: allow final-sleep during shutdown #5517
* [ENHANCEMENT] All: Handling CMK Access Denied errors. #5420 #5542
* [ENHANCEMENT] Querier: Retry store gateway client connection closing gRPC error. #5558
* [ENHANCEMENT] Query Frontend: Add retries for instant query. #5560
* [BUGFIX] Ruler: Validate if rule group can be safely converted back to rule group yaml from protobuf message #5265
* [BUGFIX] Querier: Convert gRPC `ResourceExhausted` status code from store gateway to 422 limit error. #5286
* [BUGFIX] Alertmanager: Route web-ui requests to the alertmanager distributor when sharding is enabled. #5293
Expand Down
4 changes: 3 additions & 1 deletion pkg/cortex/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,7 @@ func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err erro
prometheusCodec := queryrange.NewPrometheusCodec(false)
// ShardedPrometheusCodec is the same as PrometheusCodec, but is used for sharded queries (it sums up the stats).
shardedPrometheusCodec := queryrange.NewPrometheusCodec(true)
retryMiddlewareMetrics := queryrange.NewRetryMiddlewareMetrics(prometheus.DefaultRegisterer)

queryRangeMiddlewares, cache, err := queryrange.Middlewares(
t.Cfg.QueryRange,
Expand All @@ -466,12 +467,13 @@ func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err erro
queryAnalyzer,
prometheusCodec,
shardedPrometheusCodec,
retryMiddlewareMetrics,
)
if err != nil {
return nil, err
}

instantQueryMiddlewares, err := instantquery.Middlewares(util_log.Logger, t.Overrides, queryAnalyzer)
instantQueryMiddlewares, err := instantquery.Middlewares(util_log.Logger, t.Overrides, retryMiddlewareMetrics, t.Cfg.QueryRange.MaxRetries, queryAnalyzer)
if err != nil {
return nil, err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,21 @@ import (
"github.com/thanos-io/thanos/pkg/querysharding"

"github.com/cortexproject/cortex/pkg/querier/tripperware"
"github.com/cortexproject/cortex/pkg/querier/tripperware/queryrange"
)

// Middlewares assembles the middleware chain applied to instant queries.
//
// When maxRetries is greater than zero, a retry middleware (configured with
// maxRetries and retryMiddlewareMetrics) is placed first in the chain. The
// shard-by middleware, built from limits, InstantQueryCodec and queryAnalyzer,
// is always appended last. The returned error is always nil.
func Middlewares(
	log log.Logger,
	limits tripperware.Limits,
	retryMiddlewareMetrics *queryrange.RetryMiddlewareMetrics,
	maxRetries int,
	queryAnalyzer querysharding.Analyzer,
) ([]tripperware.Middleware, error) {
	// At most two entries: the optional retry middleware and the shard-by middleware.
	chain := make([]tripperware.Middleware, 0, 2)

	retriesEnabled := maxRetries > 0
	if retriesEnabled {
		retry := queryrange.NewRetryMiddleware(log, maxRetries, retryMiddlewareMetrics)
		chain = append(chain, retry)
	}

	shardBy := tripperware.ShardByMiddleware(log, limits, InstantQueryCodec, queryAnalyzer)
	return append(chain, shardBy), nil
}
118 changes: 118 additions & 0 deletions pkg/querier/tripperware/instantquery/instant_query_middlewares_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package instantquery

import (
"context"
"io"
"net/http"
"net/http/httptest"
"net/url"
"strconv"
"testing"
"time"

"github.com/go-kit/log"
"github.com/stretchr/testify/require"
"github.com/thanos-io/thanos/pkg/querysharding"
"github.com/weaveworks/common/middleware"
"github.com/weaveworks/common/user"
"go.uber.org/atomic"

"github.com/cortexproject/cortex/pkg/querier/tripperware"
)

var (
query = "/api/v1/query?time=1536716898&query=sum by (label) (up)&stats=all"
responseBody = `{"status":"success","data":{"resultType":"vector","result":[]}}`
)

// TestRoundTrip sends an instant query through the tripperware built from the
// instant-query middlewares (created with maxRetries = 3) and expects a 200
// response whose body equals responseBody, even though the downstream test
// server answers its first two requests with HTTP 500.
func TestRoundTrip(t *testing.T) {
	t.Parallel()
	var try atomic.Int32
	s := httptest.NewServer(
		middleware.AuthenticateUser.Wrap(
			http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
				// Fail the first two requests; every later one succeeds.
				if try.Inc() <= 2 {
					http.Error(w, `{"status":"error"}`, http.StatusInternalServerError)
					return
				}
				// This handler runs on a server goroutine, not the test goroutine,
				// so report failures with Errorf: Fatal/FailNow may only be called
				// from the goroutine running the test function.
				if _, err := w.Write([]byte(responseBody)); err != nil {
					t.Errorf("writing response body: %v", err)
				}
			}),
		),
	)
	defer s.Close()

	u, err := url.Parse(s.URL)
	require.NoError(t, err)

	// Route every downstream request to the test server.
	downstream := singleHostRoundTripper{
		host: u.Host,
		next: http.DefaultTransport,
	}
	limits := tripperware.MockLimits{
		ShardSize: 2,
	}
	qa := querysharding.NewQueryAnalyzer()
	instantQueryMiddlewares, err := Middlewares(
		log.NewNopLogger(),
		limits,
		nil,
		3,
		qa)
	require.NoError(t, err)

	tw := tripperware.NewQueryTripperware(
		log.NewNopLogger(),
		nil,
		nil,
		nil,
		instantQueryMiddlewares,
		nil,
		InstantQueryCodec,
		limits,
		qa,
		time.Minute,
	)

	for i, tc := range []struct {
		path, expectedBody string
	}{
		{query, responseBody},
	} {
		t.Run(strconv.Itoa(i), func(t *testing.T) {
			// Not run in parallel: parallel subtests cause a data race.
			req, err := http.NewRequest("GET", tc.path, http.NoBody)
			require.NoError(t, err)

			// query-frontend doesn't actually authenticate requests, we rely on
			// the queriers to do this. Hence we ensure the request doesn't have an
			// org ID in the ctx, but does have the header.
			ctx := user.InjectOrgID(context.Background(), "1")
			req = req.WithContext(ctx)
			err = user.InjectOrgIDIntoHTTPRequest(ctx, req)
			require.NoError(t, err)

			resp, err := tw(downstream).RoundTrip(req)
			require.NoError(t, err)
			// Always release the response body, even if an assertion below fails.
			defer resp.Body.Close()
			require.Equal(t, 200, resp.StatusCode)

			bs, err := io.ReadAll(resp.Body)
			require.NoError(t, err)
			require.Equal(t, tc.expectedBody, string(bs))
		})
	}
}

// singleHostRoundTripper is an http.RoundTripper that sends every request to
// one fixed host over plain HTTP, delegating the actual transport to next.
type singleHostRoundTripper struct {
	host string            // value written to URL.Host of every outgoing request
	next http.RoundTripper // transport that performs the rewritten request
}

// RoundTrip forwards r to s.next with the URL scheme set to "http" and the URL
// host set to s.host, and returns whatever s.next returns.
//
// The rewrite is applied to a clone of r: the http.RoundTripper contract says
// RoundTrip should not modify the caller's request, and the caller may reuse
// or inspect r after the call.
func (s singleHostRoundTripper) RoundTrip(r *http.Request) (*http.Response, error) {
	out := r.Clone(r.Context())
	out.URL.Scheme = "http"
	out.URL.Host = s.host
	return s.next.RoundTrip(out)
}
30 changes: 2 additions & 28 deletions pkg/querier/tripperware/queryrange/limits_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func TestLimitsMiddleware_MaxQueryLookback(t *testing.T) {
End: util.TimeToMillis(testData.reqEndTime),
}

limits := mockLimits{maxQueryLookback: testData.maxQueryLookback}
limits := tripperware.MockLimits{QueryLookback: testData.maxQueryLookback}
middleware := NewLimitsMiddleware(limits)

innerRes := NewEmptyPrometheusResponse()
Expand Down Expand Up @@ -163,7 +163,7 @@ func TestLimitsMiddleware_MaxQueryLength(t *testing.T) {
End: util.TimeToMillis(testData.reqEndTime),
}

limits := mockLimits{maxQueryLength: testData.maxQueryLength}
limits := tripperware.MockLimits{QueryLength: testData.maxQueryLength}
middleware := NewLimitsMiddleware(limits)

innerRes := NewEmptyPrometheusResponse()
Expand Down Expand Up @@ -193,32 +193,6 @@ func TestLimitsMiddleware_MaxQueryLength(t *testing.T) {
}
}

// mockLimits is a limits stub for tests: the duration limits are taken from
// its fields, while query parallelism and the vertical shard size are fixed.
type mockLimits struct {
// Value returned by MaxQueryLookback.
maxQueryLookback time.Duration
// Value returned by MaxQueryLength.
maxQueryLength time.Duration
// Value returned by MaxCacheFreshness.
maxCacheFreshness time.Duration
}

// MaxQueryLookback returns the configured maxQueryLookback; the tenant ID argument is ignored.
func (m mockLimits) MaxQueryLookback(string) time.Duration {
return m.maxQueryLookback
}

// MaxQueryLength returns the configured maxQueryLength; the tenant ID argument is ignored.
func (m mockLimits) MaxQueryLength(string) time.Duration {
return m.maxQueryLength
}

// MaxQueryParallelism always returns 14, regardless of the argument.
func (mockLimits) MaxQueryParallelism(string) int {
return 14 // Flag default.
}

// MaxCacheFreshness returns the configured maxCacheFreshness; the tenant ID argument is ignored.
func (m mockLimits) MaxCacheFreshness(string) time.Duration {
return m.maxCacheFreshness
}

// QueryVerticalShardSize always returns 0; userID is ignored.
func (m mockLimits) QueryVerticalShardSize(userID string) int {
return 0
}

type mockHandler struct {
mock.Mock
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ func Middlewares(
queryAnalyzer querysharding.Analyzer,
prometheusCodec tripperware.Codec,
shardedPrometheusCodec tripperware.Codec,
retryMiddlewareMetrics *RetryMiddlewareMetrics,
) ([]tripperware.Middleware, cache.Cache, error) {
// Metric used to keep track of each middleware execution duration.
metrics := tripperware.NewInstrumentMiddlewareMetrics(registerer)
Expand Down Expand Up @@ -110,7 +111,7 @@ func Middlewares(
}

if cfg.MaxRetries > 0 {
queryRangeMiddleware = append(queryRangeMiddleware, tripperware.InstrumentMiddleware("retry", metrics), NewRetryMiddleware(log, cfg.MaxRetries, NewRetryMiddlewareMetrics(registerer)))
queryRangeMiddleware = append(queryRangeMiddleware, tripperware.InstrumentMiddleware("retry", metrics), NewRetryMiddleware(log, cfg.MaxRetries, retryMiddlewareMetrics))
}

queryRangeMiddleware = append(queryRangeMiddleware, tripperware.InstrumentMiddleware("shardBy", metrics), tripperware.ShardByMiddleware(log, limits, shardedPrometheusCodec, queryAnalyzer))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,14 @@ func TestRoundTrip(t *testing.T) {
qa := querysharding.NewQueryAnalyzer()
queyrangemiddlewares, _, err := Middlewares(Config{},
log.NewNopLogger(),
mockLimits{},
tripperware.MockLimits{},
nil,
nil,
nil,
qa,
PrometheusCodec,
ShardedPrometheusCodec,
NewRetryMiddlewareMetrics(nil),
)
require.NoError(t, err)

Expand Down
16 changes: 8 additions & 8 deletions pkg/querier/tripperware/queryrange/results_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ func TestStatsCacheQuerySamples(t *testing.T) {
log.NewNopLogger(),
cfg,
constSplitter(day),
mockLimits{},
tripperware.MockLimits{},
PrometheusCodec,
PrometheusResponseExtractor{},
nil,
Expand Down Expand Up @@ -974,7 +974,7 @@ func TestHandleHit(t *testing.T) {
sut := resultsCache{
extractor: PrometheusResponseExtractor{},
minCacheExtent: 10,
limits: mockLimits{},
limits: tripperware.MockLimits{},
merger: PrometheusCodec,
next: tripperware.HandlerFunc(func(_ context.Context, req tripperware.Request) (tripperware.Response, error) {
return mkAPIResponse(req.GetStart(), req.GetEnd(), req.GetStep()), nil
Expand Down Expand Up @@ -1004,7 +1004,7 @@ func TestResultsCache(t *testing.T) {
log.NewNopLogger(),
cfg,
constSplitter(day),
mockLimits{},
tripperware.MockLimits{},
PrometheusCodec,
PrometheusResponseExtractor{},
nil,
Expand Down Expand Up @@ -1046,7 +1046,7 @@ func TestResultsCacheRecent(t *testing.T) {
log.NewNopLogger(),
cfg,
constSplitter(day),
mockLimits{maxCacheFreshness: 10 * time.Minute},
tripperware.MockLimits{CacheFreshness: 10 * time.Minute},
PrometheusCodec,
PrometheusResponseExtractor{},
nil,
Expand Down Expand Up @@ -1087,13 +1087,13 @@ func TestResultsCacheMaxFreshness(t *testing.T) {
expectedResponse *PrometheusResponse
}{
{
fakeLimits: mockLimits{maxCacheFreshness: 5 * time.Second},
fakeLimits: tripperware.MockLimits{CacheFreshness: 5 * time.Second},
Handler: nil,
expectedResponse: mkAPIResponse(int64(modelNow)-(50*1e3), int64(modelNow)-(10*1e3), 10),
},
{
// should not lookup cache because per-tenant override will be applied
fakeLimits: mockLimits{maxCacheFreshness: 10 * time.Minute},
fakeLimits: tripperware.MockLimits{CacheFreshness: 10 * time.Minute},
Handler: tripperware.HandlerFunc(func(_ context.Context, _ tripperware.Request) (tripperware.Response, error) {
return parsedResponse, nil
}),
Expand Down Expand Up @@ -1150,7 +1150,7 @@ func Test_resultsCache_MissingData(t *testing.T) {
log.NewNopLogger(),
cfg,
constSplitter(day),
mockLimits{},
tripperware.MockLimits{},
PrometheusCodec,
PrometheusResponseExtractor{},
nil,
Expand Down Expand Up @@ -1263,7 +1263,7 @@ func TestResultsCacheShouldCacheFunc(t *testing.T) {
log.NewNopLogger(),
cfg,
constSplitter(day),
mockLimits{maxCacheFreshness: 10 * time.Minute},
tripperware.MockLimits{CacheFreshness: 10 * time.Minute},
PrometheusCodec,
PrometheusResponseExtractor{},
nil,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ func TestSplitByDay(t *testing.T) {
roundtripper := tripperware.NewRoundTripper(singleHostRoundTripper{
host: u.Host,
next: http.DefaultTransport,
}, PrometheusCodec, nil, NewLimitsMiddleware(mockLimits{}), SplitByIntervalMiddleware(interval, mockLimits{}, PrometheusCodec, nil))
}, PrometheusCodec, nil, NewLimitsMiddleware(tripperware.MockLimits{}), SplitByIntervalMiddleware(interval, tripperware.MockLimits{}, PrometheusCodec, nil))

req, err := http.NewRequest("GET", tc.path, http.NoBody)
require.NoError(t, err)
Expand Down
30 changes: 15 additions & 15 deletions pkg/querier/tripperware/test_shard_by_query_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,7 @@ http_requests_total`,
}

qa := thanosquerysharding.NewQueryAnalyzer()
roundtripper := NewRoundTripper(downstream, tt.codec, nil, ShardByMiddleware(log.NewNopLogger(), mockLimits{shardSize: tt.shardSize}, tt.codec, qa))
roundtripper := NewRoundTripper(downstream, tt.codec, nil, ShardByMiddleware(log.NewNopLogger(), MockLimits{ShardSize: tt.shardSize}, tt.codec, qa))

ctx := user.InjectOrgID(context.Background(), "1")

Expand All @@ -461,31 +461,31 @@ http_requests_total`,
}
}

type mockLimits struct {
maxQueryLookback time.Duration
maxQueryLength time.Duration
maxCacheFreshness time.Duration
shardSize int
type MockLimits struct {
QueryLookback time.Duration
QueryLength time.Duration
CacheFreshness time.Duration
ShardSize int
}

func (m mockLimits) MaxQueryLookback(string) time.Duration {
return m.maxQueryLookback
func (m MockLimits) MaxQueryLookback(string) time.Duration {
return m.QueryLookback
}

func (m mockLimits) MaxQueryLength(string) time.Duration {
return m.maxQueryLength
func (m MockLimits) MaxQueryLength(string) time.Duration {
return m.QueryLength
}

func (mockLimits) MaxQueryParallelism(string) int {
func (MockLimits) MaxQueryParallelism(string) int {
return 14 // Flag default.
}

func (m mockLimits) MaxCacheFreshness(string) time.Duration {
return m.maxCacheFreshness
func (m MockLimits) MaxCacheFreshness(string) time.Duration {
return m.CacheFreshness
}

func (m mockLimits) QueryVerticalShardSize(userID string) int {
return m.shardSize
func (m MockLimits) QueryVerticalShardSize(userID string) int {
return m.ShardSize
}

type singleHostRoundTripper struct {
Expand Down