Skip to content

Commit ec69629

Browse files
committed
Refactor frontend and queryrange packages for use by external projects.
The behaviour of both packages hasn't changed. Middlewares now uses `Request` and `Response` interfaces . This allows to create a roundtripper that will encode/decode HTTP requests/responses (using a `Codec`) to wrap the middleware chain. To support this new Response interface the cache protobuf message has been updated with an [`Any`](https://developers.google.com/protocol-buffers/docs/proto3#any) field. I have also updated the code so that if the field doesn't exist, it will be trigger a cache miss. The queryrange configuration has moved but flag definition remains the same, this allows seamless update if you use flag configuration. However configuration by file must be updated to new field `query_range` at root. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>
1 parent 0e39075 commit ec69629

26 files changed

+1072
-918
lines changed

.lintignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,3 +2,4 @@
22
./tools*
33
./vendor*
44
./pkg/configs/legacy_promql*
5+
./.pkg*

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
## master / unreleased
44

5+
* [CHANGE] The frontend component has been refactored to be easier to re-use. When upgrading the frontend, cache entries will be discarded and re-created with the new protobuf schema. #1734
56
* [CHANGE] Remove direct DB/API access from the ruler
67
* [CHANGE] Removed `Delta` encoding. Any old chunks with `Delta` encoding cannot be read anymore. If `ingester.chunk-encoding` is set to `Delta` the ingester will fail to start. #1706
78
* [ENHANCEMENT] Allocation improvements in adding samples to Chunk. #1706

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ $(EXES):
117117
protos: $(PROTO_GOS)
118118

119119
%.pb.go:
120-
protoc -I $(GOPATH)/src:./vendor:./$(@D) --gogoslick_out=plugins=grpc:./$(@D) ./$(patsubst %.pb.go,%.proto,$@)
120+
protoc -I $(GOPATH)/src:./vendor:./$(@D) --gogoslick_out=plugins=grpc,Mgoogle/protobuf/any.proto=github.com/gogo/protobuf/types,:./$(@D) ./$(patsubst %.pb.go,%.proto,$@)
121121

122122
lint:
123123
./tools/lint -notestpackage -novet -ignorespelling queriers -ignorespelling Queriers .

