Skip to content

Frontend middlewares refactoring #1734

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Oct 29, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .lintignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@
./tools*
./vendor*
./pkg/configs/legacy_promql*
./.pkg*
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

## master / unreleased

* [CHANGE] The frontend component has been refactored to be easier to re-use. When upgrading the frontend, cache entries will be discarded and re-created with the new protobuf schema. #1734
* [CHANGE] Remove direct DB/API access from the ruler
* [CHANGE] Removed `Delta` encoding. Any old chunks with `Delta` encoding cannot be read anymore. If `ingester.chunk-encoding` is set to `Delta` the ingester will fail to start. #1706
* [ENHANCEMENT] Allocation improvements in adding samples to Chunk. #1706
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ $(EXES):
protos: $(PROTO_GOS)

%.pb.go:
protoc -I $(GOPATH)/src:./vendor:./$(@D) --gogoslick_out=plugins=grpc:./$(@D) ./$(patsubst %.pb.go,%.proto,$@)
protoc -I $(GOPATH)/src:./vendor:./$(@D) --gogoslick_out=plugins=grpc,Mgoogle/protobuf/any.proto=github.com/gogo/protobuf/types,:./$(@D) ./$(patsubst %.pb.go,%.proto,$@)

lint:
./tools/lint -notestpackage -novet -ignorespelling queriers -ignorespelling Queriers .
Expand Down
3 changes: 3 additions & 0 deletions pkg/cortex/cortex.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/cortexproject/cortex/pkg/querier"
"github.com/cortexproject/cortex/pkg/querier/frontend"
"github.com/cortexproject/cortex/pkg/querier/queryrange"
"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/ruler"
"github.com/cortexproject/cortex/pkg/util"
Expand Down Expand Up @@ -67,6 +68,7 @@ type Config struct {
Prealloc client.PreallocConfig `yaml:"prealloc,omitempty"`
Worker frontend.WorkerConfig `yaml:"frontend_worker,omitempty"`
Frontend frontend.Config `yaml:"frontend,omitempty"`
QueryRange queryrange.Config `yaml:"query_range,omitempty"`
TableManager chunk.TableManagerConfig `yaml:"table_manager,omitempty"`
Encoding encoding.Config `yaml:"-"` // No yaml for this, it only works with flags.

Expand Down Expand Up @@ -98,6 +100,7 @@ func (c *Config) RegisterFlags(f *flag.FlagSet) {
c.Prealloc.RegisterFlags(f)
c.Worker.RegisterFlags(f)
c.Frontend.RegisterFlags(f)
c.QueryRange.RegisterFlags(f)
c.TableManager.RegisterFlags(f)
c.Encoding.RegisterFlags(f)

Expand Down
8 changes: 7 additions & 1 deletion pkg/cortex/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/cortexproject/cortex/pkg/querier"
"github.com/cortexproject/cortex/pkg/querier/frontend"
"github.com/cortexproject/cortex/pkg/querier/queryrange"
"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/ruler"
"github.com/cortexproject/cortex/pkg/util"
Expand Down Expand Up @@ -261,10 +262,15 @@ func (t *Cortex) stopStore() error {
}

func (t *Cortex) initQueryFrontend(cfg *Config) (err error) {
t.frontend, err = frontend.New(cfg.Frontend, util.Logger, t.overrides)
t.frontend, err = frontend.New(cfg.Frontend, util.Logger)
if err != nil {
return
}
tripperware, err := queryrange.NewTripperware(cfg.QueryRange, util.Logger, t.overrides, queryrange.PrometheusCodec, queryrange.PrometheusResponseExtractor)
if err != nil {
return err
}
t.frontend.Wrap(tripperware)

frontend.RegisterFrontendServer(t.server.GRPC, t.frontend)
t.server.HTTP.PathPrefix(cfg.HTTPPrefix).Handler(
Expand Down
72 changes: 16 additions & 56 deletions pkg/querier/frontend/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,6 @@ import (
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/httpgrpc/server"
"github.com/weaveworks/common/user"

"github.com/cortexproject/cortex/pkg/querier/queryrange"
"github.com/cortexproject/cortex/pkg/util/validation"
)

var (
Expand All @@ -38,38 +35,22 @@ var (
Name: "query_frontend_queue_length",
Help: "Number of queries in the queue.",
})
queryRangeDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "cortex",
Name: "frontend_query_range_duration_seconds",
Help: "Total time spent in seconds doing query range requests.",
Buckets: prometheus.DefBuckets,
}, []string{"method", "status_code"})

errTooManyRequest = httpgrpc.Errorf(http.StatusTooManyRequests, "too many outstanding requests")
errCanceled = httpgrpc.Errorf(http.StatusInternalServerError, "context cancelled")
)

// Config for a Frontend.
type Config struct {
MaxOutstandingPerTenant int `yaml:"max_outstanding_per_tenant"`
MaxRetries int `yaml:"max_retries"`
SplitQueriesByDay bool `yaml:"split_queries_by_day"`
AlignQueriesWithStep bool `yaml:"align_queries_with_step"`
CacheResults bool `yaml:"cache_results"`
CompressResponses bool `yaml:"compress_responses"`
queryrange.ResultsCacheConfig `yaml:"results_cache"`
DownstreamURL string `yaml:"downstream"`
MaxOutstandingPerTenant int `yaml:"max_outstanding_per_tenant"`
CompressResponses bool `yaml:"compress_responses"`
DownstreamURL string `yaml:"downstream"`
}

// RegisterFlags adds the flags required to config this to the given FlagSet.
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&cfg.MaxOutstandingPerTenant, "querier.max-outstanding-requests-per-tenant", 100, "Maximum number of outstanding requests per tenant per frontend; requests beyond this error with HTTP 429.")
f.IntVar(&cfg.MaxRetries, "querier.max-retries-per-request", 5, "Maximum number of retries for a single request; beyond this, the downstream error is returned.")
f.BoolVar(&cfg.SplitQueriesByDay, "querier.split-queries-by-day", false, "Split queries by day and execute in parallel.")
f.BoolVar(&cfg.AlignQueriesWithStep, "querier.align-querier-with-step", false, "Mutate incoming queries to align their start and end with their step.")
f.BoolVar(&cfg.CacheResults, "querier.cache-results", false, "Cache query results.")
f.BoolVar(&cfg.CompressResponses, "querier.compress-http-responses", false, "Compress HTTP responses.")
cfg.ResultsCacheConfig.RegisterFlags(f)
f.StringVar(&cfg.DownstreamURL, "frontend.downstream-url", "", "URL of downstream Prometheus.")
}

