
Allow specifying a custom interval for splitting and caching frontend requests. #1761


Merged
merged 3 commits on Nov 11, 2019
10 changes: 8 additions & 2 deletions CHANGELOG.md
@@ -6,18 +6,24 @@
* [CHANGE] Remove direct DB/API access from the ruler
* [CHANGE] Removed `Delta` encoding. Any old chunks with `Delta` encoding cannot be read anymore. If `ingester.chunk-encoding` is set to `Delta` the ingester will fail to start. #1706
* [CHANGE] Setting `-ingester.max-transfer-retries` to 0 now disables hand-over when ingester is shutting down. Previously, zero meant infinite number of attempts. #1771
* [CHANGE] `dynamo` has been removed as a valid storage name to make it consistent for all components. `aws` and `aws-dynamo` remain as valid storage names.
* [FEATURE] Global limit on the max series per user and metric #1760
  * `-ingester.max-global-series-per-user`
  * `-ingester.max-global-series-per-metric`
  * Requires `-distributor.replication-factor` and `-distributor.shard-by-all-labels` set for the ingesters too
* [FEATURE] Flush chunks with stale markers early with `ingester.max-stale-chunk-idle`. #1759
* [FEATURE] EXPERIMENTAL: Added new KV Store backend based on memberlist library. Components can gossip about tokens and ingester states, instead of using Consul or Etcd. #1721
* [FEATURE] Allow Query Frontend to log slow queries with `frontend.log-queries-longer-than`. #1744
* [FEATURE] The frontend split and cache intervals can now be configured using the respective flags `--querier.split-queries-by-interval` and `--frontend.cache-split-interval` (see the example after this list).
  * If `--querier.split-queries-by-interval` is not provided, request splitting is disabled by default.
  * __`--querier.split-queries-by-day` is still accepted for backward compatibility but has been deprecated. You should now use `--querier.split-queries-by-interval`. We recommend using a multiple of 24 hours.__
* [ENHANCEMENT] Allocation improvements in adding samples to Chunk. #1706
* [ENHANCEMENT] Consul client now follows recommended practices for blocking queries wrt returned Index value. #1708
* [ENHANCEMENT] Consul client can optionally rate-limit itself during Watch (used e.g. by ring watchers) and WatchPrefix (used by HA feature) operations. Rate limiting is disabled by default. New flags added: `--consul.watch-rate-limit`, and `--consul.watch-burst-size`. #1708
* [ENHANCEMENT] Added jitter to HA deduping heartbeats, configure using `distributor.ha-tracker.update-timeout-jitter-max` #1534
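As an illustration of the new knobs, a query-frontend could be started like this (the split and cache-split flags are taken from this PR, `-querier.cache-results` from the existing code; the `-target=query-frontend` single-binary flag and the 24h values are assumptions, chosen to match the recommendation above):

    cortex -target=query-frontend \
      -querier.split-queries-by-interval=24h \
      -querier.cache-results \
      -frontend.cache-split-interval=24h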

## 0.3.0 / 2019-10-11

@@ -68,4 +74,4 @@ This release has several exciting features, the most notable of them being setti
* `ha-tracker.cluster` is now `distributor.ha-tracker.cluster`
* [FEATURE] You can specify "heap ballast" to reduce Go GC Churn #1489
* [BUGFIX] HA Tracker no longer always makes a request to Consul/Etcd when a request is not from the active replica #1516
* [BUGFIX] Queries are now correctly cancelled by the query-frontend #1508
16 changes: 14 additions & 2 deletions pkg/querier/queryrange/results_cache.go
@@ -26,12 +26,14 @@ import (
type ResultsCacheConfig struct {
	CacheConfig       cache.Config  `yaml:"cache"`
	MaxCacheFreshness time.Duration `yaml:"max_freshness"`
	SplitInterval     time.Duration `yaml:"cache_split_interval"`
}

// RegisterFlags registers flags.
func (cfg *ResultsCacheConfig) RegisterFlags(f *flag.FlagSet) {
	cfg.CacheConfig.RegisterFlagsWithPrefix("frontend.", "", f)
	f.DurationVar(&cfg.MaxCacheFreshness, "frontend.max-cache-freshness", 1*time.Minute, "Most recent allowed cacheable result, to prevent caching very recent results that might still be in flux.")
	f.DurationVar(&cfg.SplitInterval, "frontend.cache-split-interval", 24*time.Hour, "The maximum interval expected for each request; results will be cached per single interval.")
}

// Extractor is used by the cache to extract a subset of a response from a cache entry.
@@ -72,6 +74,11 @@ type resultsCache struct {
}

// NewResultsCacheMiddleware creates results cache middleware from config.
// The middleware caches results using a unique cache key for a given request (step, query, user) and interval.
// The cache assumes that each request's length (end - start) is less than or equal to the interval.
// Every request starting within the same interval hits the same cache entry.
// If the cache doesn't have the entire duration of the request cached, it queries the uncached parts and appends them to the cache entries.
// See `generateKey`.
func NewResultsCacheMiddleware(logger log.Logger, cfg ResultsCacheConfig, limits Limits, merger Merger, extractor Extractor) (Middleware, error) {
	c, err := cache.New(cfg.CacheConfig)
	if err != nil {
@@ -98,8 +105,7 @@ func (s resultsCache) Do(ctx context.Context, r Request) (Response, error) {
}

	var (
		day = r.GetStart() / millisecondPerDay
		key = fmt.Sprintf("%s:%s:%d:%d", userID, r.GetQuery(), r.GetStep(), day)
		key      = generateKey(userID, r, s.cfg.SplitInterval)
		extents  []Extent
		response Response
	)
@@ -318,6 +324,12 @@ func (s resultsCache) filterRecentExtents(req Request, extents []Extent) ([]Exte
	return extents, nil
}

// generateKey generates a cache key based on the userID, Request and interval.
func generateKey(userID string, r Request, interval time.Duration) string {
	currentInterval := r.GetStart() / int64(interval/time.Millisecond)
	return fmt.Sprintf("%s:%s:%d:%d", userID, r.GetQuery(), r.GetStep(), currentInterval)
}
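A quick worked example of the bucketing (the concrete values are invented for illustration): with `interval = 24 * time.Hour`, `int64(interval/time.Millisecond)` is 86400000, so a request with `Start: 90000000` (25h), `Step: 10` and `Query: "up"` falls into `currentInterval = 90000000 / 86400000 = 1`, and `generateKey("fake", r, 24*time.Hour)` returns `"fake:up:10:1"`. Any other request starting inside the same 24h bucket produces the same key and therefore hits the same cache entry.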

func (s resultsCache) get(ctx context.Context, key string) ([]Extent, bool) {
	found, bufs, _ := s.cache.Fetch(ctx, []string{cache.HashKey(key)})
	if len(found) != 1 {
34 changes: 34 additions & 0 deletions pkg/querier/queryrange/results_cache_test.go
@@ -2,6 +2,7 @@ package queryrange

import (
	"context"
	"fmt"
	"strconv"
	"testing"
	"time"
@@ -214,6 +215,7 @@ func TestResultsCache(t *testing.T) {
			CacheConfig: cache.Config{
				Cache: cache.NewMockCache(),
			},
			SplitInterval: 24 * time.Hour,
		},
		fakeLimits{},
		PrometheusCodec,
@@ -281,6 +283,7 @@ func Test_resultsCache_MissingData(t *testing.T) {
			CacheConfig: cache.Config{
				Cache: cache.NewMockCache(),
			},
			SplitInterval: 24 * time.Hour,
		},
		fakeLimits{},
		PrometheusCodec,
@@ -315,3 +318,34 @@
	require.Equal(t, len(extents), 0)
	require.False(t, hit)
}

func Test_generateKey(t *testing.T) {
	t.Parallel()

	tests := []struct {
		name     string
		r        Request
		interval time.Duration
		want     string
	}{
		{"0", &PrometheusRequest{Start: 0, Step: 10, Query: "foo{}"}, 30 * time.Minute, "fake:foo{}:10:0"},
		{"<30m", &PrometheusRequest{Start: toMs(10 * time.Minute), Step: 10, Query: "foo{}"}, 30 * time.Minute, "fake:foo{}:10:0"},
		{"30m", &PrometheusRequest{Start: toMs(30 * time.Minute), Step: 10, Query: "foo{}"}, 30 * time.Minute, "fake:foo{}:10:1"},
		{"91m", &PrometheusRequest{Start: toMs(91 * time.Minute), Step: 10, Query: "foo{}"}, 30 * time.Minute, "fake:foo{}:10:3"},
		{"0", &PrometheusRequest{Start: 0, Step: 10, Query: "foo{}"}, 24 * time.Hour, "fake:foo{}:10:0"},
		{"<1d", &PrometheusRequest{Start: toMs(22 * time.Hour), Step: 10, Query: "foo{}"}, 24 * time.Hour, "fake:foo{}:10:0"},
		{"4d", &PrometheusRequest{Start: toMs(4 * 24 * time.Hour), Step: 10, Query: "foo{}"}, 24 * time.Hour, "fake:foo{}:10:4"},
		{"3d5h", &PrometheusRequest{Start: toMs(77 * time.Hour), Step: 10, Query: "foo{}"}, 24 * time.Hour, "fake:foo{}:10:3"},
	}
	for _, tt := range tests {
		t.Run(fmt.Sprintf("%s - %s", tt.name, tt.interval), func(t *testing.T) {
			if got := generateKey("fake", tt.r, tt.interval); got != tt.want {
				t.Errorf("generateKey() = %v, want %v", got, tt.want)
			}
		})
	}
}

func toMs(t time.Duration) int64 {
	return int64(t / time.Millisecond)
}
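To exercise just this unit from a checkout, a standard Go invocation should work (package path taken from the diff header above):

    go test ./pkg/querier/queryrange -run Test_generateKey -v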
25 changes: 18 additions & 7 deletions pkg/querier/queryrange/roundtrip.go
@@ -20,26 +20,32 @@ import (
"flag"
"net/http"
"strings"
"time"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/weaveworks/common/user"

"github.com/cortexproject/cortex/pkg/querier/frontend"
)

const day = 24 * time.Hour

// Config for query_range middleware chain.
type Config struct {
	SplitQueriesByDay    bool `yaml:"split_queries_by_day"`
	AlignQueriesWithStep bool `yaml:"align_queries_with_step"`
	ResultsCacheConfig   `yaml:"results_cache"`
	CacheResults         bool `yaml:"cache_results"`
	MaxRetries           int  `yaml:"max_retries"`
	SplitQueriesByInterval time.Duration `yaml:"split_queries_by_interval"`
	SplitQueriesByDay      bool          `yaml:"split_queries_by_day"`
	AlignQueriesWithStep   bool          `yaml:"align_queries_with_step"`
	ResultsCacheConfig     `yaml:"results_cache"`
	CacheResults           bool          `yaml:"cache_results"`
	MaxRetries             int           `yaml:"max_retries"`
}

// RegisterFlags adds the flags required to config this to the given FlagSet.
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
	f.IntVar(&cfg.MaxRetries, "querier.max-retries-per-request", 5, "Maximum number of retries for a single request; beyond this, the downstream error is returned.")
	f.BoolVar(&cfg.SplitQueriesByDay, "querier.split-queries-by-day", false, "Split queries by day and execute in parallel.")
	f.BoolVar(&cfg.SplitQueriesByDay, "querier.split-queries-by-day", false, "Deprecated: Split queries by day and execute in parallel.")
	f.DurationVar(&cfg.SplitQueriesByInterval, "querier.split-queries-by-interval", 0, "Split queries by an interval and execute in parallel; 0 disables it. You should use a multiple of 24 hours (same as the storage bucketing scheme) to avoid queriers downloading and processing the same chunks.")
	f.BoolVar(&cfg.AlignQueriesWithStep, "querier.align-querier-with-step", false, "Mutate incoming queries to align their start and end with their step.")
	f.BoolVar(&cfg.CacheResults, "querier.cache-results", false, "Cache query results.")
	cfg.ResultsCacheConfig.RegisterFlags(f)
@@ -88,8 +94,13 @@ func NewTripperware(cfg Config, log log.Logger, limits Limits, codec Codec, cach
	if cfg.AlignQueriesWithStep {
		queryRangeMiddleware = append(queryRangeMiddleware, InstrumentMiddleware("step_align"), StepAlignMiddleware)
	}
	// SplitQueriesByDay is deprecated; use SplitQueriesByInterval instead.
	if cfg.SplitQueriesByDay {
		queryRangeMiddleware = append(queryRangeMiddleware, InstrumentMiddleware("split_by_day"), SplitByDayMiddleware(limits, codec))
		level.Warn(log).Log("msg", "flag querier.split-queries-by-day (or config split_queries_by_day) is deprecated, use querier.split-queries-by-interval instead.")
		cfg.SplitQueriesByInterval = day
	}
	if cfg.SplitQueriesByInterval != 0 {
		queryRangeMiddleware = append(queryRangeMiddleware, InstrumentMiddleware("split_by_interval"), SplitByIntervalMiddleware(cfg.SplitQueriesByInterval, limits, codec))
	}
	if cfg.CacheResults {
		queryCacheMiddleware, err := NewResultsCacheMiddleware(log, cfg.ResultsCacheConfig, limits, codec, cacheExtractor)
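For reference, a sketch of the same options in YAML form (the field names come from the `yaml` tags in this diff; nesting them under a top-level `query_range` block is an assumption about the surrounding Cortex config, not something shown in this PR):

    query_range:
      split_queries_by_interval: 24h
      align_queries_with_step: true
      cache_results: true
      results_cache:
        cache_split_interval: 24h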
71 changes: 0 additions & 71 deletions pkg/querier/queryrange/split_by_day.go

This file was deleted.

72 changes: 72 additions & 0 deletions pkg/querier/queryrange/split_by_interval.go
@@ -0,0 +1,72 @@
package queryrange

import (
	"context"
	"time"
)

// SplitByIntervalMiddleware creates a new Middleware that splits requests by a given interval.
func SplitByIntervalMiddleware(interval time.Duration, limits Limits, merger Merger) Middleware {
	return MiddlewareFunc(func(next Handler) Handler {
		return splitByInterval{
			next:     next,
			limits:   limits,
			merger:   merger,
			interval: interval,
		}
	})
}

type splitByInterval struct {
	next     Handler
	limits   Limits
	merger   Merger
	interval time.Duration
}

func (s splitByInterval) Do(ctx context.Context, r Request) (Response, error) {
	// First we're going to build new requests, one for each interval, taking
	// care to line up the boundaries with step.
	reqs := splitQuery(r, s.interval)

	reqResps, err := doRequests(ctx, s.next, reqs, s.limits)
	if err != nil {
		return nil, err
	}

	resps := make([]Response, 0, len(reqResps))
	for _, reqResp := range reqResps {
		resps = append(resps, reqResp.resp)
	}

	response, err := s.merger.MergeResponse(resps...)
	if err != nil {
		return nil, err
	}
	return response, nil
}

func splitQuery(r Request, interval time.Duration) []Request {
	var reqs []Request
	for start := r.GetStart(); start < r.GetEnd(); start = nextIntervalBoundary(start, r.GetStep(), interval) + r.GetStep() {
		end := nextIntervalBoundary(start, r.GetStep(), interval)
		if end+r.GetStep() >= r.GetEnd() {
			end = r.GetEnd()
		}

		reqs = append(reqs, r.WithStartEnd(start, end))
	}
	return reqs
}

// Round up to the step before the next interval boundary.
func nextIntervalBoundary(t, step int64, interval time.Duration) int64 {
	msPerInterval := int64(interval / time.Millisecond)
	startOfNextInterval := ((t / msPerInterval) + 1) * msPerInterval
	// ensure that target is a multiple of steps away from the start time
	target := startOfNextInterval - ((startOfNextInterval - t) % step)
	if target == startOfNextInterval {
		target -= step
	}
	return target
}
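To see how the boundary arithmetic behaves, here is a short walk-through with invented numbers (5m step, 24h interval, all values in milliseconds):

    // step = 300000 (5m), interval = 86400000 (24h), t = 0.
    // startOfNextInterval = ((0 / 86400000) + 1) * 86400000 = 86400000.
    // (86400000 - 0) % 300000 == 0, so target == startOfNextInterval and one
    // step is subtracted: the first sub-query ends at 86100000 (23h55m).
    // splitQuery then starts the next sub-query at end + step = 86400000,
    // exactly on the 24h boundary, so consecutive sub-queries never overlap
    // and every sub-request start stays a whole number of steps from t.

When the step does not evenly divide the interval, the modulo pulls `target` back to the last step-aligned point before the boundary, which is what the comment above means by rounding up to the step before the next interval boundary.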