pkg/cortex/cortex.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"github.com/cortexproject/cortex/pkg/ingester/client"
2626
"github.com/cortexproject/cortex/pkg/querier"
2727
"github.com/cortexproject/cortex/pkg/querier/frontend"
28+
"github.com/cortexproject/cortex/pkg/querier/queryrange"
2829
"github.com/cortexproject/cortex/pkg/ring"
2930
"github.com/cortexproject/cortex/pkg/ruler"
3031
"github.com/cortexproject/cortex/pkg/util"
@@ -67,6 +68,7 @@ type Config struct {
6768
Prealloc client.PreallocConfig `yaml:"prealloc,omitempty"`
6869
Worker frontend.WorkerConfig `yaml:"frontend_worker,omitempty"`
6970
Frontend frontend.Config `yaml:"frontend,omitempty"`
71+
QueryRange queryrange.Config `yaml:"query_range,omitempty"`
7072
TableManager chunk.TableManagerConfig `yaml:"table_manager,omitempty"`
7173
Encoding encoding.Config `yaml:"-"` // No yaml for this, it only works with flags.
7274

@@ -98,6 +100,7 @@ func (c *Config) RegisterFlags(f *flag.FlagSet) {
98100
c.Prealloc.RegisterFlags(f)
99101
c.Worker.RegisterFlags(f)
100102
c.Frontend.RegisterFlags(f)
103+
c.QueryRange.RegisterFlags(f)
101104
c.TableManager.RegisterFlags(f)
102105
c.Encoding.RegisterFlags(f)
103106

pkg/cortex/modules.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"github.com/cortexproject/cortex/pkg/ingester/client"
2929
"github.com/cortexproject/cortex/pkg/querier"
3030
"github.com/cortexproject/cortex/pkg/querier/frontend"
31+
"github.com/cortexproject/cortex/pkg/querier/queryrange"
3132
"github.com/cortexproject/cortex/pkg/ring"
3233
"github.com/cortexproject/cortex/pkg/ruler"
3334
"github.com/cortexproject/cortex/pkg/util"
@@ -261,10 +262,15 @@ func (t *Cortex) stopStore() error {
261262
}
262263

263264
func (t *Cortex) initQueryFrontend(cfg *Config) (err error) {
264-
t.frontend, err = frontend.New(cfg.Frontend, util.Logger, t.overrides)
265+
t.frontend, err = frontend.New(cfg.Frontend, util.Logger)
265266
if err != nil {
266267
return
267268
}
269+
tripperware, err := queryrange.NewTripperware(cfg.QueryRange, util.Logger, t.overrides, queryrange.PrometheusCodec, queryrange.PrometheusResponseExtractor)
270+
if err != nil {
271+
return err
272+
}
273+
t.frontend.Wrap(tripperware)
268274

269275
frontend.RegisterFrontendServer(t.server.GRPC, t.frontend)
270276
t.server.HTTP.PathPrefix(cfg.HTTPPrefix).Handler(

pkg/querier/frontend/frontend.go

Lines changed: 16 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,6 @@ import (
2121
"github.com/weaveworks/common/httpgrpc"
2222
"github.com/weaveworks/common/httpgrpc/server"
2323
"github.com/weaveworks/common/user"
24-
25-
"github.com/cortexproject/cortex/pkg/querier/queryrange"
26-
"github.com/cortexproject/cortex/pkg/util/validation"
2724
)
2825

2926
var (
@@ -38,38 +35,22 @@ var (
3835
Name: "query_frontend_queue_length",
3936
Help: "Number of queries in the queue.",
4037
})
41-
queryRangeDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
42-
Namespace: "cortex",
43-
Name: "frontend_query_range_duration_seconds",
44-
Help: "Total time spent in seconds doing query range requests.",
45-
Buckets: prometheus.DefBuckets,
46-
}, []string{"method", "status_code"})
4738

4839
errTooManyRequest = httpgrpc.Errorf(http.StatusTooManyRequests, "too many outstanding requests")
4940
errCanceled = httpgrpc.Errorf(http.StatusInternalServerError, "context cancelled")
5041
)
5142

5243
// Config for a Frontend.
5344
type Config struct {
54-
MaxOutstandingPerTenant int `yaml:"max_outstanding_per_tenant"`
55-
MaxRetries int `yaml:"max_retries"`
56-
SplitQueriesByDay bool `yaml:"split_queries_by_day"`
57-
AlignQueriesWithStep bool `yaml:"align_queries_with_step"`
58-
CacheResults bool `yaml:"cache_results"`
59-
CompressResponses bool `yaml:"compress_responses"`
60-
queryrange.ResultsCacheConfig `yaml:"results_cache"`
61-
DownstreamURL string `yaml:"downstream"`
45+
MaxOutstandingPerTenant int `yaml:"max_outstanding_per_tenant"`
46+
CompressResponses bool `yaml:"compress_responses"`
47+
DownstreamURL string `yaml:"downstream"`
6248
}
6349

6450
// RegisterFlags adds the flags required to config this to the given FlagSet.
6551
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
6652
f.IntVar(&cfg.MaxOutstandingPerTenant, "querier.max-outstanding-requests-per-tenant", 100, "Maximum number of outstanding requests per tenant per frontend; requests beyond this error with HTTP 429.")
67-
f.IntVar(&cfg.MaxRetries, "querier.max-retries-per-request", 5, "Maximum number of retries for a single request; beyond this, the downstream error is returned.")
68-
f.BoolVar(&cfg.SplitQueriesByDay, "querier.split-queries-by-day", false, "Split queries by day and execute in parallel.")
69-
f.BoolVar(&cfg.AlignQueriesWithStep, "querier.align-querier-with-step", false, "Mutate incoming queries to align their start and end with their step.")
70-
f.BoolVar(&cfg.CacheResults, "querier.cache-results", false, "Cache query results.")
7153
f.BoolVar(&cfg.CompressResponses, "querier.compress-http-responses", false, "Compress HTTP responses.")
72-
cfg.ResultsCacheConfig.RegisterFlags(f)
7354
f.StringVar(&cfg.DownstreamURL, "frontend.downstream-url", "", "URL of downstream Prometheus.")
7455
}
7556

