Enforced -querier.max-query-lookback in the query-frontend for range queries #3458

Merged
2 changes: 1 addition & 1 deletion CHANGELOG.md
@@ -11,7 +11,7 @@
* [ENHANCEMENT] Enforced keepalive on all gRPC clients used for inter-service communication. #3431
* [ENHANCEMENT] Added `cortex_alertmanager_config_hash` metric to expose hash of Alertmanager Config loaded per user. #3388
* [ENHANCEMENT] Query-Frontend / Query-Scheduler: New component called "Query-Scheduler" has been introduced. Query-Scheduler is simply a queue of requests, moved outside of Query-Frontend. This allows Query-Frontend to be scaled separately from number of queues. To make Query-Frontend and Querier use Query-Scheduler, they need to be started with `-frontend.scheduler-address` and `-querier.scheduler-address` options respectively. #3374
* [ENHANCEMENT] Querier: added `-querier.max-query-lookback` to limit how long back data (series and metadata) can be queried. This setting can be overridden on a per-tenant basis. #3452
* [ENHANCEMENT] Query-frontend / Querier / Ruler: added `-querier.max-query-lookback` to limit how long back data (series and metadata) can be queried. This setting can be overridden on a per-tenant basis and is enforced in the query-frontend, querier and ruler. #3452 #3458
* [BUGFIX] Blocks storage ingester: fixed some cases leading to a TSDB WAL corruption after a partial write to disk. #3423
* [BUGFIX] Blocks storage: Fix the race between ingestion and `/flush` call resulting in overlapping blocks. #3422
* [BUGFIX] Querier: fixed `-querier.max-query-into-future` which wasn't correctly enforced on range queries. #3452
6 changes: 3 additions & 3 deletions docs/blocks-storage/querier.md
@@ -414,7 +414,7 @@ blocks_storage:
[max_get_multi_concurrency: <int> | default = 100]

# The maximum number of keys a single underlying get operation should
# run. If more keys are specified, internally keys are splitted into
# run. If more keys are specified, internally keys are split into
# multiple batches and fetched concurrently, honoring the max
# concurrency. If set to 0, the max batch size is unlimited.
# CLI flag: -blocks-storage.bucket-store.index-cache.memcached.max-get-multi-batch-size
@@ -465,7 +465,7 @@ blocks_storage:
[max_get_multi_concurrency: <int> | default = 100]

# The maximum number of keys a single underlying get operation should
# run. If more keys are specified, internally keys are splitted into
# run. If more keys are specified, internally keys are split into
# multiple batches and fetched concurrently, honoring the max
# concurrency. If set to 0, the max batch size is unlimited.
# CLI flag: -blocks-storage.bucket-store.chunks-cache.memcached.max-get-multi-batch-size
@@ -531,7 +531,7 @@ blocks_storage:
[max_get_multi_concurrency: <int> | default = 100]

# The maximum number of keys a single underlying get operation should
# run. If more keys are specified, internally keys are splitted into
# run. If more keys are specified, internally keys are split into
# multiple batches and fetched concurrently, honoring the max
# concurrency. If set to 0, the max batch size is unlimited.
# CLI flag: -blocks-storage.bucket-store.metadata-cache.memcached.max-get-multi-batch-size
6 changes: 3 additions & 3 deletions docs/blocks-storage/store-gateway.md
@@ -469,7 +469,7 @@ blocks_storage:
[max_get_multi_concurrency: <int> | default = 100]

# The maximum number of keys a single underlying get operation should
# run. If more keys are specified, internally keys are splitted into
# run. If more keys are specified, internally keys are split into
# multiple batches and fetched concurrently, honoring the max
# concurrency. If set to 0, the max batch size is unlimited.
# CLI flag: -blocks-storage.bucket-store.index-cache.memcached.max-get-multi-batch-size
@@ -520,7 +520,7 @@ blocks_storage:
[max_get_multi_concurrency: <int> | default = 100]

# The maximum number of keys a single underlying get operation should
# run. If more keys are specified, internally keys are splitted into
# run. If more keys are specified, internally keys are split into
# multiple batches and fetched concurrently, honoring the max
# concurrency. If set to 0, the max batch size is unlimited.
# CLI flag: -blocks-storage.bucket-store.chunks-cache.memcached.max-get-multi-batch-size
@@ -586,7 +586,7 @@ blocks_storage:
[max_get_multi_concurrency: <int> | default = 100]

