From 174a2bfce77d61623df46cf6448c6654a78be18e Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar <15064823+codesome@users.noreply.github.com> Date: Tue, 2 Feb 2021 13:04:09 +0530 Subject: [PATCH] Enable @ modifier with result cache handling (#3744) * Enable @ modifier with result cache handling Signed-off-by: Ganesh Vernekar * Fix review comments Signed-off-by: Ganesh Vernekar * Add querier.at-modifier-enabled flag Signed-off-by: Ganesh Vernekar * Consider frontend.max-cache-freshness for caching @ modifier Signed-off-by: Ganesh Vernekar --- CHANGELOG.md | 1 + docs/blocks-storage/querier.md | 4 + docs/configuration/config-file-reference.md | 4 + pkg/cortex/modules.go | 9 +- pkg/querier/querier.go | 3 + pkg/querier/queryrange/results_cache.go | 68 ++++++++++-- pkg/querier/queryrange/results_cache_test.go | 110 +++++++++++++++++-- 7 files changed, 176 insertions(+), 23 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 22a9bfefb9..14146fb238 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ - `-.s3.sse.type` - `-.s3.sse.kms-key-id` - `-.s3.sse.kms-encryption-context` +* [FEATURE] Querier: Enable `@ ` modifier in PromQL using the new `-querier.at-modifier-enabled` flag. #3744 * [ENHANCEMENT] Ingester: exposed metric `cortex_ingester_oldest_unshipped_block_timestamp_seconds`, tracking the unix timestamp of the oldest TSDB block not shipped to the storage yet. #3705 * [ENHANCEMENT] Prometheus upgraded. #3739 * Avoid unnecessary `runtime.GC()` during compactions. diff --git a/docs/blocks-storage/querier.md b/docs/blocks-storage/querier.md index 64d5a27c93..d062b9af1d 100644 --- a/docs/blocks-storage/querier.md +++ b/docs/blocks-storage/querier.md @@ -132,6 +132,10 @@ querier: # CLI flag: -querier.query-store-for-labels-enabled [query_store_for_labels_enabled: | default = false] + # Enable the @ modifier in PromQL. + # CLI flag: -querier.at-modifier-enabled + [at_modifier_enabled: | default = false] + # The time after which a metric should be queried from storage and not just # ingesters. 0 means all queries are sent to store. When running the blocks # storage, if this option is enabled, the time range of the query sent to the diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 024593283f..1ad7e06b8b 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -800,6 +800,10 @@ The `querier_config` configures the Cortex querier. # CLI flag: -querier.query-store-for-labels-enabled [query_store_for_labels_enabled: | default = false] +# Enable the @ modifier in PromQL. +# CLI flag: -querier.at-modifier-enabled +[at_modifier_enabled: | default = false] + # The time after which a metric should be queried from storage and not just # ingesters. 0 means all queries are sent to store. When running the blocks # storage, if this option is enabled, the time range of the query sent to the diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index d92d18dbc5..aa06abf001 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -487,10 +487,11 @@ func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err erro queryrange.PrometheusResponseExtractor{}, t.Cfg.Schema, promql.EngineOpts{ - Logger: util_log.Logger, - Reg: prometheus.DefaultRegisterer, - MaxSamples: t.Cfg.Querier.MaxSamples, - Timeout: t.Cfg.Querier.Timeout, + Logger: util_log.Logger, + Reg: prometheus.DefaultRegisterer, + MaxSamples: t.Cfg.Querier.MaxSamples, + Timeout: t.Cfg.Querier.Timeout, + EnableAtModifier: t.Cfg.Querier.AtModifierEnabled, NoStepSubqueryIntervalFn: func(int64) int64 { return t.Cfg.Querier.DefaultEvaluationInterval.Milliseconds() }, diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index ce8d12ee21..44a0a263c4 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -42,6 +42,7 @@ type Config struct { MaxSamples int `yaml:"max_samples"` QueryIngestersWithin time.Duration `yaml:"query_ingesters_within"` QueryStoreForLabels bool `yaml:"query_store_for_labels_enabled"` + AtModifierEnabled bool `yaml:"at_modifier_enabled"` // QueryStoreAfter the time after which queries should also be sent to the store and not just ingesters. QueryStoreAfter time.Duration `yaml:"query_store_after"` @@ -88,6 +89,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.IntVar(&cfg.MaxSamples, "querier.max-samples", 50e6, "Maximum number of samples a single query can load into memory.") f.DurationVar(&cfg.QueryIngestersWithin, "querier.query-ingesters-within", 0, "Maximum lookback beyond which queries are not sent to ingester. 0 means all queries are sent to ingester.") f.BoolVar(&cfg.QueryStoreForLabels, "querier.query-store-for-labels-enabled", false, "Query long-term store for series, label values and label names APIs. Works only with blocks engine.") + f.BoolVar(&cfg.AtModifierEnabled, "querier.at-modifier-enabled", false, "Enable the @ modifier in PromQL.") f.DurationVar(&cfg.MaxQueryIntoFuture, "querier.max-query-into-future", 10*time.Minute, "Maximum duration into the future you can query. 0 to disable.") f.DurationVar(&cfg.DefaultEvaluationInterval, "querier.default-evaluation-interval", time.Minute, "The default evaluation interval or step size for subqueries.") f.DurationVar(&cfg.QueryStoreAfter, "querier.query-store-after", 0, "The time after which a metric should be queried from storage and not just ingesters. 0 means all queries are sent to store. When running the blocks storage, if this option is enabled, the time range of the query sent to the store will be manipulated to ensure the query end is not more recent than 'now - query-store-after'.") @@ -170,6 +172,7 @@ func New(cfg Config, limits *validation.Overrides, distributor Distributor, stor MaxSamples: cfg.MaxSamples, Timeout: cfg.Timeout, LookbackDelta: cfg.LookbackDelta, + EnableAtModifier: cfg.AtModifierEnabled, NoStepSubqueryIntervalFn: func(int64) int64 { return cfg.DefaultEvaluationInterval.Milliseconds() }, diff --git a/pkg/querier/queryrange/results_cache.go b/pkg/querier/queryrange/results_cache.go index 0b38a18178..278389fb04 100644 --- a/pkg/querier/queryrange/results_cache.go +++ b/pkg/querier/queryrange/results_cache.go @@ -6,6 +6,7 @@ import ( "fmt" "net/http" "sort" + "strings" "time" "github.com/go-kit/kit/log" @@ -17,6 +18,7 @@ import ( "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/promql/parser" "github.com/uber/jaeger-client-go" "github.com/weaveworks/common/httpgrpc" @@ -209,9 +211,9 @@ func (s resultsCache) Do(ctx context.Context, r Request) (Response, error) { cached, ok := s.get(ctx, key) if ok { - response, extents, err = s.handleHit(ctx, r, cached) + response, extents, err = s.handleHit(ctx, r, cached, maxCacheTime) } else { - response, extents, err = s.handleMiss(ctx, r) + response, extents, err = s.handleMiss(ctx, r, maxCacheTime) } if err == nil && len(extents) > 0 { @@ -226,7 +228,7 @@ func (s resultsCache) Do(ctx context.Context, r Request) (Response, error) { } // shouldCacheResponse says whether the response should be cached or not. -func (s resultsCache) shouldCacheResponse(ctx context.Context, r Response) bool { +func (s resultsCache) shouldCacheResponse(ctx context.Context, req Request, r Response, maxCacheTime int64) bool { headerValues := getHeaderValuesWithName(r, cacheControlHeader) for _, v := range headerValues { if v == noStoreValue { @@ -235,6 +237,10 @@ func (s resultsCache) shouldCacheResponse(ctx context.Context, r Response) bool } } + if !isAtModifierCachable(req, maxCacheTime) { + return false + } + if s.cacheGenNumberLoader == nil { return true } @@ -257,6 +263,54 @@ func (s resultsCache) shouldCacheResponse(ctx context.Context, r Response) bool return true } +var errAtModifierAfterEnd = errors.New("at modifier after end") + +// isAtModifierCachable returns true if the @ modifier result +// is safe to cache. +func isAtModifierCachable(r Request, maxCacheTime int64) bool { + // There are 2 cases when @ modifier is not safe to cache: + // 1. When @ modifier points to time beyond the maxCacheTime. + // 2. If the @ modifier time is > the query range end while being + // below maxCacheTime. In such cases if any tenant is intentionally + // playing with old data, we could cache empty result if we look + // beyond query end. + query := r.GetQuery() + if !strings.Contains(query, "@") { + return true + } + expr, err := parser.ParseExpr(query) + if err != nil { + // We are being pessimistic in such cases. + return false + } + + end := r.GetEnd() + atModCachable := true + parser.Inspect(expr, func(n parser.Node, _ []parser.Node) error { + switch e := n.(type) { + case *parser.VectorSelector: + if e.Timestamp != nil && (*e.Timestamp > end || *e.Timestamp > maxCacheTime) { + atModCachable = false + return errAtModifierAfterEnd + } + case *parser.MatrixSelector: + ts := e.VectorSelector.(*parser.VectorSelector).Timestamp + if ts != nil && (*ts > end || *ts > maxCacheTime) { + atModCachable = false + return errAtModifierAfterEnd + } + case *parser.SubqueryExpr: + if e.Timestamp != nil && (*e.Timestamp > end || *e.Timestamp > maxCacheTime) { + atModCachable = false + return errAtModifierAfterEnd + } + } + return nil + }) + + return atModCachable +} + func getHeaderValuesWithName(r Response, headerName string) (headerValues []string) { for _, hv := range r.GetHeaders() { if hv.GetName() != headerName { @@ -269,13 +323,13 @@ func getHeaderValuesWithName(r Response, headerName string) (headerValues []stri return } -func (s resultsCache) handleMiss(ctx context.Context, r Request) (Response, []Extent, error) { +func (s resultsCache) handleMiss(ctx context.Context, r Request, maxCacheTime int64) (Response, []Extent, error) { response, err := s.next.Do(ctx, r) if err != nil { return nil, nil, err } - if !s.shouldCacheResponse(ctx, response) { + if !s.shouldCacheResponse(ctx, r, response, maxCacheTime) { return response, []Extent{}, nil } @@ -290,7 +344,7 @@ func (s resultsCache) handleMiss(ctx context.Context, r Request) (Response, []Ex return response, extents, nil } -func (s resultsCache) handleHit(ctx context.Context, r Request, extents []Extent) (Response, []Extent, error) { +func (s resultsCache) handleHit(ctx context.Context, r Request, extents []Extent, maxCacheTime int64) (Response, []Extent, error) { var ( reqResps []RequestResponse err error @@ -315,7 +369,7 @@ func (s resultsCache) handleHit(ctx context.Context, r Request, extents []Extent for _, reqResp := range reqResps { responses = append(responses, reqResp.Response) - if !s.shouldCacheResponse(ctx, reqResp.Response) { + if !s.shouldCacheResponse(ctx, r, reqResp.Response, maxCacheTime) { continue } extent, err := toExtent(ctx, reqResp.Request, s.extractor.ResponseWithoutHeaders(reqResp.Response)) diff --git a/pkg/querier/queryrange/results_cache_test.go b/pkg/querier/queryrange/results_cache_test.go index 9cda8b58c9..8e98fa723a 100644 --- a/pkg/querier/queryrange/results_cache_test.go +++ b/pkg/querier/queryrange/results_cache_test.go @@ -105,16 +105,19 @@ func mkExtent(start, end int64) Extent { } func TestShouldCache(t *testing.T) { + maxCacheTime := int64(150 * 1000) c := &resultsCache{logger: util.Logger, cacheGenNumberLoader: newMockCacheGenNumberLoader()} for _, tc := range []struct { name string + request Request input Response cacheGenNumberToInject string expected bool }{ // Tests only for cacheControlHeader { - name: "does not contain the cacheControl header", + name: "does not contain the cacheControl header", + request: &PrometheusRequest{Query: "metric"}, input: Response(&PrometheusResponse{ Headers: []*PrometheusResponseHeader{ { @@ -126,7 +129,8 @@ func TestShouldCache(t *testing.T) { expected: true, }, { - name: "does contain the cacheControl header which has the value", + name: "does contain the cacheControl header which has the value", + request: &PrometheusRequest{Query: "metric"}, input: Response(&PrometheusResponse{ Headers: []*PrometheusResponseHeader{ { @@ -138,7 +142,8 @@ func TestShouldCache(t *testing.T) { expected: false, }, { - name: "cacheControl header contains extra values but still good", + name: "cacheControl header contains extra values but still good", + request: &PrometheusRequest{Query: "metric"}, input: Response(&PrometheusResponse{ Headers: []*PrometheusResponseHeader{ { @@ -151,18 +156,21 @@ func TestShouldCache(t *testing.T) { }, { name: "broken response", + request: &PrometheusRequest{Query: "metric"}, input: Response(&PrometheusResponse{}), expected: true, }, { - name: "nil headers", + name: "nil headers", + request: &PrometheusRequest{Query: "metric"}, input: Response(&PrometheusResponse{ Headers: []*PrometheusResponseHeader{nil}, }), expected: true, }, { - name: "had cacheControl header but no values", + name: "had cacheControl header but no values", + request: &PrometheusRequest{Query: "metric"}, input: Response(&PrometheusResponse{ Headers: []*PrometheusResponseHeader{{Name: cacheControlHeader}}, }), @@ -171,7 +179,8 @@ func TestShouldCache(t *testing.T) { // Tests only for cacheGenNumber header { - name: "cacheGenNumber not set in both header and store", + name: "cacheGenNumber not set in both header and store", + request: &PrometheusRequest{Query: "metric"}, input: Response(&PrometheusResponse{ Headers: []*PrometheusResponseHeader{ { @@ -183,7 +192,8 @@ func TestShouldCache(t *testing.T) { expected: true, }, { - name: "cacheGenNumber set in store but not in header", + name: "cacheGenNumber set in store but not in header", + request: &PrometheusRequest{Query: "metric"}, input: Response(&PrometheusResponse{ Headers: []*PrometheusResponseHeader{ { @@ -196,7 +206,8 @@ func TestShouldCache(t *testing.T) { expected: false, }, { - name: "cacheGenNumber set in header but not in store", + name: "cacheGenNumber set in header but not in store", + request: &PrometheusRequest{Query: "metric"}, input: Response(&PrometheusResponse{ Headers: []*PrometheusResponseHeader{ { @@ -208,7 +219,8 @@ func TestShouldCache(t *testing.T) { expected: false, }, { - name: "cacheGenNumber in header and store are the same", + name: "cacheGenNumber in header and store are the same", + request: &PrometheusRequest{Query: "metric"}, input: Response(&PrometheusResponse{ Headers: []*PrometheusResponseHeader{ { @@ -221,7 +233,8 @@ func TestShouldCache(t *testing.T) { expected: true, }, { - name: "inconsistency between cacheGenNumber in header and store", + name: "inconsistency between cacheGenNumber in header and store", + request: &PrometheusRequest{Query: "metric"}, input: Response(&PrometheusResponse{ Headers: []*PrometheusResponseHeader{ { @@ -234,7 +247,8 @@ func TestShouldCache(t *testing.T) { expected: false, }, { - name: "cacheControl header says not to catch and cacheGenNumbers in store and headers have consistency", + name: "cacheControl header says not to catch and cacheGenNumbers in store and headers have consistency", + request: &PrometheusRequest{Query: "metric"}, input: Response(&PrometheusResponse{ Headers: []*PrometheusResponseHeader{ { @@ -250,11 +264,83 @@ func TestShouldCache(t *testing.T) { cacheGenNumberToInject: "1", expected: false, }, + { + name: "@ modifier on vector selector, before end, before maxCacheTime", + request: &PrometheusRequest{Query: "metric @ 123", End: 125000}, + input: Response(&PrometheusResponse{}), + expected: true, + }, + { + name: "@ modifier on vector selector, after end, before maxCacheTime", + request: &PrometheusRequest{Query: "metric @ 127", End: 125000}, + input: Response(&PrometheusResponse{}), + expected: false, + }, + { + name: "@ modifier on vector selector, before end, after maxCacheTime", + request: &PrometheusRequest{Query: "metric @ 151", End: 200000}, + input: Response(&PrometheusResponse{}), + expected: false, + }, + { + name: "@ modifier on vector selector, after end, after maxCacheTime", + request: &PrometheusRequest{Query: "metric @ 151", End: 125000}, + input: Response(&PrometheusResponse{}), + expected: false, + }, + { + name: "@ modifier on matrix selector, before end, before maxCacheTime", + request: &PrometheusRequest{Query: "rate(metric[5m] @ 123)", End: 125000}, + input: Response(&PrometheusResponse{}), + expected: true, + }, + { + name: "@ modifier on matrix selector, after end, before maxCacheTime", + request: &PrometheusRequest{Query: "rate(metric[5m] @ 127)", End: 125000}, + input: Response(&PrometheusResponse{}), + expected: false, + }, + { + name: "@ modifier on matrix selector, before end, after maxCacheTime", + request: &PrometheusRequest{Query: "rate(metric[5m] @ 151)", End: 200000}, + input: Response(&PrometheusResponse{}), + expected: false, + }, + { + name: "@ modifier on matrix selector, after end, after maxCacheTime", + request: &PrometheusRequest{Query: "rate(metric[5m] @ 151)", End: 125000}, + input: Response(&PrometheusResponse{}), + expected: false, + }, + { + name: "@ modifier on subqueries, before end, before maxCacheTime", + request: &PrometheusRequest{Query: "sum_over_time(rate(metric[1m])[10m:1m] @ 123)", End: 125000}, + input: Response(&PrometheusResponse{}), + expected: true, + }, + { + name: "@ modifier on subqueries, after end, before maxCacheTime", + request: &PrometheusRequest{Query: "sum_over_time(rate(metric[1m])[10m:1m] @ 127)", End: 125000}, + input: Response(&PrometheusResponse{}), + expected: false, + }, + { + name: "@ modifier on subqueries, before end, after maxCacheTime", + request: &PrometheusRequest{Query: "sum_over_time(rate(metric[1m])[10m:1m] @ 151)", End: 200000}, + input: Response(&PrometheusResponse{}), + expected: false, + }, + { + name: "@ modifier on subqueries, after end, after maxCacheTime", + request: &PrometheusRequest{Query: "sum_over_time(rate(metric[1m])[10m:1m] @ 151)", End: 125000}, + input: Response(&PrometheusResponse{}), + expected: false, + }, } { { t.Run(tc.name, func(t *testing.T) { ctx := cache.InjectCacheGenNumber(context.Background(), tc.cacheGenNumberToInject) - ret := c.shouldCacheResponse(ctx, tc.input) + ret := c.shouldCacheResponse(ctx, tc.request, tc.input, maxCacheTime) require.Equal(t, tc.expected, ret) }) }