Skip to content

Commit

Permalink
Add support for TSDB selector in querier (thanos-io#7200)
Browse files Browse the repository at this point in the history
* Add support for TSDB selector in querier

This PR allows using the query distributed mode against a set of multi-tenant receivers
as described in https://github.com/thanos-io/thanos/blob/main/docs/proposals-done/202301-distributed-query-execution.md#distributed-execution-against-receive-components.

The feature is enabled by a selector.relabel-config flag in the Query component
which allows it to select a subset of TSDBs to query based on their external labels.

Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com>

* Add CHANGELOG entry and fix docs

Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com>

* Fix tests

Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com>

* Add comments

Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com>

* Add test case for MatchersForLabelSets

Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com>

* Fix failing test

Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com>

* Use an unbuffered channel

Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com>

* Change flag description

Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com>

* Remove parameter from ServerAsClient

Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com>

---------

Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com>
  • Loading branch information
fpetkovski authored and jnyi committed Apr 4, 2024
1 parent ac0ca8f commit 972f1f8
Show file tree
Hide file tree
Showing 10 changed files with 459 additions and 15 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#6867](https://github.com/thanos-io/thanos/pull/6867) Query UI: Tenant input box added to the Query UI, in order to be able to specify which tenant the query should use.
- [#7175](https://github.com/thanos-io/thanos/pull/7175): Query: Add `--query.mode=distributed` which enables the new distributed mode of the Thanos query engine.
- [#7199](https://github.com/thanos-io/thanos/pull/7199): Reloader: Add support for watching and decompressing Prometheus configuration directories
- [#7200](https://github.com/thanos-io/thanos/pull/7200): Query: Add `--selector.relabel-config` and `--selector.relabel-config-file` flags which allow scoping the Querier to a subset of matched TSDBs.

### Changed

Expand Down
27 changes: 24 additions & 3 deletions cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"strings"
"time"

extflag "github.com/efficientgo/tools/extkingpin"
"google.golang.org/grpc"

"github.com/go-kit/log"
Expand All @@ -32,6 +33,7 @@ import (

apiv1 "github.com/thanos-io/thanos/pkg/api/query"
"github.com/thanos-io/thanos/pkg/api/query/querypb"
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/compact/downsample"
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/discovery/cache"
Expand Down Expand Up @@ -208,6 +210,14 @@ func registerQuery(app *extkingpin.App) {
Default("1s"))

storeResponseTimeout := extkingpin.ModelDuration(cmd.Flag("store.response-timeout", "If a Store doesn't send any data in this specified duration then a Store will be ignored and partial data will be returned if it's enabled. 0 disables timeout.").Default("0ms"))

storeSelectorRelabelConf := *extflag.RegisterPathOrContent(
cmd,
"selector.relabel-config",
"YAML with relabeling configuration that allows the Querier to select specific TSDBs by their external label. It follows native Prometheus relabel-config syntax. See format details: https://prometheus.io/docs/prometheus/latest/configuration/configuration/#relabel_config ",
extflag.WithEnvSubstitution(),
)

reqLogConfig := extkingpin.RegisterRequestLoggingFlags(cmd)

alertQueryURL := cmd.Flag("alert.query-url", "The external Thanos Query URL that would be set in all alerts 'Source' field.").String()
Expand Down Expand Up @@ -274,6 +284,15 @@ func registerQuery(app *extkingpin.App) {
level.Warn(logger).Log("msg", "different values for --web.route-prefix and --web.external-prefix detected, web UI may not work without a reverse-proxy.")
}

tsdbRelabelConfig, err := storeSelectorRelabelConf.Content()
if err != nil {
return errors.Wrap(err, "error while parsing tsdb selector configuration")
}
tsdbSelector, err := block.ParseRelabelConfig(tsdbRelabelConfig, block.SelectorSupportedRelabelActions)
if err != nil {
return err
}

return runQuery(
g,
logger,
Expand Down Expand Up @@ -343,6 +362,7 @@ func registerQuery(app *extkingpin.App) {
*defaultEngine,
storeRateLimits,
*extendedFunctionsEnabled,
store.NewTSDBSelector(tsdbSelector),
queryMode(*promqlQueryMode),
*tenantHeader,
*defaultTenant,
Expand Down Expand Up @@ -424,6 +444,7 @@ func runQuery(
defaultEngine string,
storeRateLimits store.SeriesSelectLimits,
extendedFunctionsEnabled bool,
tsdbSelector *store.TSDBSelector,
queryMode queryMode,
tenantHeader string,
defaultTenant string,
Expand Down Expand Up @@ -501,9 +522,9 @@ func runQuery(
dns.ResolverType(dnsSDResolver),
)

options := []store.ProxyStoreOption{}
if debugLogging {
options = append(options, store.WithProxyStoreDebugLogging())
options := []store.ProxyStoreOption{
store.WithTSDBSelector(tsdbSelector),
store.WithProxyStoreDebugLogging(debugLogging),
}

var (
Expand Down
5 changes: 2 additions & 3 deletions cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,9 +317,8 @@ func runReceive(
return errors.Wrap(err, "setup gRPC server")
}

options := []store.ProxyStoreOption{}
if debugLogging {
options = append(options, store.WithProxyStoreDebugLogging())
options := []store.ProxyStoreOption{
store.WithProxyStoreDebugLogging(debugLogging),
}

proxy := store.NewProxyStore(
Expand Down
15 changes: 15 additions & 0 deletions docs/components/query.md
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,21 @@ Flags:
--selector-label=<name>="<value>" ...
Query selector labels that will be exposed in
info endpoint (repeated).
--selector.relabel-config=<content>
Alternative to 'selector.relabel-config-file'
flag (mutually exclusive). Content of YAML
with relabeling configuration that allows
the Querier to select specific TSDBs by their
external label. It follows native Prometheus
relabel-config syntax. See format details:
https://prometheus.io/docs/prometheus/latest/configuration/configuration/#relabel_config
--selector.relabel-config-file=<file-path>
Path to YAML with relabeling configuration
that allows the Querier to select
specific TSDBs by their external label.
It follows native Prometheus
relabel-config syntax. See format details:
https://prometheus.io/docs/prometheus/latest/configuration/configuration/#relabel_config
--store=<store> ... Deprecation Warning - This flag is deprecated
and replaced with `endpoint`. Addresses of
statically configured store API servers
Expand Down
28 changes: 23 additions & 5 deletions pkg/store/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ type ProxyStore struct {
metrics *proxyStoreMetrics
retrievalStrategy RetrievalStrategy
debugLogging bool
tsdbSelector *TSDBSelector
}

type proxyStoreMetrics struct {
Expand All @@ -111,10 +112,17 @@ func RegisterStoreServer(storeSrv storepb.StoreServer, logger log.Logger) func(*
// ProxyStoreOption are functions that configure ProxyStore.
type ProxyStoreOption func(s *ProxyStore)

// WithProxyStoreDebugLogging enables debug logging.
func WithProxyStoreDebugLogging() ProxyStoreOption {
// WithProxyStoreDebugLogging toggles debug logging.
func WithProxyStoreDebugLogging(enable bool) ProxyStoreOption {
return func(s *ProxyStore) {
s.debugLogging = true
s.debugLogging = enable
}
}

// WithTSDBSelector returns a ProxyStoreOption that installs the given
// selector on the proxy, replacing the DefaultSelector that NewProxyStore
// assigns before options are applied. Series consults the selector to skip
// stores whose label sets do not match it.
func WithTSDBSelector(selector *TSDBSelector) ProxyStoreOption {
	apply := func(proxy *ProxyStore) {
		proxy.tsdbSelector = selector
	}
	return apply
}

Expand Down Expand Up @@ -147,6 +155,7 @@ func NewProxyStore(
responseTimeout: responseTimeout,
metrics: metrics,
retrievalStrategy: retrievalStrategy,
tsdbSelector: DefaultSelector,
}

for _, option := range options {
Expand Down Expand Up @@ -316,7 +325,10 @@ func (s *ProxyStore) Series(originalRequest *storepb.SeriesRequest, srv storepb.
ctx = metadata.AppendToOutgoingContext(ctx, tenancy.DefaultTenantHeader, tenant)
level.Debug(s.logger).Log("msg", "Tenant info in Series()", "tenant", tenant)

stores := []Client{}
var (
stores []Client
storeLabelSets []labels.Labels
)
for _, st := range s.stores() {
// We might be able to skip the store if its meta information indicates it cannot have series matching our query.
if ok, reason := storeMatches(ctx, st, s.debugLogging, originalRequest.MinTime, originalRequest.MaxTime, matchers...); !ok {
Expand All @@ -326,13 +338,19 @@ func (s *ProxyStore) Series(originalRequest *storepb.SeriesRequest, srv storepb.
continue
}

matches, extraMatchers := s.tsdbSelector.MatchLabelSets(st.LabelSets()...)
if !matches {
continue
}
storeLabelSets = append(storeLabelSets, extraMatchers...)

stores = append(stores, st)
}

if len(stores) == 0 {
level.Debug(reqLogger).Log("err", ErrorNoStoresMatched, "stores", strings.Join(storeDebugMsgs, ";"))
return nil
}
r.Matchers = append(r.Matchers, MatchersForLabelSets(storeLabelSets)...)

storeResponses := make([]respSet, 0, len(stores))

Expand Down
131 changes: 128 additions & 3 deletions pkg/store/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/gogo/protobuf/proto"
"github.com/gogo/protobuf/types"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/timestamp"
"github.com/prometheus/prometheus/tsdb"
Expand All @@ -30,6 +31,7 @@ import (

"github.com/efficientgo/core/testutil"

"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/info/infopb"
"github.com/thanos-io/thanos/pkg/store/labelpb"
Expand Down Expand Up @@ -67,7 +69,7 @@ func TestProxyStore_Info(t *testing.T) {
nil,
func() []Client { return nil },
component.Query,
labels.EmptyLabels(), 0*time.Second, RetrievalStrategy(EagerRetrieval),
labels.EmptyLabels(), 0*time.Second, EagerRetrieval,
)

resp, err := q.Info(ctx, &storepb.InfoRequest{})
Expand Down Expand Up @@ -120,6 +122,7 @@ func TestProxyStore_Series(t *testing.T) {
expectedSeries []rawSeries
expectedErr error
expectedWarningsLen int
relabelConfig string
}{
{
title: "no storeAPI available",
Expand Down Expand Up @@ -622,6 +625,123 @@ func TestProxyStore_Series(t *testing.T) {
},
},
},
{
title: "relabel config with flat store layout",
storeAPIs: []Client{
&storetestutil.TestClient{
MinTime: 1,
MaxTime: 300,
ExtLset: []labels.Labels{labels.FromStrings("ext", "2")},
StoreClient: &mockedStoreAPI{
RespSeries: []*storepb.SeriesResponse{
storeSeriesResponse(t, labels.FromStrings("zone", "2"), []sample{{0, 0}, {2, 1}, {3, 2}}),
},
},
},
&storetestutil.TestClient{
MinTime: 1,
MaxTime: 300,
ExtLset: []labels.Labels{labels.FromStrings("ext", "1")},
StoreClient: &mockedStoreAPI{
RespSeries: []*storepb.SeriesResponse{
storeSeriesResponse(t, labels.FromStrings("zone", "1"), []sample{{0, 0}, {2, 1}, {3, 2}}),
},
},
},
},
req: &storepb.SeriesRequest{
MinTime: 1,
MaxTime: 300,
Matchers: []storepb.LabelMatcher{
{Name: "zone", Value: ".+", Type: storepb.LabelMatcher_RE},
},
},
relabelConfig: `
- source_labels: [ext]
action: hashmod
target_label: shard
modulus: 2
- action: keep
source_labels: [shard]
regex: 1
`,
expectedSeries: []rawSeries{
{
lset: labels.FromStrings("zone", "1"),
chunks: [][]sample{{{0, 0}, {2, 1}, {3, 2}}},
},
},
},
{
title: "relabel config with nested store layout",
storeAPIs: []Client{
&storetestutil.TestClient{
MinTime: 1,
MaxTime: 300,
ExtLset: []labels.Labels{labels.FromStrings("ext", "1", "ext2", "2"), labels.FromStrings("ext", "2")},
StoreClient: storepb.ServerAsClient(NewProxyStore(log.NewNopLogger(), prometheus.NewRegistry(), func() []Client {
return []Client{
&storetestutil.TestClient{
MinTime: 1,
MaxTime: 300,
ExtLset: []labels.Labels{labels.FromStrings("ext", "1", "ext2", "2")},
StoreClient: &mockedStoreAPI{
RespSeries: []*storepb.SeriesResponse{
storeSeriesResponse(t, labels.FromStrings("zone", "1"), []sample{{0, 0}, {2, 1}, {3, 2}}),
},
},
},
&storetestutil.TestClient{
MinTime: 1,
MaxTime: 300,
ExtLset: []labels.Labels{labels.FromStrings("ext", "2")},
StoreClient: &mockedStoreAPI{
RespSeries: []*storepb.SeriesResponse{
storeSeriesResponse(t, labels.FromStrings("zone", "2"), []sample{{0, 0}, {2, 1}, {3, 2}}),
},
},
},
}
}, component.Store, labels.FromStrings("role", "proxy"), 1*time.Minute, EagerRetrieval)),
},
&storetestutil.TestClient{
MinTime: 1,
MaxTime: 300,
ExtLset: []labels.Labels{labels.FromStrings("ext", "3")},
StoreClient: &mockedStoreAPI{
RespSeries: []*storepb.SeriesResponse{
storeSeriesResponse(t, labels.FromStrings("zone", "3"), []sample{{0, 0}, {2, 1}, {3, 2}}),
},
},
},
},
req: &storepb.SeriesRequest{
MinTime: 1,
MaxTime: 300,
Matchers: []storepb.LabelMatcher{
{Name: "zone", Value: ".+", Type: storepb.LabelMatcher_RE},
},
},
relabelConfig: `
- source_labels: [ext]
action: hashmod
target_label: shard
modulus: 2
- action: keep
source_labels: [shard]
regex: 1
`,
expectedSeries: []rawSeries{
{
lset: labels.FromStrings("zone", "1"),
chunks: [][]sample{{{0, 0}, {2, 1}, {3, 2}}},
},
{
lset: labels.FromStrings("zone", "3"),
chunks: [][]sample{{{0, 0}, {2, 1}, {3, 2}}},
},
},
},
} {
t.Run(tc.title, func(t *testing.T) {
for _, replicaLabelSupport := range []bool{false, true} {
Expand All @@ -632,12 +752,15 @@ func TestProxyStore_Series(t *testing.T) {
}
for _, strategy := range []RetrievalStrategy{EagerRetrieval, LazyRetrieval} {
t.Run(string(strategy), func(t *testing.T) {
relabelConfig, err := block.ParseRelabelConfig([]byte(tc.relabelConfig), block.SelectorSupportedRelabelActions)
testutil.Ok(t, err)
q := NewProxyStore(nil,
nil,
func() []Client { return tc.storeAPIs },
component.Query,
tc.selectorLabels,
5*time.Second, strategy,
WithTSDBSelector(NewTSDBSelector(relabelConfig)),
)

ctx := context.Background()
Expand All @@ -646,7 +769,7 @@ func TestProxyStore_Series(t *testing.T) {
}

s := newStoreSeriesServer(ctx)
err := q.Series(tc.req, s)
err = q.Series(tc.req, s)
if tc.expectedErr != nil {
testutil.NotOk(t, err)
testutil.Equals(t, tc.expectedErr.Error(), err.Error())
Expand Down Expand Up @@ -1565,7 +1688,7 @@ type rawSeries struct {
}

func seriesEquals(t *testing.T, expected []rawSeries, got []storepb.Series) {
testutil.Equals(t, len(expected), len(got), "got unexpected number of series: \n %v", got)
testutil.Equals(t, len(expected), len(got), "got unexpected number of series: \n want: %v \n got: %v", expected, got)

ret := make([]rawSeries, len(got))
for i, s := range got {
Expand Down Expand Up @@ -1900,6 +2023,7 @@ func benchProxySeries(t testutil.TB, totalSamples, totalSeries int) {
metrics: newProxyStoreMetrics(nil),
responseTimeout: 5 * time.Second,
retrievalStrategy: EagerRetrieval,
tsdbSelector: DefaultSelector,
}

var allResps []*storepb.SeriesResponse
Expand Down Expand Up @@ -2028,6 +2152,7 @@ func TestProxyStore_NotLeakingOnPrematureFinish(t *testing.T) {
metrics: newProxyStoreMetrics(nil),
responseTimeout: 0,
retrievalStrategy: EagerRetrieval,
tsdbSelector: DefaultSelector,
}

t.Run("failling send", func(t *testing.T) {
Expand Down
Loading

0 comments on commit 972f1f8

Please sign in to comment.