Skip to content

Commit

Permalink
Allow passing allow-list of store addrs for debug
Browse files Browse the repository at this point in the history
Add a new parameter on the API to pass an allow list of stores.
Only stores matching this list will be able to be used for the query.

Signed-off-by: Geoffrey Beausire <g.beausire@criteo.com>
  • Loading branch information
geobeau committed Jul 30, 2020
1 parent fa60bf6 commit dba30d7
Show file tree
Hide file tree
Showing 7 changed files with 204 additions and 21 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel

### Added

- [#2832](https://github.com/thanos-io/thanos/pull/2832) ui: React: Add runtime and build info page
- [#2832](https://github.com/thanos-io/thanos/pull/2832) ui: React: Add runtime and build info page.
- [#2305](https://github.com/thanos-io/thanos/pull/2305) Receive,Sidecar,Ruler: Propagate correct (stricter) MinTime for no-block TSDBs.
- [#2926](https://github.com/thanos-io/thanos/pull/2926) API: Add new blocks HTTP API to serve blocks metadata. The status endpoints (`/api/v1/status/flags`, `/api/v1/status/runtimeinfo` and `/api/v1/status/buildinfo`) are now available on all components with a HTTP API.
- [#2892](https://github.com/thanos-io/thanos/pull/2892) Receive: Receiver fails when the initial upload fails.
Expand All @@ -34,6 +34,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel
- [#2893](https://github.com/thanos-io/thanos/pull/2893) Store: Rename metric `thanos_bucket_store_cached_postings_compression_time_seconds` to `thanos_bucket_store_cached_postings_compression_time_seconds_total`.
- [#2915](https://github.com/thanos-io/thanos/pull/2915) Receive,Ruler: Enable TSDB directory locking by default. Add a new flag (`--tsdb.no-lockfile`) to override behavior.
- [#2902](https://github.com/thanos-io/thanos/pull/2902) ui: React: Separate dedupe and partial response checkboxes per panel.
- [#2931](https://github.com/thanos-io/thanos/pull/2931) Query: Allow passing a `storeMatcher[]` to select matching stores when debugging the querier. See [documentation](https://thanos.io/components/query.md/#store-filtering)

## [v0.14.0](https://github.com/thanos-io/thanos/releases/tag/v0.14.0) - 2020.07.10

Expand Down
20 changes: 20 additions & 0 deletions docs/components/query.md
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,26 @@ Thanos Querier has the ability to perform concurrent select request per query. I
The maximum number of concurrent requests are being made per query is controller by `query.max-concurrent-select` flag.
Keep in mind that the maximum number of concurrent queries that are handled by querier is controlled by `query.max-concurrent`. Please consider implications of combined value while tuning the querier.

### Store filtering

It's possible to provide a set of matchers to the Querier api to select specific stores to be used during the query using the `storeMatch[]` parameter. It is useful when debugging a slow/broken store.
It uses the same format as the matcher of [Prometheus' federate api](https://prometheus.io/docs/prometheus/latest/querying/api/#finding-series-by-label-matchers).
Note that at the moment the querier only supports the `__address__` which contain the address of the store as it is shown on the `/stores` endoint of the UI.

Example:
```
- targets:
- prometheus-foo.thanos-sidecar:10901
- prometheus-bar.thanos-sidecar:10901
```

```
http://localhost:10901/api/v1/query?query=up&dedup=true&partial_response=true&storeMatch={__address__=~"prometheus-foo.*"}
```

Will only return metrics from `prometheus-foo.thanos-sidecar:10901`


## Expose UI on a sub-path

It is possible to expose thanos-query UI and optionally API on a sub-path.
Expand Down
51 changes: 46 additions & 5 deletions pkg/api/query/v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,27 @@ func (qapi *QueryAPI) parseReplicaLabelsParam(r *http.Request) (replicaLabels []
return replicaLabels, nil
}

func (qapi *QueryAPI) parseStoreMatchersParam(r *http.Request) (storeMatchers [][]storepb.LabelMatcher, _ *api.ApiError) {
const storeMatcherParam = "storeMatch[]"
if err := r.ParseForm(); err != nil {
return nil, &api.ApiError{Typ: api.ErrorInternal, Err: errors.Wrap(err, "parse form")}
}

for _, s := range r.Form[storeMatcherParam] {
matchers, err := parser.ParseMetricSelector(s)
if err != nil {
return nil, &api.ApiError{Typ: api.ErrorBadData, Err: err}
}
stm, err := storepb.TranslatePromMatchers(matchers...)
if err != nil {
return nil, &api.ApiError{Typ: api.ErrorBadData, Err: errors.Wrap(err, "convert store matchers")}
}
storeMatchers = append(storeMatchers, stm)
}

return storeMatchers, nil
}

func (qapi *QueryAPI) parseDownsamplingParamMillis(r *http.Request, defaultVal time.Duration) (maxResolutionMillis int64, _ *api.ApiError) {
const maxSourceResolutionParam = "max_source_resolution"
maxSourceResolution := 0 * time.Second
Expand Down Expand Up @@ -236,6 +257,11 @@ func (qapi *QueryAPI) query(r *http.Request) (interface{}, []error, *api.ApiErro
return nil, nil, apiErr
}

storeMatchers, apiErr := qapi.parseStoreMatchersParam(r)
if apiErr != nil {
return nil, nil, apiErr
}

enablePartialResponse, apiErr := qapi.parsePartialResponseParam(r, qapi.enableQueryPartialResponse)
if apiErr != nil {
return nil, nil, apiErr
Expand All @@ -250,7 +276,7 @@ func (qapi *QueryAPI) query(r *http.Request) (interface{}, []error, *api.ApiErro
span, ctx := tracing.StartSpan(ctx, "promql_instant_query")
defer span.Finish()

qry, err := qapi.queryEngine.NewInstantQuery(qapi.queryableCreate(enableDedup, replicaLabels, maxSourceResolution, enablePartialResponse, false), r.FormValue("query"), ts)
qry, err := qapi.queryEngine.NewInstantQuery(qapi.queryableCreate(enableDedup, replicaLabels, storeMatchers, maxSourceResolution, enablePartialResponse, false), r.FormValue("query"), ts)
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: err}
}
Expand Down Expand Up @@ -335,6 +361,11 @@ func (qapi *QueryAPI) queryRange(r *http.Request) (interface{}, []error, *api.Ap
return nil, nil, apiErr
}

storeMatchers, apiErr := qapi.parseStoreMatchersParam(r)
if apiErr != nil {
return nil, nil, apiErr
}

// If no max_source_resolution is specified fit at least 5 samples between steps.
maxSourceResolution, apiErr := qapi.parseDownsamplingParamMillis(r, step/5)
if apiErr != nil {
Expand All @@ -351,7 +382,7 @@ func (qapi *QueryAPI) queryRange(r *http.Request) (interface{}, []error, *api.Ap
defer span.Finish()

qry, err := qapi.queryEngine.NewRangeQuery(
qapi.queryableCreate(enableDedup, replicaLabels, maxSourceResolution, enablePartialResponse, false),
qapi.queryableCreate(enableDedup, replicaLabels, storeMatchers, maxSourceResolution, enablePartialResponse, false),
r.FormValue("query"),
start,
end,
Expand Down Expand Up @@ -399,7 +430,7 @@ func (qapi *QueryAPI) labelValues(r *http.Request) (interface{}, []error, *api.A
return nil, nil, apiErr
}

q, err := qapi.queryableCreate(true, nil, 0, enablePartialResponse, false).Querier(ctx, math.MinInt64, math.MaxInt64)
q, err := qapi.queryableCreate(true, nil, nil, 0, enablePartialResponse, false).Querier(ctx, math.MinInt64, math.MaxInt64)
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorExec, Err: err}
}
Expand Down Expand Up @@ -474,12 +505,17 @@ func (qapi *QueryAPI) series(r *http.Request) (interface{}, []error, *api.ApiErr
return nil, nil, apiErr
}

storeMatchers, apiErr := qapi.parseStoreMatchersParam(r)
if apiErr != nil {
return nil, nil, apiErr
}

enablePartialResponse, apiErr := qapi.parsePartialResponseParam(r, qapi.enableQueryPartialResponse)
if apiErr != nil {
return nil, nil, apiErr
}

q, err := qapi.queryableCreate(enableDedup, replicaLabels, math.MaxInt64, enablePartialResponse, true).
q, err := qapi.queryableCreate(enableDedup, replicaLabels, storeMatchers, math.MaxInt64, enablePartialResponse, true).
Querier(r.Context(), timestamp.FromTime(start), timestamp.FromTime(end))
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorExec, Err: err}
Expand Down Expand Up @@ -537,7 +573,12 @@ func (qapi *QueryAPI) labelNames(r *http.Request) (interface{}, []error, *api.Ap
return nil, nil, apiErr
}

q, err := qapi.queryableCreate(true, nil, 0, enablePartialResponse, false).Querier(ctx, math.MinInt64, math.MaxInt64)
storeMatchers, apiErr := qapi.parseStoreMatchersParam(r)
if apiErr != nil {
return nil, nil, apiErr
}

q, err := qapi.queryableCreate(true, nil, storeMatchers, 0, enablePartialResponse, false).Querier(ctx, math.MinInt64, math.MaxInt64)
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorExec, Err: err}
}
Expand Down
15 changes: 12 additions & 3 deletions pkg/query/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/prometheus/prometheus/storage"

"github.com/thanos-io/thanos/pkg/gate"
"github.com/thanos-io/thanos/pkg/store"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/tracing"
)
Expand All @@ -27,17 +28,18 @@ import (
// replicaLabels at query time.
// maxResolutionMillis controls downsampling resolution that is allowed (specified in milliseconds).
// partialResponse controls `partialResponseDisabled` option of StoreAPI and partial response behavior of proxy.
type QueryableCreator func(deduplicate bool, replicaLabels []string, maxResolutionMillis int64, partialResponse, skipChunks bool) storage.Queryable
type QueryableCreator func(deduplicate bool, replicaLabels []string, storeMatchers [][]storepb.LabelMatcher, maxResolutionMillis int64, partialResponse, skipChunks bool) storage.Queryable

// NewQueryableCreator creates QueryableCreator.
func NewQueryableCreator(logger log.Logger, reg prometheus.Registerer, proxy storepb.StoreServer, maxConcurrentSelects int, selectTimeout time.Duration) QueryableCreator {
keeper := gate.NewKeeper(reg)

return func(deduplicate bool, replicaLabels []string, maxResolutionMillis int64, partialResponse, skipChunks bool) storage.Queryable {
return func(deduplicate bool, replicaLabels []string, storeMatchers [][]storepb.LabelMatcher, maxResolutionMillis int64, partialResponse, skipChunks bool) storage.Queryable {
return &queryable{
logger: logger,
reg: reg,
replicaLabels: replicaLabels,
storeMatchers: storeMatchers,
proxy: proxy,
deduplicate: deduplicate,
maxResolutionMillis: maxResolutionMillis,
Expand All @@ -54,6 +56,7 @@ type queryable struct {
logger log.Logger
reg prometheus.Registerer
replicaLabels []string
storeMatchers [][]storepb.LabelMatcher
proxy storepb.StoreServer
deduplicate bool
maxResolutionMillis int64
Expand All @@ -66,7 +69,7 @@ type queryable struct {

// Querier returns a new storage querier against the underlying proxy store API.
func (q *queryable) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
return newQuerier(ctx, q.logger, q.reg, mint, maxt, q.replicaLabels, q.proxy, q.deduplicate, q.maxResolutionMillis, q.partialResponse, q.skipChunks, q.gateKeeper.NewGate(q.maxConcurrentSelects), q.selectTimeout), nil
return newQuerier(ctx, q.logger, q.reg, mint, maxt, q.replicaLabels, q.storeMatchers, q.proxy, q.deduplicate, q.maxResolutionMillis, q.partialResponse, q.skipChunks, q.gateKeeper.NewGate(q.maxConcurrentSelects), q.selectTimeout), nil
}

type querier struct {
Expand All @@ -76,6 +79,7 @@ type querier struct {
cancel func()
mint, maxt int64
replicaLabels map[string]struct{}
storeMatchers [][]storepb.LabelMatcher
proxy storepb.StoreServer
deduplicate bool
maxResolutionMillis int64
Expand All @@ -93,6 +97,7 @@ func newQuerier(
reg prometheus.Registerer,
mint, maxt int64,
replicaLabels []string,
storeMatchers [][]storepb.LabelMatcher,
proxy storepb.StoreServer,
deduplicate bool,
maxResolutionMillis int64,
Expand Down Expand Up @@ -120,6 +125,7 @@ func newQuerier(
mint: mint,
maxt: maxt,
replicaLabels: rl,
storeMatchers: storeMatchers,
proxy: proxy,
deduplicate: deduplicate,
maxResolutionMillis: maxResolutionMillis,
Expand Down Expand Up @@ -253,6 +259,9 @@ func (q *querier) selectFn(ctx context.Context, hints *storage.SelectHints, ms .

aggrs := aggrsFromFunc(hints.Func)

// TODO: Pass it using the SerieRequest instead of relying on context
ctx = context.WithValue(ctx, store.StoreMatcherKey, q.storeMatchers)

resp := &seriesServer{ctx: ctx}
if err := q.proxy.Series(&storepb.SeriesRequest{
MinTime: hints.Start,
Expand Down
10 changes: 5 additions & 5 deletions pkg/query/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func TestQueryableCreator_MaxResolution(t *testing.T) {
queryableCreator := NewQueryableCreator(nil, nil, testProxy, 2, 5*time.Second)

oneHourMillis := int64(1*time.Hour) / int64(time.Millisecond)
queryable := queryableCreator(false, nil, oneHourMillis, false, false)
queryable := queryableCreator(false, nil, nil, oneHourMillis, false, false)

q, err := queryable.Querier(context.Background(), 0, 42)
testutil.Ok(t, err)
Expand Down Expand Up @@ -72,7 +72,7 @@ func TestQuerier_DownsampledData(t *testing.T) {
}

timeout := 10 * time.Second
q := NewQueryableCreator(nil, nil, testProxy, 2, timeout)(false, nil, 9999999, false, false)
q := NewQueryableCreator(nil, nil, testProxy, 2, timeout)(false, nil, nil, 9999999, false, false)
engine := promql.NewEngine(
promql.EngineOpts{
MaxSamples: math.MaxInt32,
Expand Down Expand Up @@ -510,7 +510,7 @@ func TestQuerier_Select(t *testing.T) {
{dedup: true, expected: []series{tcase.expectedAfterDedup}},
} {
g := gate.New(2)
q := newQuerier(context.Background(), nil, nil, tcase.mint, tcase.maxt, tcase.replicaLabels, tcase.storeAPI, sc.dedup, 0, true, false, g, timeout)
q := newQuerier(context.Background(), nil, nil, tcase.mint, tcase.maxt, tcase.replicaLabels, nil, tcase.storeAPI, sc.dedup, 0, true, false, g, timeout)
t.Cleanup(func() { testutil.Ok(t, q.Close()) })

t.Run(fmt.Sprintf("dedup=%v", sc.dedup), func(t *testing.T) {
Expand Down Expand Up @@ -680,7 +680,7 @@ func TestQuerierWithDedupUnderstoodByPromQL_Rate(t *testing.T) {

timeout := 100 * time.Second
g := gate.New(2)
q := newQuerier(context.Background(), logger, nil, realSeriesWithStaleMarkerMint, realSeriesWithStaleMarkerMaxt, []string{"replica"}, s, false, 0, true, false, g, timeout)
q := newQuerier(context.Background(), logger, nil, realSeriesWithStaleMarkerMint, realSeriesWithStaleMarkerMaxt, []string{"replica"}, nil, s, false, 0, true, false, g, timeout)
t.Cleanup(func() {
testutil.Ok(t, q.Close())
})
Expand Down Expand Up @@ -754,7 +754,7 @@ func TestQuerierWithDedupUnderstoodByPromQL_Rate(t *testing.T) {

timeout := 5 * time.Second
g := gate.New(2)
q := newQuerier(context.Background(), logger, nil, realSeriesWithStaleMarkerMint, realSeriesWithStaleMarkerMaxt, []string{"replica"}, s, true, 0, true, false, g, timeout)
q := newQuerier(context.Background(), logger, nil, realSeriesWithStaleMarkerMint, realSeriesWithStaleMarkerMaxt, []string{"replica"}, nil, s, true, 0, true, false, g, timeout)
t.Cleanup(func() {
testutil.Ok(t, q.Close())
})
Expand Down
41 changes: 39 additions & 2 deletions pkg/store/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ import (
"google.golang.org/grpc/status"
)

type ctxKey int

// StoreMatcherKey is the context key for the store's allow list.
const StoreMatcherKey = ctxKey(0)

// Client holds meta information about a store.
type Client interface {
// Client to access the store.
Expand Down Expand Up @@ -255,8 +260,14 @@ func (s *ProxyStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSe
// NOTE: all matchers are validated in matchesExternalLabels method so we explicitly ignore error.
var ok bool
tracing.DoInSpan(gctx, "store_matches", func(ctx context.Context) {
storeMatcher := [][]storepb.LabelMatcher{}
if ctxVal := srv.Context().Value(StoreMatcherKey); ctxVal != nil {
if value, ok := ctxVal.([][]storepb.LabelMatcher); ok {
storeMatcher = value
}
}
// We can skip error, we already translated matchers once.
ok, _ = storeMatches(st, r.MinTime, r.MaxTime, r.Matchers...)
ok, _ = storeMatches(st, r.MinTime, r.MaxTime, storeMatcher, r.Matchers...)
})
if !ok {
storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("store %s filtered out", st))
Expand Down Expand Up @@ -498,14 +509,40 @@ func (s *streamSeriesSet) Err() error {

// matchStore returns true if the given store may hold data for the given label
// matchers.
func storeMatches(s Client, mint, maxt int64, matchers ...storepb.LabelMatcher) (bool, error) {
func storeMatches(s Client, mint, maxt int64, storeMatcher [][]storepb.LabelMatcher, matchers ...storepb.LabelMatcher) (bool, error) {
storeMinTime, storeMaxTime := s.TimeRange()
if mint > storeMaxTime || maxt < storeMinTime {
return false, nil
}
match, err := storeMatchMetadata(s, storeMatcher)
if err != nil || !match {
return match, err
}
return labelSetsMatch(s.LabelSets(), matchers)
}

// storeMatch return true if the store's metadata labels match the storeMatcher.
func storeMatchMetadata(s Client, storeMatcher [][]storepb.LabelMatcher) (bool, error) {
clientLabels := generateMetadataClientLabels(s)
if len(storeMatcher) == 0 {
return true, nil
}
res := false
for _, stm := range storeMatcher {
stmMatch, err := labelSetMatches(clientLabels, stm)
if err != nil {
return false, err
}
res = res || stmMatch
}
return res, nil
}

func generateMetadataClientLabels(s Client) storepb.LabelSet {
l := storepb.Label{Name: "__address__", Value: s.Addr()}
return storepb.LabelSet{Labels: []storepb.Label{l}}
}

// labelSetsMatch returns false if all label-set do not match the matchers.
func labelSetsMatch(lss []storepb.LabelSet, matchers []storepb.LabelMatcher) (bool, error) {
if len(lss) == 0 {
Expand Down
Loading

0 comments on commit dba30d7

Please sign in to comment.