diff --git a/CHANGELOG.md b/CHANGELOG.md index 35ae806629..c4e9b53033 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,10 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re ## Unreleased +### Added + +- [#3133](https://github.com/thanos-io/thanos/pull/3133) Query: Allow passing a `storeMatch[]` to Labels APIs. Also time range metadata based store filtering is supported on Labels APIs. + ### Changed - [#3136](https://github.com/thanos-io/thanos/pull/3136) Sidecar: Add metric `thanos_sidecar_reloader_config_apply_operations_total` and rename metric `thanos_sidecar_reloader_config_apply_errors_total` to `thanos_sidecar_reloader_config_apply_operations_failed_total`. diff --git a/pkg/api/query/v1.go b/pkg/api/query/v1.go index 6cf2b80496..dbd76e067b 100644 --- a/pkg/api/query/v1.go +++ b/pkg/api/query/v1.go @@ -52,6 +52,14 @@ import ( "github.com/thanos-io/thanos/pkg/tracing" ) +const ( + DedupParam = "dedup" + PartialResponseParam = "partial_response" + MaxSourceResolutionParam = "max_source_resolution" + ReplicaLabelsParam = "replicaLabels[]" + StoreMatcherParam = "storeMatch[]" +) + // QueryAPI is an API used by Thanos Query. type QueryAPI struct { baseAPI *api.BaseAPI @@ -139,41 +147,38 @@ type queryData struct { } func (qapi *QueryAPI) parseEnableDedupParam(r *http.Request) (enableDeduplication bool, _ *api.ApiError) { - const dedupParam = "dedup" enableDeduplication = true - if val := r.FormValue(dedupParam); val != "" { + if val := r.FormValue(DedupParam); val != "" { var err error enableDeduplication, err = strconv.ParseBool(val) if err != nil { - return false, &api.ApiError{Typ: api.ErrorBadData, Err: errors.Wrapf(err, "'%s' parameter", dedupParam)} + return false, &api.ApiError{Typ: api.ErrorBadData, Err: errors.Wrapf(err, "'%s' parameter", DedupParam)} } } return enableDeduplication, nil } func (qapi *QueryAPI) parseReplicaLabelsParam(r *http.Request) (replicaLabels []string, _ *api.ApiError) { - const replicaLabelsParam = "replicaLabels[]" if err := r.ParseForm(); err != nil { return nil, &api.ApiError{Typ: api.ErrorInternal, Err: errors.Wrap(err, "parse form")} } replicaLabels = qapi.replicaLabels // Overwrite the cli flag when provided as a query parameter. - if len(r.Form[replicaLabelsParam]) > 0 { - replicaLabels = r.Form[replicaLabelsParam] + if len(r.Form[ReplicaLabelsParam]) > 0 { + replicaLabels = r.Form[ReplicaLabelsParam] } return replicaLabels, nil } func (qapi *QueryAPI) parseStoreMatchersParam(r *http.Request) (storeMatchers [][]storepb.LabelMatcher, _ *api.ApiError) { - const storeMatcherParam = "storeMatch[]" if err := r.ParseForm(); err != nil { return nil, &api.ApiError{Typ: api.ErrorInternal, Err: errors.Wrap(err, "parse form")} } - for _, s := range r.Form[storeMatcherParam] { + for _, s := range r.Form[StoreMatcherParam] { matchers, err := parser.ParseMetricSelector(s) if err != nil { return nil, &api.ApiError{Typ: api.ErrorBadData, Err: err} @@ -189,10 +194,9 @@ func (qapi *QueryAPI) parseStoreMatchersParam(r *http.Request) (storeMatchers [] } func (qapi *QueryAPI) parseDownsamplingParamMillis(r *http.Request, defaultVal time.Duration) (maxResolutionMillis int64, _ *api.ApiError) { - const maxSourceResolutionParam = "max_source_resolution" maxSourceResolution := 0 * time.Second - val := r.FormValue(maxSourceResolutionParam) + val := r.FormValue(MaxSourceResolutionParam) if qapi.enableAutodownsampling || (val == "auto") { maxSourceResolution = defaultVal } @@ -200,26 +204,24 @@ func (qapi *QueryAPI) parseDownsamplingParamMillis(r *http.Request, defaultVal t var err error maxSourceResolution, err = parseDuration(val) if err != nil { - return 0, &api.ApiError{Typ: api.ErrorBadData, Err: errors.Wrapf(err, "'%s' parameter", maxSourceResolutionParam)} + return 0, &api.ApiError{Typ: api.ErrorBadData, Err: errors.Wrapf(err, "'%s' parameter", MaxSourceResolutionParam)} } } if maxSourceResolution < 0 { - return 0, &api.ApiError{Typ: api.ErrorBadData, Err: errors.Errorf("negative '%s' is not accepted. Try a positive integer", maxSourceResolutionParam)} + return 0, &api.ApiError{Typ: api.ErrorBadData, Err: errors.Errorf("negative '%s' is not accepted. Try a positive integer", MaxSourceResolutionParam)} } return int64(maxSourceResolution / time.Millisecond), nil } func (qapi *QueryAPI) parsePartialResponseParam(r *http.Request, defaultEnablePartialResponse bool) (enablePartialResponse bool, _ *api.ApiError) { - const partialResponseParam = "partial_response" - // Overwrite the cli flag when provided as a query parameter. - if val := r.FormValue(partialResponseParam); val != "" { + if val := r.FormValue(PartialResponseParam); val != "" { var err error defaultEnablePartialResponse, err = strconv.ParseBool(val) if err != nil { - return false, &api.ApiError{Typ: api.ErrorBadData, Err: errors.Wrapf(err, "'%s' parameter", partialResponseParam)} + return false, &api.ApiError{Typ: api.ErrorBadData, Err: errors.Wrapf(err, "'%s' parameter", PartialResponseParam)} } } return defaultEnablePartialResponse, nil @@ -439,7 +441,12 @@ func (qapi *QueryAPI) labelValues(r *http.Request) (interface{}, []error, *api.A return nil, nil, apiErr } - q, err := qapi.queryableCreate(true, nil, nil, 0, enablePartialResponse, false). + storeMatchers, apiErr := qapi.parseStoreMatchersParam(r) + if apiErr != nil { + return nil, nil, apiErr + } + + q, err := qapi.queryableCreate(true, nil, storeMatchers, 0, enablePartialResponse, false). Querier(ctx, timestamp.FromTime(start), timestamp.FromTime(end)) if err != nil { return nil, nil, &api.ApiError{Typ: api.ErrorExec, Err: err} diff --git a/pkg/api/query/v1_test.go b/pkg/api/query/v1_test.go index bec886d622..6c300e0197 100644 --- a/pkg/api/query/v1_test.go +++ b/pkg/api/query/v1_test.go @@ -1056,7 +1056,7 @@ func TestParseDownsamplingParamMillis(t *testing.T) { gate: gate.NewKeeper(nil).NewGate(4), } v := url.Values{} - v.Set("max_source_resolution", test.maxSourceResolutionParam) + v.Set(MaxSourceResolutionParam, test.maxSourceResolutionParam) r := http.Request{PostForm: v} // If no max_source_resolution is specified fit at least 5 samples between steps. @@ -1070,6 +1070,54 @@ func TestParseDownsamplingParamMillis(t *testing.T) { } } +func TestParseStoreMatchersParam(t *testing.T) { + for i, tc := range []struct { + storeMatchers string + fail bool + result [][]storepb.LabelMatcher + }{ + { + storeMatchers: "123", + fail: true, + }, + { + storeMatchers: "foo", + fail: false, + result: [][]storepb.LabelMatcher{{storepb.LabelMatcher{Type: storepb.LabelMatcher_EQ, Name: "__name__", Value: "foo"}}}, + }, + { + storeMatchers: `{__address__="localhost:10905"}`, + fail: false, + result: [][]storepb.LabelMatcher{{storepb.LabelMatcher{Type: storepb.LabelMatcher_EQ, Name: "__address__", Value: "localhost:10905"}}}, + }, + { + storeMatchers: `{__address__="localhost:10905", cluster="test"}`, + fail: false, + result: [][]storepb.LabelMatcher{{ + storepb.LabelMatcher{Type: storepb.LabelMatcher_EQ, Name: "__address__", Value: "localhost:10905"}, + storepb.LabelMatcher{Type: storepb.LabelMatcher_EQ, Name: "cluster", Value: "test"}, + }}, + }, + } { + t.Run(fmt.Sprintf("%d", i), func(t *testing.T) { + api := QueryAPI{ + gate: gate.NewKeeper(nil).NewGate(4), + } + v := url.Values{} + v.Set(StoreMatcherParam, tc.storeMatchers) + r := &http.Request{PostForm: v} + + storeMatchers, err := api.parseStoreMatchersParam(r) + if !tc.fail { + testutil.Equals(t, tc.result, storeMatchers) + testutil.Equals(t, (*baseAPI.ApiError)(nil), err) + } else { + testutil.NotOk(t, err) + } + }) + } +} + type mockedRulesClient struct { g map[rulespb.RulesRequest_Type][]*rulespb.RuleGroup w storage.Warnings diff --git a/pkg/query/querier.go b/pkg/query/querier.go index d766973649..8d8765471b 100644 --- a/pkg/query/querier.go +++ b/pkg/query/querier.go @@ -259,7 +259,7 @@ func (q *querier) selectFn(ctx context.Context, hints *storage.SelectHints, ms . aggrs := aggrsFromFunc(hints.Func) - // TODO: Pass it using the SerieRequest instead of relying on context + // TODO: Pass it using the SeriesRequest instead of relying on context ctx = context.WithValue(ctx, store.StoreMatcherKey, q.storeMatchers) resp := &seriesServer{ctx: ctx} @@ -333,6 +333,9 @@ func (q *querier) LabelValues(name string) ([]string, storage.Warnings, error) { span, ctx := tracing.StartSpan(q.ctx, "querier_label_values") defer span.Finish() + // TODO: Pass it using the SeriesRequest instead of relying on context + ctx = context.WithValue(ctx, store.StoreMatcherKey, q.storeMatchers) + resp, err := q.proxy.LabelValues(ctx, &storepb.LabelValuesRequest{ Label: name, PartialResponseDisabled: !q.partialResponse, @@ -356,6 +359,9 @@ func (q *querier) LabelNames() ([]string, storage.Warnings, error) { span, ctx := tracing.StartSpan(q.ctx, "querier_label_names") defer span.Finish() + // TODO: Pass it using the SeriesRequest instead of relying on context + ctx = context.WithValue(ctx, store.StoreMatcherKey, q.storeMatchers) + resp, err := q.proxy.LabelNames(ctx, &storepb.LabelNamesRequest{ PartialResponseDisabled: !q.partialResponse, Start: q.mint, diff --git a/pkg/queryfrontend/codec.go b/pkg/queryfrontend/codec.go index f3f1d20018..769ce0b1bc 100644 --- a/pkg/queryfrontend/codec.go +++ b/pkg/queryfrontend/codec.go @@ -18,6 +18,7 @@ import ( "github.com/prometheus/prometheus/promql/parser" "github.com/weaveworks/common/httpgrpc" + queryv1 "github.com/thanos-io/thanos/pkg/api/query" "github.com/thanos-io/thanos/pkg/promclient" "github.com/thanos-io/thanos/pkg/store/storepb" ) @@ -83,7 +84,7 @@ func (c codec) DecodeRequest(_ context.Context, r *http.Request) (queryrange.Req return nil, errStepTooSmall } - result.Dedup, err = parseEnableDedupParam(r.FormValue("dedup")) + result.Dedup, err = parseEnableDedupParam(r.FormValue(queryv1.DedupParam)) if err != nil { return nil, err } @@ -92,22 +93,22 @@ func (c codec) DecodeRequest(_ context.Context, r *http.Request) (queryrange.Req result.AutoDownsampling = true result.MaxSourceResolution = result.Step / 5 } else { - result.MaxSourceResolution, err = parseDownsamplingParamMillis(r.FormValue("max_source_resolution")) + result.MaxSourceResolution, err = parseDownsamplingParamMillis(r.FormValue(queryv1.MaxSourceResolutionParam)) if err != nil { return nil, err } } - result.PartialResponse, err = parsePartialResponseParam(r.FormValue("partial_response"), c.partialResponse) + result.PartialResponse, err = parsePartialResponseParam(r.FormValue(queryv1.PartialResponseParam), c.partialResponse) if err != nil { return nil, err } - if len(r.Form["replicaLabels[]"]) > 0 { - result.ReplicaLabels = r.Form["replicaLabels[]"] + if len(r.Form[queryv1.ReplicaLabelsParam]) > 0 { + result.ReplicaLabels = r.Form[queryv1.ReplicaLabelsParam] } - result.StoreMatchers, err = parseStoreMatchersParam(r.Form["storeMatch[]"]) + result.StoreMatchers, err = parseStoreMatchersParam(r.Form[queryv1.StoreMatcherParam]) if err != nil { return nil, err } @@ -131,21 +132,21 @@ func (c codec) EncodeRequest(ctx context.Context, r queryrange.Request) (*http.R return nil, httpgrpc.Errorf(http.StatusBadRequest, "invalid request format") } params := url.Values{ - "start": []string{encodeTime(thanosReq.Start)}, - "end": []string{encodeTime(thanosReq.End)}, - "step": []string{encodeDurationMillis(thanosReq.Step)}, - "query": []string{thanosReq.Query}, - "dedup": []string{strconv.FormatBool(thanosReq.Dedup)}, - "partial_response": []string{strconv.FormatBool(thanosReq.PartialResponse)}, - "replicaLabels[]": thanosReq.ReplicaLabels, + "start": []string{encodeTime(thanosReq.Start)}, + "end": []string{encodeTime(thanosReq.End)}, + "step": []string{encodeDurationMillis(thanosReq.Step)}, + "query": []string{thanosReq.Query}, + queryv1.DedupParam: []string{strconv.FormatBool(thanosReq.Dedup)}, + queryv1.PartialResponseParam: []string{strconv.FormatBool(thanosReq.PartialResponse)}, + queryv1.ReplicaLabelsParam: thanosReq.ReplicaLabels, } if thanosReq.AutoDownsampling { - params["max_source_resolution"] = []string{"auto"} + params[queryv1.MaxSourceResolutionParam] = []string{"auto"} } else if thanosReq.MaxSourceResolution != 0 { // Add this param only if it is set. Set to 0 will impact // auto-downsampling in the querier. - params["max_source_resolution"] = []string{encodeDurationMillis(thanosReq.MaxSourceResolution)} + params[queryv1.MaxSourceResolutionParam] = []string{encodeDurationMillis(thanosReq.MaxSourceResolution)} } if len(thanosReq.StoreMatchers) > 0 { @@ -153,7 +154,7 @@ func (c codec) EncodeRequest(ctx context.Context, r queryrange.Request) (*http.R if err != nil { return nil, httpgrpc.Errorf(http.StatusBadRequest, "invalid request format") } - params["storeMatch[]"] = storeMatchers + params[queryv1.StoreMatcherParam] = storeMatchers } u := &url.URL{ @@ -191,7 +192,7 @@ func parseEnableDedupParam(s string) (bool, error) { var err error enableDeduplication, err = strconv.ParseBool(s) if err != nil { - return enableDeduplication, httpgrpc.Errorf(http.StatusBadRequest, errCannotParse, "dedup") + return enableDeduplication, httpgrpc.Errorf(http.StatusBadRequest, errCannotParse, queryv1.DedupParam) } } @@ -204,7 +205,7 @@ func parseDownsamplingParamMillis(s string) (int64, error) { var err error maxSourceResolution, err = parseDurationMillis(s) if err != nil { - return maxSourceResolution, httpgrpc.Errorf(http.StatusBadRequest, errCannotParse, "max_source_resolution") + return maxSourceResolution, httpgrpc.Errorf(http.StatusBadRequest, errCannotParse, queryv1.MaxSourceResolutionParam) } } @@ -220,7 +221,7 @@ func parsePartialResponseParam(s string, defaultEnablePartialResponse bool) (boo var err error defaultEnablePartialResponse, err = strconv.ParseBool(s) if err != nil { - return defaultEnablePartialResponse, httpgrpc.Errorf(http.StatusBadRequest, errCannotParse, "partial_response") + return defaultEnablePartialResponse, httpgrpc.Errorf(http.StatusBadRequest, errCannotParse, queryv1.PartialResponseParam) } } @@ -232,11 +233,11 @@ func parseStoreMatchersParam(ss []string) ([][]storepb.LabelMatcher, error) { for _, s := range ss { matchers, err := parser.ParseMetricSelector(s) if err != nil { - return nil, httpgrpc.Errorf(http.StatusBadRequest, errCannotParse, "storeMatch[]") + return nil, httpgrpc.Errorf(http.StatusBadRequest, errCannotParse, queryv1.StoreMatcherParam) } stm, err := storepb.TranslatePromMatchers(matchers...) if err != nil { - return nil, httpgrpc.Errorf(http.StatusBadRequest, "storeMatch[]") + return nil, httpgrpc.Errorf(http.StatusBadRequest, queryv1.StoreMatcherParam) } storeMatchers = append(storeMatchers, stm) } diff --git a/pkg/queryfrontend/codec_test.go b/pkg/queryfrontend/codec_test.go index cb32ee6fc1..4797c6b8b7 100644 --- a/pkg/queryfrontend/codec_test.go +++ b/pkg/queryfrontend/codec_test.go @@ -10,10 +10,12 @@ import ( "testing" "github.com/cortexproject/cortex/pkg/querier/queryrange" + "github.com/weaveworks/common/httpgrpc" + + queryv1 "github.com/thanos-io/thanos/pkg/api/query" "github.com/thanos-io/thanos/pkg/compact" "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/thanos-io/thanos/pkg/testutil" - "github.com/weaveworks/common/httpgrpc" ) func TestCodec_DecodeRequest(t *testing.T) { @@ -218,7 +220,7 @@ func TestCodec_EncodeRequest(t *testing.T) { return r.URL.Query().Get("start") == "123" && r.URL.Query().Get("end") == "456" && r.URL.Query().Get("step") == "1" && - r.URL.Query().Get("dedup") == "true" + r.URL.Query().Get(queryv1.DedupParam) == "true" }, }, { @@ -233,7 +235,7 @@ func TestCodec_EncodeRequest(t *testing.T) { return r.URL.Query().Get("start") == "123" && r.URL.Query().Get("end") == "456" && r.URL.Query().Get("step") == "1" && - r.URL.Query().Get("partial_response") == "true" + r.URL.Query().Get(queryv1.PartialResponseParam) == "true" }, }, { @@ -248,7 +250,7 @@ func TestCodec_EncodeRequest(t *testing.T) { return r.URL.Query().Get("start") == "123" && r.URL.Query().Get("end") == "456" && r.URL.Query().Get("step") == "1" && - r.URL.Query().Get("max_source_resolution") == "300" + r.URL.Query().Get(queryv1.MaxSourceResolutionParam) == "300" }, }, { @@ -263,7 +265,7 @@ func TestCodec_EncodeRequest(t *testing.T) { return r.URL.Query().Get("start") == "123" && r.URL.Query().Get("end") == "456" && r.URL.Query().Get("step") == "1" && - r.URL.Query().Get("max_source_resolution") == "3600" + r.URL.Query().Get(queryv1.MaxSourceResolutionParam) == "3600" }, }, } { diff --git a/pkg/store/proxy.go b/pkg/store/proxy.go index c8ec49a043..43fbee6b61 100644 --- a/pkg/store/proxy.go +++ b/pkg/store/proxy.go @@ -273,7 +273,7 @@ func (s *ProxyStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSe storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("store %s filtered out", st)) continue } - storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("store %s queried", st)) + storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("Store %s queried", st)) // This is used to cancel this stream when one operations takes too long. seriesCtx, closeSeries := context.WithCancel(gctx) @@ -515,7 +515,8 @@ func storeMatches(s Client, mint, maxt int64, storeMatcher [][]storepb.LabelMatc return false, nil } match, err := storeMatchMetadata(s, storeMatcher) - if err != nil || !match { + // Return result here if no matchers set. + if len(matchers) == 0 || err != nil || !match { return match, err } return labelSetsMatch(s.LabelSets(), matchers) @@ -587,14 +588,32 @@ func (s *ProxyStore) LabelNames(ctx context.Context, r *storepb.LabelNamesReques *storepb.LabelNamesResponse, error, ) { var ( - warnings []string - names [][]string - mtx sync.Mutex - g, gctx = errgroup.WithContext(ctx) + warnings []string + names [][]string + mtx sync.Mutex + g, gctx = errgroup.WithContext(ctx) + storeDebugMsgs []string ) for _, st := range s.stores() { st := st + var ok bool + tracing.DoInSpan(gctx, "store_matches", func(ctx context.Context) { + storeMatcher := [][]storepb.LabelMatcher{} + if ctxVal := ctx.Value(StoreMatcherKey); ctxVal != nil { + if value, ok := ctxVal.([][]storepb.LabelMatcher); ok { + storeMatcher = value + } + } + // We can skip error, we already translated matchers once. + ok, _ = storeMatches(st, r.Start, r.End, storeMatcher) + }) + if !ok { + storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("Store %s filtered out", st)) + continue + } + storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("Store %s queried", st)) + g.Go(func() error { resp, err := st.LabelNames(gctx, &storepb.LabelNamesRequest{ PartialResponseDisabled: r.PartialResponseDisabled, @@ -626,6 +645,7 @@ func (s *ProxyStore) LabelNames(ctx context.Context, r *storepb.LabelNamesReques return nil, err } + level.Debug(s.logger).Log("msg", strings.Join(storeDebugMsgs, ";")) return &storepb.LabelNamesResponse{ Names: strutil.MergeUnsortedSlices(names...), Warnings: warnings, @@ -637,14 +657,32 @@ func (s *ProxyStore) LabelValues(ctx context.Context, r *storepb.LabelValuesRequ *storepb.LabelValuesResponse, error, ) { var ( - warnings []string - all [][]string - mtx sync.Mutex - g, gctx = errgroup.WithContext(ctx) + warnings []string + all [][]string + mtx sync.Mutex + g, gctx = errgroup.WithContext(ctx) + storeDebugMsgs []string ) for _, st := range s.stores() { store := st + var ok bool + tracing.DoInSpan(gctx, "store_matches", func(ctx context.Context) { + storeMatcher := [][]storepb.LabelMatcher{} + if ctxVal := ctx.Value(StoreMatcherKey); ctxVal != nil { + if value, ok := ctxVal.([][]storepb.LabelMatcher); ok { + storeMatcher = value + } + } + // We can skip error, we already translated matchers once. + ok, _ = storeMatches(st, r.Start, r.End, storeMatcher) + }) + if !ok { + storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("Store %s filtered out", st)) + continue + } + storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("Store %s queried", st)) + g.Go(func() error { resp, err := store.LabelValues(gctx, &storepb.LabelValuesRequest{ Label: r.Label, @@ -677,6 +715,7 @@ func (s *ProxyStore) LabelValues(ctx context.Context, r *storepb.LabelValuesRequ return nil, err } + level.Debug(s.logger).Log("msg", strings.Join(storeDebugMsgs, ";")) return &storepb.LabelValuesResponse{ Values: strutil.MergeUnsortedSlices(all...), Warnings: warnings, diff --git a/pkg/store/proxy_test.go b/pkg/store/proxy_test.go index 005bd7d4e5..a8f6b90086 100644 --- a/pkg/store/proxy_test.go +++ b/pkg/store/proxy_test.go @@ -1087,6 +1087,13 @@ func TestProxyStore_LabelValues(t *testing.T) { Values: []string{"3", "4"}, }, }}, + &testClient{StoreClient: &mockedStoreAPI{ + RespLabelValues: &storepb.LabelValuesResponse{ + Values: []string{"5", "6"}, + }}, + minTime: timestamp.FromTime(time.Now().Add(-1 * time.Minute)), + maxTime: timestamp.FromTime(time.Now()), + }, } q := NewProxyStore(nil, nil, @@ -1107,6 +1114,20 @@ func TestProxyStore_LabelValues(t *testing.T) { testutil.Ok(t, err) testutil.Assert(t, proto.Equal(req, m1.LastLabelValuesReq), "request was not proxied properly to underlying storeAPI: %s vs %s", req, m1.LastLabelValuesReq) + testutil.Equals(t, []string{"1", "2", "3", "4", "5", "6"}, resp.Values) + testutil.Equals(t, 1, len(resp.Warnings)) + + // Request outside the time range of the last store client. + req = &storepb.LabelValuesRequest{ + Label: "a", + PartialResponseDisabled: true, + Start: timestamp.FromTime(minTime), + End: timestamp.FromTime(time.Now().Add(-1 * time.Hour)), + } + resp, err = q.LabelValues(ctx, req) + testutil.Ok(t, err) + testutil.Assert(t, proto.Equal(req, m1.LastLabelValuesReq), "request was not proxied properly to underlying storeAPI: %s vs %s", req, m1.LastLabelValuesReq) + testutil.Equals(t, []string{"1", "2", "3", "4"}, resp.Values) testutil.Equals(t, 1, len(resp.Warnings)) } @@ -1118,7 +1139,8 @@ func TestProxyStore_LabelNames(t *testing.T) { title string storeAPIs []Client - req *storepb.LabelNamesRequest + req *storepb.LabelNamesRequest + storeMatchers [][]storepb.LabelMatcher expectedNames []string expectedErr error @@ -1197,6 +1219,56 @@ func TestProxyStore_LabelNames(t *testing.T) { expectedNames: []string{"a", "b"}, expectedWarningsLen: 1, }, + { + title: "stores filtered by time range", + storeAPIs: []Client{ + &testClient{ + StoreClient: &mockedStoreAPI{ + RespLabelNames: &storepb.LabelNamesResponse{ + Names: []string{"a", "b"}, + }, + }, + minTime: timestamp.FromTime(time.Now().Add(-4 * time.Hour)), + maxTime: timestamp.FromTime(time.Now().Add(-3 * time.Hour)), + }, + &testClient{ + StoreClient: &mockedStoreAPI{ + RespLabelNames: &storepb.LabelNamesResponse{ + Names: []string{"c", "d"}, + }, + }, + minTime: timestamp.FromTime(time.Now().Add(-2 * time.Hour)), + maxTime: timestamp.FromTime(time.Now().Add(-1 * time.Hour)), + }, + }, + req: &storepb.LabelNamesRequest{ + Start: timestamp.FromTime(time.Now().Add(-1 * time.Minute)), + End: timestamp.FromTime(time.Now()), + PartialResponseDisabled: false, + }, + expectedNames: nil, + expectedWarningsLen: 0, + }, + { + title: "store matchers specified", + storeAPIs: []Client{ + &testClient{ + StoreClient: &mockedStoreAPI{ + RespLabelNames: &storepb.LabelNamesResponse{ + Names: []string{"a", "b"}, + }, + }, + }, + }, + req: &storepb.LabelNamesRequest{ + Start: timestamp.FromTime(minTime), + End: timestamp.FromTime(maxTime), + PartialResponseDisabled: false, + }, + storeMatchers: [][]storepb.LabelMatcher{{storepb.LabelMatcher{Type: storepb.LabelMatcher_EQ, Name: "__address__", Value: "foo"}}}, + expectedNames: nil, + expectedWarningsLen: 0, + }, } { if ok := t.Run(tc.title, func(t *testing.T) { q := NewProxyStore( @@ -1209,6 +1281,9 @@ func TestProxyStore_LabelNames(t *testing.T) { ) ctx := context.Background() + if len(tc.storeMatchers) > 0 { + ctx = context.WithValue(ctx, StoreMatcherKey, tc.storeMatchers) + } resp, err := q.LabelNames(ctx, tc.req) if tc.expectedErr != nil { testutil.NotOk(t, err)