# The maximum number of keys a single underlying get operation should
# run. If more keys are specified, internally keys are splitted into
# run. If more keys are specified, internally keys are split into
# multiple batches and fetched concurrently, honoring the max
# concurrency. If set to 0, the max batch size is unlimited.
# CLI flag: -blocks-storage.bucket-store.metadata-cache.memcached.max-get-multi-batch-size
13 changes: 8 additions & 5 deletions docs/configuration/config-file-reference.md
@@ -3052,7 +3052,10 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s
[max_chunks_per_query: <int> | default = 2000000]

# Limit how long back data (series and metadata) can be queried, up until
# <lookback> duration ago. 0 to disable.
# <lookback> duration ago. This limit is enforced in the query-frontend, querier
# and ruler. If the requested time range is outside the allowed range, the
# request will not fail but will be manipulated to only query data within the
# allowed time range. 0 to disable.
# CLI flag: -querier.max-query-lookback
[max_query_lookback: <duration> | default = 0s]

@@ -3062,7 +3065,7 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s
# CLI flag: -store.max-query-length
[max_query_length: <duration> | default = 0s]

# Maximum number of queries will be scheduled in parallel by the frontend.
# Maximum number of split queries will be scheduled in parallel by the frontend.
# CLI flag: -querier.max-query-parallelism
[max_query_parallelism: <int> | default = 14]

@@ -3592,7 +3595,7 @@ bucket_store:
[max_get_multi_concurrency: <int> | default = 100]

# The maximum number of keys a single underlying get operation should run.
# If more keys are specified, internally keys are splitted into multiple
# If more keys are specified, internally keys are split into multiple
# batches and fetched concurrently, honoring the max concurrency. If set
# to 0, the max batch size is unlimited.
# CLI flag: -blocks-storage.bucket-store.index-cache.memcached.max-get-multi-batch-size
@@ -3643,7 +3646,7 @@ bucket_store:
[max_get_multi_concurrency: <int> | default = 100]

# The maximum number of keys a single underlying get operation should run.
# If more keys are specified, internally keys are splitted into multiple
# If more keys are specified, internally keys are split into multiple
# batches and fetched concurrently, honoring the max concurrency. If set
# to 0, the max batch size is unlimited.
# CLI flag: -blocks-storage.bucket-store.chunks-cache.memcached.max-get-multi-batch-size
@@ -3708,7 +3711,7 @@ bucket_store:
[max_get_multi_concurrency: <int> | default = 100]

# The maximum number of keys a single underlying get operation should run.
# If more keys are specified, internally keys are splitted into multiple
# If more keys are specified, internally keys are split into multiple
# batches and fetched concurrently, honoring the max concurrency. If set
# to 0, the max batch size is unlimited.
# CLI flag: -blocks-storage.bucket-store.metadata-cache.memcached.max-get-multi-batch-size
6 changes: 4 additions & 2 deletions pkg/querier/querier.go
@@ -512,7 +512,8 @@ func validateQueryTimeRange(ctx context.Context, userID string, startMs, endMs i
// Make sure to log it in traces to ease debugging.
level.Debug(spanlogger.FromContext(ctx)).Log(
"msg", "the end time of the query has been manipulated because of the 'max query into future' setting",
"original", origEndTime, "updated", endTime)
"original", util.FormatTimeModel(origEndTime),
"updated", util.FormatTimeModel(endTime))