Expand All @@ -96,63 +77,42 @@ type request struct {
}

// New creates a new frontend.
func New(cfg Config, log log.Logger, limits *validation.Overrides) (*Frontend, error) {
func New(cfg Config, log log.Logger) (*Frontend, error) {
f := &Frontend{
cfg: cfg,
log: log,
queues: map[string]chan *request{},
}
f.cond = sync.NewCond(&f.mtx)

// Stack up the pipeline of various query range middlewares.
var queryRangeMiddleware []queryrange.Middleware
if cfg.AlignQueriesWithStep {
queryRangeMiddleware = append(queryRangeMiddleware, queryrange.InstrumentMiddleware("step_align", queryRangeDuration), queryrange.StepAlignMiddleware)
}
if cfg.SplitQueriesByDay {
queryRangeMiddleware = append(queryRangeMiddleware, queryrange.InstrumentMiddleware("split_by_day", queryRangeDuration), queryrange.SplitByDayMiddleware(limits))
}
if cfg.CacheResults {
queryCacheMiddleware, err := queryrange.NewResultsCacheMiddleware(log, cfg.ResultsCacheConfig, limits)
if err != nil {
return nil, err
}
queryRangeMiddleware = append(queryRangeMiddleware, queryrange.InstrumentMiddleware("results_cache", queryRangeDuration), queryCacheMiddleware)
}
if cfg.MaxRetries > 0 {
queryRangeMiddleware = append(queryRangeMiddleware, queryrange.InstrumentMiddleware("retry", queryRangeDuration), queryrange.NewRetryMiddleware(log, cfg.MaxRetries))
}

// If the user has specified a downstream Prometheus, then we should
// forward requests to that. Otherwise we will wait for queries to
// contact us.
var roundTripper http.RoundTripper = f
// The front end implements http.RoundTripper using a GRPC worker queue by default.
f.roundTripper = f
// However if the user has specified a downstream Prometheus, then we should use that.
if cfg.DownstreamURL != "" {
u, err := url.Parse(cfg.DownstreamURL)
if err != nil {
return nil, err
}

roundTripper = RoundTripFunc(func(r *http.Request) (*http.Response, error) {
f.roundTripper = RoundTripFunc(func(r *http.Request) (*http.Response, error) {
r.URL.Scheme = u.Scheme
r.URL.Host = u.Host
r.URL.Path = path.Join(u.Path, r.URL.Path)
return http.DefaultTransport.RoundTrip(r)
})
}

// Finally, if the user selected any query range middleware, stitch it in.
if len(queryRangeMiddleware) > 0 {
roundTripper = queryrange.NewRoundTripper(
roundTripper,
queryrange.MergeMiddlewares(queryRangeMiddleware...).Wrap(&queryrange.ToRoundTripperMiddleware{Next: roundTripper}),
limits,
)
}
f.roundTripper = roundTripper
return f, nil
}

// Wrap uses a Tripperware to chain a new RoundTripper to the frontend.
func (f *Frontend) Wrap(trw Tripperware) {
f.roundTripper = trw(f.roundTripper)
}

// Tripperware is a signature for all http client-side middleware.
type Tripperware func(http.RoundTripper) http.RoundTripper

// RoundTripFunc is to http.RoundTripper what http.HandlerFunc is to http.Handler.
type RoundTripFunc func(*http.Request) (*http.Response, error)

Expand Down
Loading