@@ -96,63 +77,42 @@ type request struct {
9677
}
9778

9879
// New creates a new frontend.
99-
func New(cfg Config, log log.Logger, limits *validation.Overrides) (*Frontend, error) {
80+
func New(cfg Config, log log.Logger) (*Frontend, error) {
10081
f := &Frontend{
10182
cfg: cfg,
10283
log: log,
10384
queues: map[string]chan *request{},
10485
}
10586
f.cond = sync.NewCond(&f.mtx)
10687

107-
// Stack up the pipeline of various query range middlewares.
108-
var queryRangeMiddleware []queryrange.Middleware
109-
if cfg.AlignQueriesWithStep {
110-
queryRangeMiddleware = append(queryRangeMiddleware, queryrange.InstrumentMiddleware("step_align", queryRangeDuration), queryrange.StepAlignMiddleware)
111-
}
112-
if cfg.SplitQueriesByDay {
113-
queryRangeMiddleware = append(queryRangeMiddleware, queryrange.InstrumentMiddleware("split_by_day", queryRangeDuration), queryrange.SplitByDayMiddleware(limits))
114-
}
115-
if cfg.CacheResults {
116-
queryCacheMiddleware, err := queryrange.NewResultsCacheMiddleware(log, cfg.ResultsCacheConfig, limits)
117-
if err != nil {
118-
return nil, err
119-
}
120-
queryRangeMiddleware = append(queryRangeMiddleware, queryrange.InstrumentMiddleware("results_cache", queryRangeDuration), queryCacheMiddleware)
121-
}
122-
if cfg.MaxRetries > 0 {
123-
queryRangeMiddleware = append(queryRangeMiddleware, queryrange.InstrumentMiddleware("retry", queryRangeDuration), queryrange.NewRetryMiddleware(log, cfg.MaxRetries))
124-
}
125-
126-
// If the user has specified a downstream Prometheus, then we should
127-
// forward requests to that. Otherwise we will wait for queries to
128-
// contact us.
129-
var roundTripper http.RoundTripper = f
88+
// The front end implements http.RoundTripper using a GRPC worker queue by default.
89+
f.roundTripper = f
90+
// However if the user has specified a downstream Prometheus, then we should use that.
13091
if cfg.DownstreamURL != "" {
13192
u, err := url.Parse(cfg.DownstreamURL)
13293
if err != nil {
13394
return nil, err
13495
}
13596

136-
roundTripper = RoundTripFunc(func(r *http.Request) (*http.Response, error) {
97+
f.roundTripper = RoundTripFunc(func(r *http.Request) (*http.Response, error) {
13798
r.URL.Scheme = u.Scheme
13899
r.URL.Host = u.Host
139100
r.URL.Path = path.Join(u.Path, r.URL.Path)
140101
return http.DefaultTransport.RoundTrip(r)
141102
})
142103
}
143104

144-
// Finally, if the user selected any query range middleware, stitch it in.
145-
if len(queryRangeMiddleware) > 0 {
146-
roundTripper = queryrange.NewRoundTripper(
147-
roundTripper,
148-
queryrange.MergeMiddlewares(queryRangeMiddleware...).Wrap(&queryrange.ToRoundTripperMiddleware{Next: roundTripper}),
149-
limits,
150-
)
151-
}
152-
f.roundTripper = roundTripper
153105
return f, nil
154106
}
155107

108+
// Wrap uses a Tripperware to chain a new RoundTripper to the frontend.
109+
func (f *Frontend) Wrap(trw Tripperware) {
110+
f.roundTripper = trw(f.roundTripper)
111+
}
112+
113+
// Tripperware is a signature for all http client-side middleware.
114+
type Tripperware func(http.RoundTripper) http.RoundTripper
115+
156116
// RoundTripFunc is to http.RoundTripper what http.HandlerFunc is to http.Handler.
157117
type RoundTripFunc func(*http.Request) (*http.Response, error)
158118

0 commit comments

Comments
 (0)