Skip to content

Commit

Permalink
Fix query frontend regression on release v0.17.0 (#3480)
Browse files Browse the repository at this point in the history
* query-frontend: make POST-request to downstream url for labels and series api endpoints (#3444)

Signed-off-by: Alexander Tunik <2braven@gmail.com>

* remove default response cache config

Signed-off-by: Ben Ye <yb532204897@gmail.com>

* ensure order when merging multiple responses

Signed-off-by: Ben Ye <yb532204897@gmail.com>

Co-authored-by: Alexander Tunik <2braven@gmail.com>
  • Loading branch information
yeya24 and 2nick committed Nov 23, 2020
1 parent 4899d9a commit b30dbed
Show file tree
Hide file tree
Showing 10 changed files with 384 additions and 89 deletions.
6 changes: 2 additions & 4 deletions cmd/thanos/query_frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,10 @@ func registerQueryFrontend(app *extkingpin.App) {
MaxBodySize: 10 * 1024 * 1024,
},
QueryRangeConfig: queryfrontend.QueryRangeConfig{
Limits: &cortexvalidation.Limits{},
ResultsCacheConfig: &queryrange.ResultsCacheConfig{},
Limits: &cortexvalidation.Limits{},
},
LabelsConfig: queryfrontend.LabelsConfig{
Limits: &cortexvalidation.Limits{},
ResultsCacheConfig: &queryrange.ResultsCacheConfig{},
Limits: &cortexvalidation.Limits{},
},
},
}
Expand Down
68 changes: 41 additions & 27 deletions pkg/queryfrontend/labels_codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,20 +49,21 @@ func NewThanosLabelsCodec(partialResponse bool, defaultMetadataTimeRange time.Du
}
}

