querier: option to skip sending queries to long term storage #1893

Merged
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -3,6 +3,9 @@
## master / unreleased

* [CHANGE] Removed remaining support for using denormalised tokens in the ring. If you're still running ingesters with denormalised tokens (Cortex 0.4 or earlier, with `-ingester.normalise-tokens=false`), such ingesters will now be completely invisible to distributors and need to be either switched to Cortex 0.6.0 or later, or be configured to use normalised tokens. #2034
* [CHANGE] Moved `--store.min-chunk-age` to the Querier config as `--querier.query-store-after`, allowing the store to be skipped during query time if the metrics wouldn't be found. The YAML config option `ingestermaxquerylookback` has been renamed to `query_ingesters_within` to match its CLI flag. #1893
* `--store.min-chunk-age` has been removed
* `--querier.query-store-after` has been added in its place.
* [ENHANCEMENT] Experimental TSDB: Export TSDB Syncer metrics from Compactor component, they are prefixed with `cortex_compactor_`. #2023
* [ENHANCEMENT] Experimental TSDB: Added dedicated flag `-experimental.tsdb.bucket-store.tenant-sync-concurrency` to configure the maximum number of concurrent tenants for which blocks are synched. #2026
* [ENHANCEMENT] Experimental TSDB: Expose metrics for objstore operations (prefixed with `cortex_<component>_thanos_objstore_`, component being one of `ingester`, `querier` and `compactor`). #2027
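For reference, a minimal sketch of a querier block using the renamed and added options; the top-level `querier:` key and the 12h/11h values are illustrative assumptions, not defaults:

```yaml
# Illustrative values only; both options default to 0s (no skipping).
querier:
  # Skip the ingesters for queries whose end time is older than 12h.
  query_ingesters_within: 12h
  # Skip the long-term store for queries whose start time is newer than 11h.
  query_store_after: 11h
```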
11 changes: 6 additions & 5 deletions docs/configuration/config-file-reference.md
@@ -508,7 +508,12 @@ The `querier_config` configures the Cortex querier.
# Maximum lookback beyond which queries are not sent to ingester. 0 means all
# queries are sent to ingester.
# CLI flag: -querier.query-ingesters-within
[ingestermaxquerylookback: <duration> | default = 0s]
[query_ingesters_within: <duration> | default = 0s]

# The time after which a metric should only be queried from storage and not just
# ingesters. 0 means all queries are sent to store.
# CLI flag: -querier.query-store-after
[query_store_after: <duration> | default = 0s]

# The default evaluation interval or step size for subqueries.
# CLI flag: -querier.default-evaluation-interval
@@ -1517,10 +1522,6 @@ write_dedupe_cache_config:
# The CLI flags prefix for this block config is: store.index-cache-write
[fifocache: <fifo_cache_config>]

# Minimum time between chunk update and being saved to the store.
# CLI flag: -store.min-chunk-age
[min_chunk_age: <duration> | default = 0s]

# Cache index entries older than this period. 0 to disable.
# CLI flag: -store.cache-lookups-older-than
[cache_lookups_older_than: <duration> | default = 0s]
7 changes: 0 additions & 7 deletions pkg/chunk/chunk_store.go
@@ -45,7 +45,6 @@ type StoreConfig struct {
ChunkCacheConfig cache.Config `yaml:"chunk_cache_config,omitempty"`
WriteDedupeCacheConfig cache.Config `yaml:"write_dedupe_cache_config,omitempty"`

MinChunkAge time.Duration `yaml:"min_chunk_age,omitempty"`
CacheLookupsOlderThan time.Duration `yaml:"cache_lookups_older_than,omitempty"`

// Limits query start time to be greater than now() - MaxLookBackPeriod, if set.
@@ -61,7 +60,6 @@ func (cfg *StoreConfig) RegisterFlags(f *flag.FlagSet) {
f.BoolVar(&cfg.chunkCacheStubs, "store.chunk-cache-stubs", false, "If true, don't write the full chunk to cache, just a stub entry.")
cfg.WriteDedupeCacheConfig.RegisterFlagsWithPrefix("store.index-cache-write.", "Cache config for index entry writing. ", f)

f.DurationVar(&cfg.MinChunkAge, "store.min-chunk-age", 0, "Minimum time between chunk update and being saved to the store.")
f.DurationVar(&cfg.CacheLookupsOlderThan, "store.cache-lookups-older-than", 0, "Cache index entries older than this period. 0 to disable.")
f.DurationVar(&cfg.MaxLookBackPeriod, "store.max-look-back-period", 0, "Limit how long back data can be queried")

@@ -269,11 +267,6 @@ func (c *store) validateQueryTimeRange(ctx context.Context, userID string, from
return true, nil
}

if from.After(now.Add(-c.cfg.MinChunkAge)) {
// no data relevant to this query will have arrived at the store yet
return true, nil
}

if c.cfg.MaxLookBackPeriod != 0 {
oldestStartTime := model.Now().Add(-c.cfg.MaxLookBackPeriod)
if oldestStartTime.After(*from) {
3 changes: 3 additions & 0 deletions pkg/cortex/cortex.go
@@ -144,6 +144,9 @@ func (c *Config) Validate() error {
if err := c.Distributor.Validate(); err != nil {
return errors.Wrap(err, "invalid distributor config")
}
if err := c.Querier.Validate(); err != nil {
return errors.Wrap(err, "invalid querier config")
}
return nil
}

65 changes: 46 additions & 19 deletions pkg/querier/querier.go
@@ -2,6 +2,7 @@ package querier

import (
"context"
"errors"
"flag"
"time"

@@ -19,13 +20,16 @@ import (

// Config contains the configuration required to create a querier
type Config struct {
MaxConcurrent int
Timeout time.Duration
Iterators bool
BatchIterators bool
IngesterStreaming bool
MaxSamples int
IngesterMaxQueryLookback time.Duration
MaxConcurrent int
Timeout time.Duration
Iterators bool
BatchIterators bool
IngesterStreaming bool
MaxSamples int
QueryIngestersWithin time.Duration `yaml:"query_ingesters_within"`

// QueryStoreAfter is the time after which queries should also be sent to the store, and not just the ingesters.
QueryStoreAfter time.Duration `yaml:"query_store_after"`

// The default evaluation interval for the promql engine.
// Needs to be configured for subqueries to work as it is the default
@@ -36,6 +40,10 @@ type Config struct {
metricsRegisterer prometheus.Registerer `yaml:"-"`
}

var (
errBadLookbackConfigs = errors.New("bad settings, query_store_after >= query_ingesters_within which can result in queries not being sent")
)

// RegisterFlags adds the flags required to config this to the given FlagSet.
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&cfg.MaxConcurrent, "querier.max-concurrent", 20, "The maximum number of concurrent queries.")
@@ -47,11 +55,25 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.BoolVar(&cfg.BatchIterators, "querier.batch-iterators", false, "Use batch iterators to execute query, as opposed to fully materialising the series in memory. Takes precedent over the -querier.iterators flag.")
f.BoolVar(&cfg.IngesterStreaming, "querier.ingester-streaming", false, "Use streaming RPCs to query ingester.")
f.IntVar(&cfg.MaxSamples, "querier.max-samples", 50e6, "Maximum number of samples a single query can load into memory.")
f.DurationVar(&cfg.IngesterMaxQueryLookback, "querier.query-ingesters-within", 0, "Maximum lookback beyond which queries are not sent to ingester. 0 means all queries are sent to ingester.")
f.DurationVar(&cfg.QueryIngestersWithin, "querier.query-ingesters-within", 0, "Maximum lookback beyond which queries are not sent to ingester. 0 means all queries are sent to ingester.")
f.DurationVar(&cfg.DefaultEvaluationInterval, "querier.default-evaluation-interval", time.Minute, "The default evaluation interval or step size for subqueries.")
f.DurationVar(&cfg.QueryStoreAfter, "querier.query-store-after", 0, "The time after which a metric should only be queried from storage and not just ingesters. 0 means all queries are sent to store.")
cfg.metricsRegisterer = prometheus.DefaultRegisterer
}

// Validate the config
func (cfg *Config) Validate() error {

// Ensure the config can't create a situation where, for some time range, neither the ingesters nor the store would be queried.
if cfg.QueryIngestersWithin != 0 && cfg.QueryStoreAfter != 0 {
if cfg.QueryStoreAfter >= cfg.QueryIngestersWithin {
return errBadLookbackConfigs
}
}

return nil
}

// ChunkStore is the read-interface to the Chunk Store. Made an interface here
// to reduce package coupling.
type ChunkStore interface {
@@ -70,11 +92,11 @@ func New(cfg Config, distributor Distributor, chunkStore ChunkStore) (storage.Qu
var queryable storage.Queryable
if cfg.IngesterStreaming {
dq := newIngesterStreamingQueryable(distributor, iteratorFunc)
queryable = newUnifiedChunkQueryable(dq, chunkStore, distributor, iteratorFunc, cfg.IngesterMaxQueryLookback)
queryable = newUnifiedChunkQueryable(dq, chunkStore, distributor, iteratorFunc, cfg)
} else {
cq := newChunkStoreQueryable(chunkStore, iteratorFunc)
dq := newDistributorQueryable(distributor)
queryable = NewQueryable(dq, cq, distributor, cfg.IngesterMaxQueryLookback)
queryable = NewQueryable(dq, cq, distributor, cfg)
}

lazyQueryable := storage.QueryableFunc(func(ctx context.Context, mint int64, maxt int64) (storage.Querier, error) {
@@ -97,30 +119,35 @@ func New(cfg Config, distributor Distributor, chunkStore ChunkStore) (storage.Qu
}

// NewQueryable creates a new Queryable for cortex.
func NewQueryable(dq, cq storage.Queryable, distributor Distributor, ingesterMaxQueryLookback time.Duration) storage.Queryable {
func NewQueryable(dq, cq storage.Queryable, distributor Distributor, cfg Config) storage.Queryable {
return storage.QueryableFunc(func(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
cqr, err := cq.Querier(ctx, mint, maxt)
if err != nil {
return nil, err
}

q := querier{
queriers: []storage.Querier{cqr},
distributor: distributor,
ctx: ctx,
mint: mint,
maxt: maxt,
}

// Include ingester only if maxt is within ingesterMaxQueryLookback w.r.t. current time.
if ingesterMaxQueryLookback == 0 || maxt >= time.Now().Add(-ingesterMaxQueryLookback).UnixNano()/1e6 {
// Include ingester only if maxt is within queryIngestersWithin w.r.t. current time.
now := model.Now()
if cfg.QueryIngestersWithin == 0 || maxt >= int64(now.Add(-cfg.QueryIngestersWithin)) {
dqr, err := dq.Querier(ctx, mint, maxt)
if err != nil {
return nil, err
}
q.queriers = append(q.queriers, dqr)
}

// Include the store only if mint is older than QueryStoreAfter w.r.t. the current time.
if cfg.QueryStoreAfter == 0 || mint <= int64(now.Add(-cfg.QueryStoreAfter)) {
cqr, err := cq.Querier(ctx, mint, maxt)
if err != nil {
return nil, err
}

q.queriers = append(q.queriers, cqr)
}

return q, nil
})
}
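To make the new `Validate` rule concrete, here is a sketch of a passing and a failing combination; the durations are assumptions chosen around the `query_store_after < query_ingesters_within` requirement:

```yaml
# Accepted: the windows overlap, so any time range reaches the
# ingesters, the store, or both.
query_ingesters_within: 12h
query_store_after: 11h

# Rejected with errBadLookbackConfigs: with query_store_after >=
# query_ingesters_within, a query covering only data between 11h
# and 12h old would be sent to neither backend.
# query_ingesters_within: 11h
# query_store_after: 12h
```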
159 changes: 129 additions & 30 deletions pkg/querier/querier_test.go
@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"strconv"
"sync"
"testing"
"time"

@@ -149,45 +150,45 @@ func TestQuerier(t *testing.T) {

func TestNoHistoricalQueryToIngester(t *testing.T) {
testCases := []struct {
name string
mint, maxt time.Time
hitIngester bool
ingesterMaxQueryLookback time.Duration
name string
mint, maxt time.Time
hitIngester bool
queryIngestersWithin time.Duration
}{
{
name: "hit-test1",
mint: time.Now().Add(-5 * time.Hour),
maxt: time.Now().Add(1 * time.Hour),
hitIngester: true,
ingesterMaxQueryLookback: 1 * time.Hour,
name: "hit-test1",
mint: time.Now().Add(-5 * time.Hour),
maxt: time.Now().Add(1 * time.Hour),
hitIngester: true,
queryIngestersWithin: 1 * time.Hour,
},
{
name: "hit-test2",
mint: time.Now().Add(-5 * time.Hour),
maxt: time.Now().Add(-59 * time.Minute),
hitIngester: true,
ingesterMaxQueryLookback: 1 * time.Hour,
name: "hit-test2",
mint: time.Now().Add(-5 * time.Hour),
maxt: time.Now().Add(-59 * time.Minute),
hitIngester: true,
queryIngestersWithin: 1 * time.Hour,
},
{ // Skipping ingester is disabled.
name: "hit-test2",
mint: time.Now().Add(-5 * time.Hour),
maxt: time.Now().Add(-50 * time.Minute),
hitIngester: true,
ingesterMaxQueryLookback: 0,
name: "hit-test2",
mint: time.Now().Add(-5 * time.Hour),
maxt: time.Now().Add(-50 * time.Minute),
hitIngester: true,
queryIngestersWithin: 0,
},
{
name: "dont-hit-test1",
mint: time.Now().Add(-5 * time.Hour),
maxt: time.Now().Add(-100 * time.Minute),
hitIngester: false,
ingesterMaxQueryLookback: 1 * time.Hour,
name: "dont-hit-test1",
mint: time.Now().Add(-5 * time.Hour),
maxt: time.Now().Add(-100 * time.Minute),
hitIngester: false,
queryIngestersWithin: 1 * time.Hour,
},
{
name: "dont-hit-test2",
mint: time.Now().Add(-5 * time.Hour),
maxt: time.Now().Add(-61 * time.Minute),
hitIngester: false,
ingesterMaxQueryLookback: 1 * time.Hour,
name: "dont-hit-test2",
mint: time.Now().Add(-5 * time.Hour),
maxt: time.Now().Add(-61 * time.Minute),
hitIngester: false,
queryIngestersWithin: 1 * time.Hour,
},
}

@@ -201,7 +202,7 @@ func TestNoHistoricalQueryToIngester(t *testing.T) {
for _, ingesterStreaming := range []bool{true, false} {
cfg.IngesterStreaming = ingesterStreaming
for _, c := range testCases {
cfg.IngesterMaxQueryLookback = c.ingesterMaxQueryLookback
cfg.QueryIngestersWithin = c.queryIngestersWithin
t.Run(fmt.Sprintf("IngesterStreaming=%t,test=%s", cfg.IngesterStreaming, c.name), func(t *testing.T) {
chunkStore, _ := makeMockChunkStore(t, 24, encodings[0].e)
distributor := &errDistributor{}
@@ -300,3 +301,101 @@ func (m *errDistributor) LabelNames(context.Context) ([]string, error) {
func (m *errDistributor) MetricsForLabelMatchers(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([]metric.Metric, error) {
return nil, errDistributorError
}

type emptyChunkStore struct {
sync.Mutex
called bool
}

func (c *emptyChunkStore) Get(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([]chunk.Chunk, error) {
c.Lock()
defer c.Unlock()
c.called = true
return nil, nil
}

func (c *emptyChunkStore) IsCalled() bool {
c.Lock()
defer c.Unlock()
return c.called
}

func TestShortTermQueryToLTS(t *testing.T) {
testCases := []struct {
name string
mint, maxt time.Time
hitIngester bool
hitLTS bool
queryIngestersWithin time.Duration
queryStoreAfter time.Duration
}{
{
name: "hit only ingester",
mint: time.Now().Add(-5 * time.Minute),
maxt: time.Now(),
hitIngester: true,
hitLTS: false,
queryIngestersWithin: 1 * time.Hour,
queryStoreAfter: time.Hour,
},
{
name: "hit both",
mint: time.Now().Add(-5 * time.Hour),
maxt: time.Now(),
hitIngester: true,
hitLTS: true,
queryIngestersWithin: 1 * time.Hour,
queryStoreAfter: time.Hour,
},
{
name: "hit only storage",
mint: time.Now().Add(-5 * time.Hour),
maxt: time.Now().Add(-2 * time.Hour),
hitIngester: false,
hitLTS: true,
queryIngestersWithin: 1 * time.Hour,
queryStoreAfter: 0,
},
}

engine := promql.NewEngine(promql.EngineOpts{
Logger: util.Logger,
MaxConcurrent: 10,
MaxSamples: 1e6,
Timeout: 1 * time.Minute,
})
cfg := Config{}
for _, ingesterStreaming := range []bool{true, false} {
cfg.IngesterStreaming = ingesterStreaming
for _, c := range testCases {
cfg.QueryIngestersWithin = c.queryIngestersWithin
cfg.QueryStoreAfter = c.queryStoreAfter
t.Run(fmt.Sprintf("IngesterStreaming=%t,test=%s", cfg.IngesterStreaming, c.name), func(t *testing.T) {
chunkStore := &emptyChunkStore{}
distributor := &errDistributor{}

queryable, _ := New(cfg, distributor, chunkStore)
query, err := engine.NewRangeQuery(queryable, "dummy", c.mint, c.maxt, 1*time.Minute)
require.NoError(t, err)

ctx := user.InjectOrgID(context.Background(), "0")
r := query.Exec(ctx)
_, err = r.Matrix()

if c.hitIngester {
// If the ingester was hit, the distributor always returns errDistributorError.
require.Error(t, err)
require.Equal(t, errDistributorError.Error(), err.Error())
} else {
// If the ingester was not hit, errDistributor is never queried, so no error is expected.
require.NoError(t, err)
}

// Verify if the test did/did not hit the LTS
time.Sleep(30 * time.Millisecond) // NOTE: Since this is a lazy querier there is a race condition between the response and chunk store being called
require.Equal(t, c.hitLTS, chunkStore.IsCalled())
})
}
}

}