if endTime.Before(startTime) {
return 0, 0, errEmptyTimeRange
@@ -527,7 +528,8 @@ func validateQueryTimeRange(ctx context.Context, userID string, startMs, endMs i
// Make sure to log it in traces to ease debugging.
level.Debug(spanlogger.FromContext(ctx)).Log(
"msg", "the start time of the query has been manipulated because of the 'max query lookback' setting",
"original", origStartTime, "updated", startTime)
"original", util.FormatTimeModel(origStartTime),
"updated", util.FormatTimeModel(startTime))

if endTime.Before(startTime) {
return 0, 0, errEmptyTimeRange
111 changes: 48 additions & 63 deletions pkg/querier/queryrange/limits.go
@@ -5,107 +5,92 @@ import (
"net/http"
"time"

"github.com/go-kit/kit/log/level"
"github.com/prometheus/prometheus/pkg/timestamp"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/user"

"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/spanlogger"
"github.com/cortexproject/cortex/pkg/util/validation"
)

// Limits allows us to specify per-tenant runtime limits on the behavior of
// the query handling code.
type Limits interface {
// MaxQueryLookback returns the max lookback period of queries.
MaxQueryLookback(userID string) time.Duration

// MaxQueryLength returns the limit of the length (in time) of a query.
MaxQueryLength(string) time.Duration

// MaxQueryParallelism returns the limit to the number of split queries the
// frontend will process in parallel.
MaxQueryParallelism(string) int

// MaxCacheFreshness returns the period after which results are cacheable,
// to prevent caching of very recent results.
MaxCacheFreshness(string) time.Duration
}

type limits struct {
type limitsMiddleware struct {
Limits
next Handler
}

// LimitsMiddleware creates a new Middleware that invalidates large queries based on Limits interface.
func LimitsMiddleware(l Limits) Middleware {
// NewLimitsMiddleware creates a new Middleware that enforces query limits.
func NewLimitsMiddleware(l Limits) Middleware {
return MiddlewareFunc(func(next Handler) Handler {
return limits{
return limitsMiddleware{
next: next,
Limits: l,
}
})
}

func (l limits) Do(ctx context.Context, r Request) (Response, error) {
userid, err := user.ExtractOrgID(ctx)
func (l limitsMiddleware) Do(ctx context.Context, r Request) (Response, error) {
log, ctx := spanlogger.New(ctx, "limits")
defer log.Finish()

userID, err := user.ExtractOrgID(ctx)
if err != nil {
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}

maxQueryLen := l.MaxQueryLength(userid)
queryLen := timestamp.Time(r.GetEnd()).Sub(timestamp.Time(r.GetStart()))
if maxQueryLen > 0 && queryLen > maxQueryLen {
return nil, httpgrpc.Errorf(http.StatusBadRequest, validation.ErrQueryTooLong, queryLen, maxQueryLen)
}
return l.next.Do(ctx, r)
}
// Clamp the time range based on the max query lookback.
if maxQueryLookback := l.MaxQueryLookback(userID); maxQueryLookback > 0 {
minStartTime := util.TimeToMillis(time.Now().Add(-maxQueryLookback))

// RequestResponse contains a request response and the respective request that was used.
type RequestResponse struct {
Request Request
Response Response
}
if r.GetEnd() < minStartTime {
// The request is fully outside the allowed range, so we can return an
// empty response.
level.Debug(log).Log(
"msg", "skipping the execution of the query because its time range is before the 'max query lookback' setting",
"reqStart", util.FormatTimeMillis(r.GetStart()),
"reqEnd", util.FormatTimeMillis(r.GetEnd()),
"maxQueryLookback", maxQueryLookback)

// DoRequests executes a list of requests in parallel. The limits parameters is used to limit parallelism per single request.
func DoRequests(ctx context.Context, downstream Handler, reqs []Request, limits Limits) ([]RequestResponse, error) {
userid, err := user.ExtractOrgID(ctx)
if err != nil {
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}
return NewEmptyPrometheusResponse(), nil
}

// If one of the requests fail, we want to be able to cancel the rest of them.
ctx, cancel := context.WithCancel(ctx)
defer cancel()
if r.GetStart() < minStartTime {
// Replace the start time in the request.
level.Debug(log).Log(
"msg", "the start time of the query has been manipulated because of the 'max query lookback' setting",
"original", util.FormatTimeMillis(r.GetStart()),
"updated", util.FormatTimeMillis(minStartTime))

// Feed all requests to a bounded intermediate channel to limit parallelism.
intermediate := make(chan Request)
go func() {
for _, req := range reqs {
intermediate <- req
r = r.WithStartEnd(minStartTime, r.GetEnd())
}
close(intermediate)
}()

respChan, errChan := make(chan RequestResponse), make(chan error)
parallelism := limits.MaxQueryParallelism(userid)
if parallelism > len(reqs) {
parallelism = len(reqs)
}
for i := 0; i < parallelism; i++ {
go func() {
for req := range intermediate {
resp, err := downstream.Do(ctx, req)
if err != nil {
errChan <- err
} else {
respChan <- RequestResponse{req, resp}
}
}
}()
}

resps := make([]RequestResponse, 0, len(reqs))
var firstErr error
for range reqs {
select {
case resp := <-respChan:
resps = append(resps, resp)
case err := <-errChan:
if firstErr == nil {
cancel()
firstErr = err
}
// Enforce the max query length.
if maxQueryLength := l.MaxQueryLength(userID); maxQueryLength > 0 {
queryLen := timestamp.Time(r.GetEnd()).Sub(timestamp.Time(r.GetStart()))
if queryLen > maxQueryLength {
return nil, httpgrpc.Errorf(http.StatusBadRequest, validation.ErrQueryTooLong, queryLen, maxQueryLength)
}
}

return resps, firstErr
return l.next.Do(ctx, r)
}