Skip to content

Commit

Permalink
querier: Support store matchers and time range filter on labels API (#…
Browse files Browse the repository at this point in the history
…3133)

* support store matchers on labels API

Signed-off-by: Ben Ye <yb532204897@gmail.com>

Add more unit tests in proxy store

Signed-off-by: Ben Ye <yb532204897@gmail.com>

* Add changelog

Signed-off-by: Ben Ye <yb532204897@gmail.com>

* address review comments

Signed-off-by: Ben Ye <yb532204897@gmail.com>

Co-authored-by: Giedrius Statkevičius <giedriuswork@gmail.com>
  • Loading branch information
yeya24 and GiedriusS authored Sep 10, 2020
1 parent 6c2e772 commit e0b7f7b
Show file tree
Hide file tree
Showing 8 changed files with 238 additions and 56 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re

## Unreleased

### Added

- [#3133](https://github.com/thanos-io/thanos/pull/3133) Query: Allow passing a `storeMatch[]` to Labels APIs. Also time range metadata based store filtering is supported on Labels APIs.

### Changed

- [#3136](https://github.com/thanos-io/thanos/pull/3136) Sidecar: Add metric `thanos_sidecar_reloader_config_apply_operations_total` and rename metric `thanos_sidecar_reloader_config_apply_errors_total` to `thanos_sidecar_reloader_config_apply_operations_failed_total`.
Expand Down
41 changes: 24 additions & 17 deletions pkg/api/query/v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,14 @@ import (
"github.com/thanos-io/thanos/pkg/tracing"
)

const (
DedupParam = "dedup"
PartialResponseParam = "partial_response"
MaxSourceResolutionParam = "max_source_resolution"
ReplicaLabelsParam = "replicaLabels[]"
StoreMatcherParam = "storeMatch[]"
)

// QueryAPI is an API used by Thanos Query.
type QueryAPI struct {
baseAPI *api.BaseAPI
Expand Down Expand Up @@ -139,41 +147,38 @@ type queryData struct {
}

func (qapi *QueryAPI) parseEnableDedupParam(r *http.Request) (enableDeduplication bool, _ *api.ApiError) {
const dedupParam = "dedup"
enableDeduplication = true

if val := r.FormValue(dedupParam); val != "" {
if val := r.FormValue(DedupParam); val != "" {
var err error
enableDeduplication, err = strconv.ParseBool(val)
if err != nil {
return false, &api.ApiError{Typ: api.ErrorBadData, Err: errors.Wrapf(err, "'%s' parameter", dedupParam)}
return false, &api.ApiError{Typ: api.ErrorBadData, Err: errors.Wrapf(err, "'%s' parameter", DedupParam)}
}
}
return enableDeduplication, nil
}

func (qapi *QueryAPI) parseReplicaLabelsParam(r *http.Request) (replicaLabels []string, _ *api.ApiError) {
const replicaLabelsParam = "replicaLabels[]"
if err := r.ParseForm(); err != nil {
return nil, &api.ApiError{Typ: api.ErrorInternal, Err: errors.Wrap(err, "parse form")}
}

replicaLabels = qapi.replicaLabels
// Overwrite the cli flag when provided as a query parameter.
if len(r.Form[replicaLabelsParam]) > 0 {
replicaLabels = r.Form[replicaLabelsParam]
if len(r.Form[ReplicaLabelsParam]) > 0 {
replicaLabels = r.Form[ReplicaLabelsParam]
}

return replicaLabels, nil
}

func (qapi *QueryAPI) parseStoreMatchersParam(r *http.Request) (storeMatchers [][]storepb.LabelMatcher, _ *api.ApiError) {
const storeMatcherParam = "storeMatch[]"
if err := r.ParseForm(); err != nil {
return nil, &api.ApiError{Typ: api.ErrorInternal, Err: errors.Wrap(err, "parse form")}
}

for _, s := range r.Form[storeMatcherParam] {
for _, s := range r.Form[StoreMatcherParam] {
matchers, err := parser.ParseMetricSelector(s)
if err != nil {
return nil, &api.ApiError{Typ: api.ErrorBadData, Err: err}
Expand All @@ -189,37 +194,34 @@ func (qapi *QueryAPI) parseStoreMatchersParam(r *http.Request) (storeMatchers []
}

func (qapi *QueryAPI) parseDownsamplingParamMillis(r *http.Request, defaultVal time.Duration) (maxResolutionMillis int64, _ *api.ApiError) {
const maxSourceResolutionParam = "max_source_resolution"
maxSourceResolution := 0 * time.Second

val := r.FormValue(maxSourceResolutionParam)
val := r.FormValue(MaxSourceResolutionParam)
if qapi.enableAutodownsampling || (val == "auto") {
maxSourceResolution = defaultVal
}
if val != "" && val != "auto" {
var err error
maxSourceResolution, err = parseDuration(val)
if err != nil {
return 0, &api.ApiError{Typ: api.ErrorBadData, Err: errors.Wrapf(err, "'%s' parameter", maxSourceResolutionParam)}
return 0, &api.ApiError{Typ: api.ErrorBadData, Err: errors.Wrapf(err, "'%s' parameter", MaxSourceResolutionParam)}
}
}

if maxSourceResolution < 0 {
return 0, &api.ApiError{Typ: api.ErrorBadData, Err: errors.Errorf("negative '%s' is not accepted. Try a positive integer", maxSourceResolutionParam)}
return 0, &api.ApiError{Typ: api.ErrorBadData, Err: errors.Errorf("negative '%s' is not accepted. Try a positive integer", MaxSourceResolutionParam)}
}

return int64(maxSourceResolution / time.Millisecond), nil
}

func (qapi *QueryAPI) parsePartialResponseParam(r *http.Request, defaultEnablePartialResponse bool) (enablePartialResponse bool, _ *api.ApiError) {
const partialResponseParam = "partial_response"

// Overwrite the cli flag when provided as a query parameter.
if val := r.FormValue(partialResponseParam); val != "" {
if val := r.FormValue(PartialResponseParam); val != "" {
var err error
defaultEnablePartialResponse, err = strconv.ParseBool(val)
if err != nil {
return false, &api.ApiError{Typ: api.ErrorBadData, Err: errors.Wrapf(err, "'%s' parameter", partialResponseParam)}
return false, &api.ApiError{Typ: api.ErrorBadData, Err: errors.Wrapf(err, "'%s' parameter", PartialResponseParam)}
}
}
return defaultEnablePartialResponse, nil
Expand Down Expand Up @@ -439,7 +441,12 @@ func (qapi *QueryAPI) labelValues(r *http.Request) (interface{}, []error, *api.A
return nil, nil, apiErr
}

q, err := qapi.queryableCreate(true, nil, nil, 0, enablePartialResponse, false).
storeMatchers, apiErr := qapi.parseStoreMatchersParam(r)
if apiErr != nil {
return nil, nil, apiErr
}

q, err := qapi.queryableCreate(true, nil, storeMatchers, 0, enablePartialResponse, false).
Querier(ctx, timestamp.FromTime(start), timestamp.FromTime(end))
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorExec, Err: err}
Expand Down
50 changes: 49 additions & 1 deletion pkg/api/query/v1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1056,7 +1056,7 @@ func TestParseDownsamplingParamMillis(t *testing.T) {
gate: gate.NewKeeper(nil).NewGate(4),
}
v := url.Values{}
v.Set("max_source_resolution", test.maxSourceResolutionParam)
v.Set(MaxSourceResolutionParam, test.maxSourceResolutionParam)
r := http.Request{PostForm: v}

// If no max_source_resolution is specified fit at least 5 samples between steps.
Expand All @@ -1070,6 +1070,54 @@ func TestParseDownsamplingParamMillis(t *testing.T) {
}
}

func TestParseStoreMatchersParam(t *testing.T) {
for i, tc := range []struct {
storeMatchers string
fail bool
result [][]storepb.LabelMatcher
}{
{
storeMatchers: "123",
fail: true,
},
{
storeMatchers: "foo",
fail: false,
result: [][]storepb.LabelMatcher{{storepb.LabelMatcher{Type: storepb.LabelMatcher_EQ, Name: "__name__", Value: "foo"}}},
},
{
storeMatchers: `{__address__="localhost:10905"}`,
fail: false,
result: [][]storepb.LabelMatcher{{storepb.LabelMatcher{Type: storepb.LabelMatcher_EQ, Name: "__address__", Value: "localhost:10905"}}},
},
{
storeMatchers: `{__address__="localhost:10905", cluster="test"}`,
fail: false,
result: [][]storepb.LabelMatcher{{
storepb.LabelMatcher{Type: storepb.LabelMatcher_EQ, Name: "__address__", Value: "localhost:10905"},
storepb.LabelMatcher{Type: storepb.LabelMatcher_EQ, Name: "cluster", Value: "test"},
}},
},
} {
t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
api := QueryAPI{
gate: gate.NewKeeper(nil).NewGate(4),
}
v := url.Values{}
v.Set(StoreMatcherParam, tc.storeMatchers)
r := &http.Request{PostForm: v}

storeMatchers, err := api.parseStoreMatchersParam(r)
if !tc.fail {
testutil.Equals(t, tc.result, storeMatchers)
testutil.Equals(t, (*baseAPI.ApiError)(nil), err)
} else {
testutil.NotOk(t, err)
}
})
}
}

type mockedRulesClient struct {
g map[rulespb.RulesRequest_Type][]*rulespb.RuleGroup
w storage.Warnings
Expand Down
8 changes: 7 additions & 1 deletion pkg/query/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ func (q *querier) selectFn(ctx context.Context, hints *storage.SelectHints, ms .

aggrs := aggrsFromFunc(hints.Func)

// TODO: Pass it using the SerieRequest instead of relying on context
// TODO: Pass it using the SeriesRequest instead of relying on context
ctx = context.WithValue(ctx, store.StoreMatcherKey, q.storeMatchers)

resp := &seriesServer{ctx: ctx}
Expand Down Expand Up @@ -333,6 +333,9 @@ func (q *querier) LabelValues(name string) ([]string, storage.Warnings, error) {
span, ctx := tracing.StartSpan(q.ctx, "querier_label_values")
defer span.Finish()

// TODO: Pass it using the SeriesRequest instead of relying on context
ctx = context.WithValue(ctx, store.StoreMatcherKey, q.storeMatchers)

resp, err := q.proxy.LabelValues(ctx, &storepb.LabelValuesRequest{
Label: name,
PartialResponseDisabled: !q.partialResponse,
Expand All @@ -356,6 +359,9 @@ func (q *querier) LabelNames() ([]string, storage.Warnings, error) {
span, ctx := tracing.StartSpan(q.ctx, "querier_label_names")
defer span.Finish()

// TODO: Pass it using the SeriesRequest instead of relying on context
ctx = context.WithValue(ctx, store.StoreMatcherKey, q.storeMatchers)

resp, err := q.proxy.LabelNames(ctx, &storepb.LabelNamesRequest{
PartialResponseDisabled: !q.partialResponse,
Start: q.mint,
Expand Down
43 changes: 22 additions & 21 deletions pkg/queryfrontend/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/prometheus/prometheus/promql/parser"
"github.com/weaveworks/common/httpgrpc"

queryv1 "github.com/thanos-io/thanos/pkg/api/query"
"github.com/thanos-io/thanos/pkg/promclient"
"github.com/thanos-io/thanos/pkg/store/storepb"
)
Expand Down Expand Up @@ -83,7 +84,7 @@ func (c codec) DecodeRequest(_ context.Context, r *http.Request) (queryrange.Req
return nil, errStepTooSmall
}

result.Dedup, err = parseEnableDedupParam(r.FormValue("dedup"))
result.Dedup, err = parseEnableDedupParam(r.FormValue(queryv1.DedupParam))
if err != nil {
return nil, err
}
Expand All @@ -92,22 +93,22 @@ func (c codec) DecodeRequest(_ context.Context, r *http.Request) (queryrange.Req
result.AutoDownsampling = true
result.MaxSourceResolution = result.Step / 5
} else {
result.MaxSourceResolution, err = parseDownsamplingParamMillis(r.FormValue("max_source_resolution"))
result.MaxSourceResolution, err = parseDownsamplingParamMillis(r.FormValue(queryv1.MaxSourceResolutionParam))
if err != nil {
return nil, err
}
}

result.PartialResponse, err = parsePartialResponseParam(r.FormValue("partial_response"), c.partialResponse)
result.PartialResponse, err = parsePartialResponseParam(r.FormValue(queryv1.PartialResponseParam), c.partialResponse)
if err != nil {
return nil, err
}

if len(r.Form["replicaLabels[]"]) > 0 {
result.ReplicaLabels = r.Form["replicaLabels[]"]
if len(r.Form[queryv1.ReplicaLabelsParam]) > 0 {
result.ReplicaLabels = r.Form[queryv1.ReplicaLabelsParam]
}

result.StoreMatchers, err = parseStoreMatchersParam(r.Form["storeMatch[]"])
result.StoreMatchers, err = parseStoreMatchersParam(r.Form[queryv1.StoreMatcherParam])
if err != nil {
return nil, err
}
Expand All @@ -131,29 +132,29 @@ func (c codec) EncodeRequest(ctx context.Context, r queryrange.Request) (*http.R
return nil, httpgrpc.Errorf(http.StatusBadRequest, "invalid request format")
}
params := url.Values{
"start": []string{encodeTime(thanosReq.Start)},
"end": []string{encodeTime(thanosReq.End)},
"step": []string{encodeDurationMillis(thanosReq.Step)},
"query": []string{thanosReq.Query},
"dedup": []string{strconv.FormatBool(thanosReq.Dedup)},
"partial_response": []string{strconv.FormatBool(thanosReq.PartialResponse)},
"replicaLabels[]": thanosReq.ReplicaLabels,
"start": []string{encodeTime(thanosReq.Start)},
"end": []string{encodeTime(thanosReq.End)},
"step": []string{encodeDurationMillis(thanosReq.Step)},
"query": []string{thanosReq.Query},
queryv1.DedupParam: []string{strconv.FormatBool(thanosReq.Dedup)},
queryv1.PartialResponseParam: []string{strconv.FormatBool(thanosReq.PartialResponse)},
queryv1.ReplicaLabelsParam: thanosReq.ReplicaLabels,
}

if thanosReq.AutoDownsampling {
params["max_source_resolution"] = []string{"auto"}
params[queryv1.MaxSourceResolutionParam] = []string{"auto"}
} else if thanosReq.MaxSourceResolution != 0 {
// Add this param only if it is set. Set to 0 will impact
// auto-downsampling in the querier.
params["max_source_resolution"] = []string{encodeDurationMillis(thanosReq.MaxSourceResolution)}
params[queryv1.MaxSourceResolutionParam] = []string{encodeDurationMillis(thanosReq.MaxSourceResolution)}
}

if len(thanosReq.StoreMatchers) > 0 {
storeMatchers, err := matchersToStringSlice(thanosReq.StoreMatchers)
if err != nil {
return nil, httpgrpc.Errorf(http.StatusBadRequest, "invalid request format")
}
params["storeMatch[]"] = storeMatchers
params[queryv1.StoreMatcherParam] = storeMatchers
}

u := &url.URL{
Expand Down Expand Up @@ -191,7 +192,7 @@ func parseEnableDedupParam(s string) (bool, error) {
var err error
enableDeduplication, err = strconv.ParseBool(s)
if err != nil {
return enableDeduplication, httpgrpc.Errorf(http.StatusBadRequest, errCannotParse, "dedup")
return enableDeduplication, httpgrpc.Errorf(http.StatusBadRequest, errCannotParse, queryv1.DedupParam)
}
}

Expand All @@ -204,7 +205,7 @@ func parseDownsamplingParamMillis(s string) (int64, error) {
var err error
maxSourceResolution, err = parseDurationMillis(s)
if err != nil {
return maxSourceResolution, httpgrpc.Errorf(http.StatusBadRequest, errCannotParse, "max_source_resolution")
return maxSourceResolution, httpgrpc.Errorf(http.StatusBadRequest, errCannotParse, queryv1.MaxSourceResolutionParam)
}
}

Expand All @@ -220,7 +221,7 @@ func parsePartialResponseParam(s string, defaultEnablePartialResponse bool) (boo
var err error
defaultEnablePartialResponse, err = strconv.ParseBool(s)
if err != nil {
return defaultEnablePartialResponse, httpgrpc.Errorf(http.StatusBadRequest, errCannotParse, "partial_response")
return defaultEnablePartialResponse, httpgrpc.Errorf(http.StatusBadRequest, errCannotParse, queryv1.PartialResponseParam)
}
}

Expand All @@ -232,11 +233,11 @@ func parseStoreMatchersParam(ss []string) ([][]storepb.LabelMatcher, error) {
for _, s := range ss {
matchers, err := parser.ParseMetricSelector(s)
if err != nil {
return nil, httpgrpc.Errorf(http.StatusBadRequest, errCannotParse, "storeMatch[]")
return nil, httpgrpc.Errorf(http.StatusBadRequest, errCannotParse, queryv1.StoreMatcherParam)
}
stm, err := storepb.TranslatePromMatchers(matchers...)
if err != nil {
return nil, httpgrpc.Errorf(http.StatusBadRequest, "storeMatch[]")
return nil, httpgrpc.Errorf(http.StatusBadRequest, queryv1.StoreMatcherParam)
}
storeMatchers = append(storeMatchers, stm)
}
Expand Down
12 changes: 7 additions & 5 deletions pkg/queryfrontend/codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@ import (
"testing"

"github.com/cortexproject/cortex/pkg/querier/queryrange"
"github.com/weaveworks/common/httpgrpc"

queryv1 "github.com/thanos-io/thanos/pkg/api/query"
"github.com/thanos-io/thanos/pkg/compact"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/testutil"
"github.com/weaveworks/common/httpgrpc"
)

func TestCodec_DecodeRequest(t *testing.T) {
Expand Down Expand Up @@ -218,7 +220,7 @@ func TestCodec_EncodeRequest(t *testing.T) {
return r.URL.Query().Get("start") == "123" &&
r.URL.Query().Get("end") == "456" &&
r.URL.Query().Get("step") == "1" &&
r.URL.Query().Get("dedup") == "true"
r.URL.Query().Get(queryv1.DedupParam) == "true"
},
},
{
Expand All @@ -233,7 +235,7 @@ func TestCodec_EncodeRequest(t *testing.T) {
return r.URL.Query().Get("start") == "123" &&
r.URL.Query().Get("end") == "456" &&
r.URL.Query().Get("step") == "1" &&
r.URL.Query().Get("partial_response") == "true"
r.URL.Query().Get(queryv1.PartialResponseParam) == "true"
},
},
{
Expand All @@ -248,7 +250,7 @@ func TestCodec_EncodeRequest(t *testing.T) {
return r.URL.Query().Get("start") == "123" &&
r.URL.Query().Get("end") == "456" &&
r.URL.Query().Get("step") == "1" &&
r.URL.Query().Get("max_source_resolution") == "300"
r.URL.Query().Get(queryv1.MaxSourceResolutionParam) == "300"
},
},
{
Expand All @@ -263,7 +265,7 @@ func TestCodec_EncodeRequest(t *testing.T) {
return r.URL.Query().Get("start") == "123" &&
r.URL.Query().Get("end") == "456" &&
r.URL.Query().Get("step") == "1" &&
r.URL.Query().Get("max_source_resolution") == "3600"
r.URL.Query().Get(queryv1.MaxSourceResolutionParam) == "3600"
},
},
} {
Expand Down
Loading

0 comments on commit e0b7f7b

Please sign in to comment.