// MergeResponse merges multiple responses into a single Response. It needs to dedup the responses and ensure the order.
func (c labelsCodec) MergeResponse(responses ...queryrange.Response) (queryrange.Response, error) {
if len(responses) == 0 {
// Empty response for label_names, label_values and series API.
return &ThanosLabelsResponse{
Status: queryrange.StatusSuccess,
Data: []string{},
}, nil
}

if len(responses) == 1 {
return responses[0], nil
}

switch responses[0].(type) {
case *ThanosLabelsResponse:
if len(responses) == 1 {
return responses[0], nil
}
set := make(map[string]struct{})

for _, res := range responses {
Expand All @@ -83,25 +84,23 @@ func (c labelsCodec) MergeResponse(responses ...queryrange.Response) (queryrange
Data: lbls,
}, nil
case *ThanosSeriesResponse:
seriesData := make([]labelpb.LabelSet, 0)
if len(responses) == 1 {
return responses[0], nil
}
seriesData := make(labelpb.ZLabelSets, 0)

// seriesString is used in soring so we don't have to calculate the string of label sets again.
seriesString := make([]string, 0)
uniqueSeries := make(map[string]struct{})
for _, res := range responses {
for _, series := range res.(*ThanosSeriesResponse).Data {
s := labelpb.LabelsToPromLabels(series.Labels).String()
s := series.PromLabels().String()
if _, ok := uniqueSeries[s]; !ok {
seriesData = append(seriesData, series)
seriesString = append(seriesString, s)
uniqueSeries[s] = struct{}{}
}
}
}

sort.Slice(seriesData, func(i, j int) bool {
return seriesString[i] < seriesString[j]
})
sort.Sort(seriesData)
return &ThanosSeriesResponse{
Status: queryrange.StatusSuccess,
Data: seriesData,
Expand Down Expand Up @@ -134,7 +133,8 @@ func (c labelsCodec) DecodeRequest(_ context.Context, r *http.Request) (queryran
}

func (c labelsCodec) EncodeRequest(ctx context.Context, r queryrange.Request) (*http.Request, error) {
var u *url.URL
var req *http.Request
var err error
switch thanosReq := r.(type) {
case *ThanosLabelsRequest:
var params = url.Values{
Expand All @@ -145,10 +145,29 @@ func (c labelsCodec) EncodeRequest(ctx context.Context, r queryrange.Request) (*
if len(thanosReq.StoreMatchers) > 0 {
params[queryv1.StoreMatcherParam] = matchersToStringSlice(thanosReq.StoreMatchers)
}
u = &url.URL{
Path: thanosReq.Path,
RawQuery: params.Encode(),

// If label is not empty, then it is a label values query.
if thanosReq.Label != "" {
u := &url.URL{
Path: thanosReq.Path,
RawQuery: params.Encode(),
}

req = &http.Request{
Method: http.MethodGet,
RequestURI: u.String(), // This is what the httpgrpc code looks at.
URL: u,
Body: http.NoBody,
Header: http.Header{},
}
} else {
req, err = http.NewRequest(http.MethodPost, thanosReq.Path, bytes.NewBufferString(params.Encode()))
if err != nil {
return nil, httpgrpc.Errorf(http.StatusBadRequest, "error creating request: %s", err.Error())
}
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
}

case *ThanosSeriesRequest:
var params = url.Values{
"start": []string{encodeTime(thanosReq.Start)},
Expand All @@ -163,22 +182,17 @@ func (c labelsCodec) EncodeRequest(ctx context.Context, r queryrange.Request) (*
if len(thanosReq.StoreMatchers) > 0 {
params[queryv1.StoreMatcherParam] = matchersToStringSlice(thanosReq.StoreMatchers)
}
u = &url.URL{
Path: thanosReq.Path,
RawQuery: params.Encode(),

req, err = http.NewRequest(http.MethodPost, thanosReq.Path, bytes.NewBufferString(params.Encode()))
if err != nil {
return nil, httpgrpc.Errorf(http.StatusBadRequest, "error creating request: %s", err.Error())
}
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")

default:
return nil, httpgrpc.Errorf(http.StatusInternalServerError, "invalid request format")
}

req := &http.Request{
Method: "GET",
RequestURI: u.String(), // This is what the httpgrpc code looks at.
URL: u,
Body: http.NoBody,
Header: http.Header{},
}

return req.WithContext(ctx), nil
}

Expand Down
179 changes: 166 additions & 13 deletions pkg/queryfrontend/labels_codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,14 +227,14 @@ func TestLabelsCodec_EncodeRequest(t *testing.T) {
name: "thanos labels names request",
req: &ThanosLabelsRequest{Start: 123000, End: 456000, Path: "/api/v1/labels"},
checkFunc: func(r *http.Request) bool {
return r.URL.Query().Get(start) == startTime &&
r.URL.Query().Get(end) == endTime &&
return r.FormValue(start) == startTime &&
r.FormValue(end) == endTime &&
r.URL.Path == "/api/v1/labels"
},
},
{
name: "thanos labels values request",
req: &ThanosLabelsRequest{Start: 123000, End: 456000, Path: "/api/v1/label/__name__/values"},
req: &ThanosLabelsRequest{Start: 123000, End: 456000, Path: "/api/v1/label/__name__/values", Label: "__name__"},
checkFunc: func(r *http.Request) bool {
return r.URL.Query().Get(start) == startTime &&
r.URL.Query().Get(end) == endTime &&
Expand All @@ -243,7 +243,7 @@ func TestLabelsCodec_EncodeRequest(t *testing.T) {
},
{
name: "thanos labels values request, partial response set to true",
req: &ThanosLabelsRequest{Start: 123000, End: 456000, Path: "/api/v1/label/__name__/values", PartialResponse: true},
req: &ThanosLabelsRequest{Start: 123000, End: 456000, Path: "/api/v1/label/__name__/values", Label: "__name__", PartialResponse: true},
checkFunc: func(r *http.Request) bool {
return r.URL.Query().Get(start) == startTime &&
r.URL.Query().Get(end) == endTime &&
Expand All @@ -255,8 +255,8 @@ func TestLabelsCodec_EncodeRequest(t *testing.T) {
name: "thanos series request with empty matchers",
req: &ThanosSeriesRequest{Start: 123000, End: 456000, Path: "/api/v1/series"},
checkFunc: func(r *http.Request) bool {
return r.URL.Query().Get(start) == startTime &&
r.URL.Query().Get(end) == endTime &&
return r.FormValue(start) == startTime &&
r.FormValue(end) == endTime &&
r.URL.Path == "/api/v1/series"
},
},
Expand All @@ -269,9 +269,9 @@ func TestLabelsCodec_EncodeRequest(t *testing.T) {
Matchers: [][]*labels.Matcher{{labels.MustNewMatcher(labels.MatchEqual, "cluster", "test")}},
},
checkFunc: func(r *http.Request) bool {
return r.URL.Query().Get(start) == startTime &&
r.URL.Query().Get(end) == endTime &&
r.URL.Query().Get(queryv1.MatcherParam) == `{cluster="test"}` &&
return r.FormValue(start) == startTime &&
r.FormValue(end) == endTime &&
r.FormValue(queryv1.MatcherParam) == `{cluster="test"}` &&
r.URL.Path == "/api/v1/series"
},
},
Expand All @@ -284,9 +284,9 @@ func TestLabelsCodec_EncodeRequest(t *testing.T) {
Dedup: true,
},
checkFunc: func(r *http.Request) bool {
return r.URL.Query().Get(start) == startTime &&
r.URL.Query().Get(end) == endTime &&
r.URL.Query().Get(queryv1.DedupParam) == "true" &&
return r.FormValue(start) == startTime &&
r.FormValue(end) == endTime &&
r.FormValue(queryv1.DedupParam) == "true" &&
r.URL.Path == "/api/v1/series"
},
},
Expand All @@ -313,12 +313,29 @@ func TestLabelsCodec_DecodeResponse(t *testing.T) {
labelsData, err := json.Marshal(labelResponse)
testutil.Ok(t, err)

labelResponseWithHeaders := &ThanosLabelsResponse{
Status: "success",
Data: []string{"__name__"},
Headers: []*ResponseHeader{{Name: cacheControlHeader, Values: []string{noStoreValue}}},
}
labelsDataWithHeaders, err := json.Marshal(labelResponseWithHeaders)
testutil.Ok(t, err)

seriesResponse := &ThanosSeriesResponse{
Status: "success",
Data: []labelpb.LabelSet{{Labels: []labelpb.Label{{Name: "foo", Value: "bar"}}}},
Data: []labelpb.ZLabelSet{{Labels: []labelpb.ZLabel{{Name: "foo", Value: "bar"}}}},
}
seriesData, err := json.Marshal(seriesResponse)
testutil.Ok(t, err)

seriesResponseWithHeaders := &ThanosSeriesResponse{
Status: "success",
Data: []labelpb.ZLabelSet{{Labels: []labelpb.ZLabel{{Name: "foo", Value: "bar"}}}},
Headers: []*ResponseHeader{{Name: cacheControlHeader, Values: []string{noStoreValue}}},
}
seriesDataWithHeaders, err := json.Marshal(seriesResponseWithHeaders)
testutil.Ok(t, err)

for _, tc := range []struct {
name string
expectedError error
Expand All @@ -344,12 +361,34 @@ func TestLabelsCodec_DecodeResponse(t *testing.T) {
res: http.Response{StatusCode: 200, Body: ioutil.NopCloser(bytes.NewBuffer(labelsData))},
expectedResponse: labelResponse,
},
{
name: "thanos labels request with HTTP headers",
req: &ThanosLabelsRequest{},
res: http.Response{
StatusCode: 200, Body: ioutil.NopCloser(bytes.NewBuffer(labelsDataWithHeaders)),
Header: map[string][]string{
cacheControlHeader: {noStoreValue},
},
},
expectedResponse: labelResponseWithHeaders,
},
{
name: "thanos series request",
req: &ThanosSeriesRequest{},
res: http.Response{StatusCode: 200, Body: ioutil.NopCloser(bytes.NewBuffer(seriesData))},
expectedResponse: seriesResponse,
},
{
name: "thanos series request with HTTP headers",
req: &ThanosSeriesRequest{},
res: http.Response{
StatusCode: 200, Body: ioutil.NopCloser(bytes.NewBuffer(seriesDataWithHeaders)),
Header: map[string][]string{
cacheControlHeader: {noStoreValue},
},
},
expectedResponse: seriesResponseWithHeaders,
},
} {
t.Run(tc.name, func(t *testing.T) {
// Default partial response value doesn't matter when encoding requests.
Expand All @@ -364,3 +403,117 @@ func TestLabelsCodec_DecodeResponse(t *testing.T) {
})
}
}

func TestLabelsCodec_MergeResponse(t *testing.T) {
for _, tc := range []struct {
name string
expectedError error
responses []queryrange.Response
expectedResponse queryrange.Response
}{
{
name: "Prometheus range query response format, not valid",
responses: []queryrange.Response{
&queryrange.PrometheusResponse{Status: "success"},
},
expectedError: httpgrpc.Errorf(http.StatusInternalServerError, "invalid response format"),
},
{
name: "Empty response",
responses: nil,
expectedResponse: &ThanosLabelsResponse{Status: queryrange.StatusSuccess, Data: []string{}},
},
{
name: "One label response",
responses: []queryrange.Response{
&ThanosLabelsResponse{Status: "success", Data: []string{"localhost:9090", "localhost:9091"}},
},
expectedResponse: &ThanosLabelsResponse{Status: "success", Data: []string{"localhost:9090", "localhost:9091"}},
},
{
name: "One label response and two empty responses",
responses: []queryrange.Response{
&ThanosLabelsResponse{Status: queryrange.StatusSuccess, Data: []string{}},
&ThanosLabelsResponse{Status: "success", Data: []string{"localhost:9090", "localhost:9091"}},
&ThanosLabelsResponse{Status: queryrange.StatusSuccess, Data: []string{}},
},
expectedResponse: &ThanosLabelsResponse{Status: "success", Data: []string{"localhost:9090", "localhost:9091"}},
},
{
name: "Multiple duplicate label responses",
responses: []queryrange.Response{
&ThanosLabelsResponse{Status: "success", Data: []string{"localhost:9090", "localhost:9091"}},
&ThanosLabelsResponse{Status: "success", Data: []string{"localhost:9091", "localhost:9092"}},
&ThanosLabelsResponse{Status: "success", Data: []string{"localhost:9092", "localhost:9093"}},
},
expectedResponse: &ThanosLabelsResponse{Status: "success",
Data: []string{"localhost:9090", "localhost:9091", "localhost:9092", "localhost:9093"}},
},
// This case shouldn't happen because the responses from Querier are sorted.
{
name: "Multiple unordered label responses",
responses: []queryrange.Response{
&ThanosLabelsResponse{Status: "success", Data: []string{"localhost:9093", "localhost:9092"}},
&ThanosLabelsResponse{Status: "success", Data: []string{"localhost:9091", "localhost:9090"}},
},
expectedResponse: &ThanosLabelsResponse{Status: "success",
Data: []string{"localhost:9090", "localhost:9091", "localhost:9092", "localhost:9093"}},
},
{
name: "One series response",
responses: []queryrange.Response{
&ThanosSeriesResponse{Status: "success", Data: []labelpb.ZLabelSet{{Labels: []labelpb.ZLabel{{Name: "foo", Value: "bar"}}}}},
},
expectedResponse: &ThanosSeriesResponse{Status: "success", Data: []labelpb.ZLabelSet{{Labels: []labelpb.ZLabel{{Name: "foo", Value: "bar"}}}}},
},
{
name: "One series response and two empty responses",
responses: []queryrange.Response{
&ThanosSeriesResponse{Status: queryrange.StatusSuccess},
&ThanosSeriesResponse{Status: "success", Data: []labelpb.ZLabelSet{{Labels: []labelpb.ZLabel{{Name: "foo", Value: "bar"}}}}},
&ThanosSeriesResponse{Status: queryrange.StatusSuccess},
},
expectedResponse: &ThanosSeriesResponse{Status: "success", Data: []labelpb.ZLabelSet{{Labels: []labelpb.ZLabel{{Name: "foo", Value: "bar"}}}}},
},
{
name: "Multiple duplicate series responses",
responses: []queryrange.Response{
&ThanosSeriesResponse{Status: "success", Data: []labelpb.ZLabelSet{{Labels: []labelpb.ZLabel{{Name: "foo", Value: "bar"}}}}},
&ThanosSeriesResponse{Status: "success", Data: []labelpb.ZLabelSet{{Labels: []labelpb.ZLabel{{Name: "foo", Value: "bar"}}}}},
&ThanosSeriesResponse{Status: "success", Data: []labelpb.ZLabelSet{{Labels: []labelpb.ZLabel{{Name: "foo", Value: "bar"}}}}},
},
expectedResponse: &ThanosSeriesResponse{Status: "success", Data: []labelpb.ZLabelSet{{Labels: []labelpb.ZLabel{{Name: "foo", Value: "bar"}}}}},
},
{
name: "Multiple unordered series responses",
responses: []queryrange.Response{
&ThanosSeriesResponse{Status: "success", Data: []labelpb.ZLabelSet{
{Labels: []labelpb.ZLabel{{Name: "foo", Value: "bar"}}},
{Labels: []labelpb.ZLabel{{Name: "test", Value: "aaa"}, {Name: "instance", Value: "localhost:9090"}}},
}},
&ThanosSeriesResponse{Status: "success", Data: []labelpb.ZLabelSet{
{Labels: []labelpb.ZLabel{{Name: "foo", Value: "aaa"}}},
{Labels: []labelpb.ZLabel{{Name: "test", Value: "bbb"}, {Name: "instance", Value: "localhost:9091"}}},
}},
},
expectedResponse: &ThanosSeriesResponse{Status: "success", Data: []labelpb.ZLabelSet{
{Labels: []labelpb.ZLabel{{Name: "foo", Value: "aaa"}}},
{Labels: []labelpb.ZLabel{{Name: "foo", Value: "bar"}}},
{Labels: []labelpb.ZLabel{{Name: "test", Value: "aaa"}, {Name: "instance", Value: "localhost:9090"}}},
{Labels: []labelpb.ZLabel{{Name: "test", Value: "bbb"}, {Name: "instance", Value: "localhost:9091"}}},
}},
},
} {
t.Run(tc.name, func(t *testing.T) {
// Default partial response value doesn't matter when encoding requests.
codec := NewThanosLabelsCodec(false, time.Hour*2)
r, err := codec.MergeResponse(tc.responses...)
if tc.expectedError != nil {
testutil.Equals(t, err, tc.expectedError)
} else {
testutil.Ok(t, err)
testutil.Equals(t, tc.expectedResponse, r)
}
})
}
}
Loading

0 comments on commit b30dbed

Please sign in to comment.