Skip to content

Query Frontend: Handle context error before decoding and merging responses #5499

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Aug 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@
* [BUGFIX] Alertmanager: Remove the user id from state replication key metric label value. #5453
* [BUGFIX] Compactor: Avoid cleaner concurrency issues checking global markers before all blocks. #5457
* [BUGFIX] DDBKV: Disallow instance with older timestamp to update instance with newer timestamp. #5480
* [BUGFIX] Query Frontend: Handle context error before decoding and merging responses. #5499

## 1.15.1 2023-04-26

* [CHANGE] Alertmanager: Validating new fields on the PagerDuty AM config. #5290
Expand Down
25 changes: 20 additions & 5 deletions pkg/querier/tripperware/instantquery/instant_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,10 @@ func (instantQueryCodec) DecodeResponse(ctx context.Context, r *http.Response, _
log, ctx := spanlogger.New(ctx, "PrometheusInstantQueryResponse") //nolint:ineffassign,staticcheck
defer log.Finish()

if err := ctx.Err(); err != nil {
return nil, err
}

buf, err := tripperware.BodyBuffer(r, log)
if err != nil {
log.Error(err)
Expand Down Expand Up @@ -266,7 +270,7 @@ func (instantQueryCodec) MergeResponse(ctx context.Context, req tripperware.Requ
// For now, we only shard queries that returns a vector.
switch promResponses[0].Data.ResultType {
case model.ValVector.String():
v, err := vectorMerge(req, promResponses)
v, err := vectorMerge(ctx, req, promResponses)
if err != nil {
return nil, err
}
Expand All @@ -280,12 +284,17 @@ func (instantQueryCodec) MergeResponse(ctx context.Context, req tripperware.Requ
Stats: statsMerge(promResponses),
}
case model.ValMatrix.String():
sampleStreams, err := matrixMerge(ctx, promResponses)
if err != nil {
return nil, err
}

data = PrometheusInstantQueryData{
ResultType: model.ValMatrix.String(),
Result: PrometheusInstantQueryResult{
Result: &PrometheusInstantQueryResult_Matrix{
Matrix: &Matrix{
SampleStreams: matrixMerge(promResponses),
SampleStreams: sampleStreams,
},
},
},
Expand All @@ -302,7 +311,7 @@ func (instantQueryCodec) MergeResponse(ctx context.Context, req tripperware.Requ
return res, nil
}

func vectorMerge(req tripperware.Request, resps []*PrometheusInstantQueryResponse) (*Vector, error) {
func vectorMerge(ctx context.Context, req tripperware.Request, resps []*PrometheusInstantQueryResponse) (*Vector, error) {
output := map[string]*Sample{}
metrics := []string{} // Used to preserve the order for topk and bottomk.
sortPlan, err := sortPlanForQuery(req.GetQuery())
Expand All @@ -311,6 +320,9 @@ func vectorMerge(req tripperware.Request, resps []*PrometheusInstantQueryRespons
}
buf := make([]byte, 0, 1024)
for _, resp := range resps {
if err = ctx.Err(); err != nil {
return nil, err
}
if resp == nil {
continue
}
Expand Down Expand Up @@ -439,9 +451,12 @@ func sortPlanForQuery(q string) (sortPlan, error) {
return sortByLabels, nil
}

func matrixMerge(resps []*PrometheusInstantQueryResponse) []tripperware.SampleStream {
func matrixMerge(ctx context.Context, resps []*PrometheusInstantQueryResponse) ([]tripperware.SampleStream, error) {
output := make(map[string]tripperware.SampleStream)
for _, resp := range resps {
if err := ctx.Err(); err != nil {
return nil, err
}
if resp == nil {
continue
}
Expand All @@ -462,7 +477,7 @@ func matrixMerge(resps []*PrometheusInstantQueryResponse) []tripperware.SampleSt
result = append(result, output[key])
}

return result
return result, nil
}

// NewEmptyPrometheusInstantQueryResponse returns an empty successful Prometheus query range response.
Expand Down
62 changes: 50 additions & 12 deletions pkg/querier/tripperware/instantquery/instant_query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,11 +218,14 @@ func TestMergeResponse(t *testing.T) {
Query: "sum(up)",
}
for _, tc := range []struct {
name string
req tripperware.Request
resps []string
expectedResp string
expectedErr error
name string
req tripperware.Request
resps []string
expectedResp string
expectedErr error
cancelBeforeDecode bool
expectedDecodeErr error
cancelBeforeMerge bool
}{
{
name: "empty response",
Expand Down Expand Up @@ -355,10 +358,31 @@ func TestMergeResponse(t *testing.T) {
},
expectedResp: `{"status":"success","data":{"resultType":"matrix","result":[{"metric":{"__name__":"bar"},"values":[[1,"1"],[2,"2"],[3,"3"]]}]}}`,
},
{
name: "context cancelled before decoding response",
req: defaultReq,
resps: []string{
`{"status":"success","data":{"resultType":"vector","result":[{"metric":{"__name__":"up","job":"foo"},"value":[1,"1"]}]}}`,
`{"status":"success","data":{"resultType":"vector","result":[{"metric":{"__name__":"up","job":"bar"},"value":[2,"2"]}]}}`,
},
expectedDecodeErr: context.Canceled,
cancelBeforeDecode: true,
},
{
name: "context cancelled before merging response",
req: defaultReq,
resps: []string{
`{"status":"success","data":{"resultType":"vector","result":[{"metric":{"__name__":"up","job":"foo"},"value":[1,"1"]}]}}`,
`{"status":"success","data":{"resultType":"vector","result":[{"metric":{"__name__":"up","job":"bar"},"value":[2,"2"]}]}}`,
},
expectedErr: context.Canceled,
cancelBeforeMerge: true,
},
} {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
ctx, cancelCtx := context.WithCancel(context.Background())

var resps []tripperware.Response
for _, r := range tc.resps {
Expand All @@ -367,20 +391,34 @@ func TestMergeResponse(t *testing.T) {
Header: http.Header{"Content-Type": []string{"application/json"}},
Body: io.NopCloser(bytes.NewBuffer([]byte(r))),
}
dr, err := InstantQueryCodec.DecodeResponse(context.Background(), hr, nil)
require.NoError(t, err)

if tc.cancelBeforeDecode {
cancelCtx()
}
dr, err := InstantQueryCodec.DecodeResponse(ctx, hr, nil)
assert.Equal(t, tc.expectedDecodeErr, err)
if err != nil {
cancelCtx()
return
}
resps = append(resps, dr)
}
resp, err := InstantQueryCodec.MergeResponse(context.Background(), tc.req, resps...)
assert.Equal(t, err, tc.expectedErr)

if tc.cancelBeforeMerge {
cancelCtx()
}
resp, err := InstantQueryCodec.MergeResponse(ctx, tc.req, resps...)
assert.Equal(t, tc.expectedErr, err)
if err != nil {
cancelCtx()
return
}
dr, err := InstantQueryCodec.EncodeResponse(context.Background(), resp)
assert.Equal(t, err, tc.expectedErr)
dr, err := InstantQueryCodec.EncodeResponse(ctx, resp)
assert.Equal(t, tc.expectedErr, err)
contents, err := io.ReadAll(dr.Body)
assert.Equal(t, err, tc.expectedErr)
assert.Equal(t, tc.expectedErr, err)
assert.Equal(t, string(contents), tc.expectedResp)
cancelCtx()
})
}
}
Expand Down
17 changes: 14 additions & 3 deletions pkg/querier/tripperware/queryrange/query_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,12 +145,16 @@ func (c prometheusCodec) MergeResponse(ctx context.Context, _ tripperware.Reques

// Merge the responses.
sort.Sort(byFirstTime(promResponses))
sampleStreams, err := matrixMerge(ctx, promResponses)
if err != nil {
return nil, err
}

response := PrometheusResponse{
Status: StatusSuccess,
Data: PrometheusData{
ResultType: model.ValMatrix.String(),
Result: matrixMerge(promResponses),
Result: sampleStreams,
Stats: statsMerge(c.sharded, promResponses),
},
}
Expand Down Expand Up @@ -263,6 +267,10 @@ func (prometheusCodec) DecodeResponse(ctx context.Context, r *http.Response, _ t
log, ctx := spanlogger.New(ctx, "ParseQueryRangeResponse") //nolint:ineffassign,staticcheck
defer log.Finish()

if err := ctx.Err(); err != nil {
return nil, err
}

buf, err := tripperware.BodyBuffer(r, log)
if err != nil {
log.Error(err)
Expand Down Expand Up @@ -347,9 +355,12 @@ func statsMerge(shouldSumStats bool, resps []*PrometheusResponse) *tripperware.P
return tripperware.StatsMerge(output)
}

func matrixMerge(resps []*PrometheusResponse) []tripperware.SampleStream {
func matrixMerge(ctx context.Context, resps []*PrometheusResponse) ([]tripperware.SampleStream, error) {
output := make(map[string]tripperware.SampleStream)
for _, resp := range resps {
if err := ctx.Err(); err != nil {
return nil, err
}
if resp == nil {
continue
}
Expand All @@ -367,7 +378,7 @@ func matrixMerge(resps []*PrometheusResponse) []tripperware.SampleStream {
result = append(result, output[key])
}

return result
return result, nil
}

func parseDurationMs(s string) (int64, error) {
Expand Down
Loading