Limit series for metric queries. #2903

Merged
12 commits merged on Nov 18, 2020
Add limiter to query-frontend.
Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>
cyriltovena committed Nov 9, 2020
commit 072e88359c9e2af178528c71265f5ed79347f029
41 changes: 26 additions & 15 deletions pkg/querier/queryrange/limits.go
@@ -1,6 +1,7 @@
package queryrange

import (
"context"
"fmt"
"net/http"
"sync"
@@ -87,27 +88,40 @@ type seriesLimiter struct {
buf []byte

maxSeries int
next queryrange.Handler
}

func newSeriesLimiter(maxSeries int) *seriesLimiter {
type seriesLimiterMiddleware int

// newSeriesLimiter creates a new series limiter middleware for use with a single request.
func newSeriesLimiter(maxSeries int) queryrange.Middleware {
return seriesLimiterMiddleware(maxSeries)
}

// Wrap wraps a global handler and returns a per request limited handler.
// The handler returned is thread safe.
func (slm seriesLimiterMiddleware) Wrap(next queryrange.Handler) queryrange.Handler {
return &seriesLimiter{
hashes: make(map[uint64]struct{}),
maxSeries: maxSeries,
maxSeries: int(slm),
buf: make([]byte, 0, 1024),
next: next,
}
}

func (sl *seriesLimiter) Add(res queryrange.Response, resErr error) (queryrange.Response, error) {
if resErr != nil {
return res, resErr
func (sl *seriesLimiter) Do(ctx context.Context, req queryrange.Request) (queryrange.Response, error) {
// no need to fire a request if the limit is already reached.
if sl.isLimitReached() {
return nil, httpgrpc.Errorf(http.StatusBadRequest, limitErrTmpl, sl.maxSeries)
}
res, err := sl.next.Do(ctx, req)
if err != nil {
return res, err
}
promResponse, ok := res.(*LokiPromResponse)
if !ok {
return res, nil
}
if err := sl.IsLimitReached(); err != nil {
return nil, err
}
if promResponse.Response == nil {
return res, nil
}
@@ -119,17 +133,14 @@ func (sl *seriesLimiter) Add(res queryrange.Response, resErr error) (queryrange.
sl.hashes[hash] = struct{}{}
}
sl.rw.Unlock()
if err := sl.IsLimitReached(); err != nil {
return nil, err
if sl.isLimitReached() {
return nil, httpgrpc.Errorf(http.StatusBadRequest, limitErrTmpl, sl.maxSeries)
}
return res, nil
}

func (sl *seriesLimiter) IsLimitReached() error {
func (sl *seriesLimiter) isLimitReached() bool {
sl.rw.RLock()
defer sl.rw.RUnlock()
if len(sl.hashes) > sl.maxSeries {
return httpgrpc.Errorf(http.StatusBadRequest, limitErrTmpl, sl.maxSeries)
}
return nil
return len(sl.hashes) > sl.maxSeries
}
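
For readers skimming the diff, the core of the new middleware is small: hash each series' label set, collect the hashes in a per-request set guarded by a lock, and fail once the set grows past the configured maximum. Below is a minimal, self-contained Go sketch of that idea. The names (simpleSeriesLimiter, labelHash, observe), the FNV hashing, and the plain error value are illustrative assumptions; the actual code above plugs into the Cortex queryrange Handler/Middleware interfaces, stores uint64 hashes of the response's series labels, and returns an HTTP 400 via httpgrpc when the limit is exceeded.

package main

import (
	"fmt"
	"hash/fnv"
	"sort"
	"sync"
)

// simpleSeriesLimiter is a stripped-down, hypothetical version of the
// limiter above: it records a hash per unique label set seen across
// partial responses and reports an error once the count exceeds maxSeries.
type simpleSeriesLimiter struct {
	mtx       sync.Mutex
	seen      map[uint64]struct{}
	maxSeries int
}

func newSimpleSeriesLimiter(maxSeries int) *simpleSeriesLimiter {
	return &simpleSeriesLimiter{seen: map[uint64]struct{}{}, maxSeries: maxSeries}
}

// labelHash derives a stable hash for a label set by hashing the sorted
// name/value pairs (illustrative; the real code hashes each series' label
// set and stores the uint64 in the hashes map shown above).
func labelHash(lbls map[string]string) uint64 {
	names := make([]string, 0, len(lbls))
	for name := range lbls {
		names = append(names, name)
	}
	sort.Strings(names)
	h := fnv.New64a()
	for _, name := range names {
		h.Write([]byte(name))
		h.Write([]byte{0})
		h.Write([]byte(lbls[name]))
		h.Write([]byte{0})
	}
	return h.Sum64()
}

// observe records the series of one partial response and reports whether
// the per-request budget has been exceeded.
func (l *simpleSeriesLimiter) observe(series []map[string]string) error {
	l.mtx.Lock()
	defer l.mtx.Unlock()
	for _, s := range series {
		l.seen[labelHash(s)] = struct{}{}
	}
	if len(l.seen) > l.maxSeries {
		return fmt.Errorf("max number of series (%d) reached for a single query", l.maxSeries)
	}
	return nil
}

func main() {
	l := newSimpleSeriesLimiter(1)
	fmt.Println(l.observe([]map[string]string{{"app": "foo"}}))                 // <nil>
	fmt.Println(l.observe([]map[string]string{{"app": "foo"}, {"app": "bar"}})) // limit exceeded
}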
100 changes: 100 additions & 0 deletions pkg/querier/queryrange/limits_test.go
@@ -1,12 +1,23 @@
package queryrange

import (
"context"
"fmt"
"net/http"
"sync"
"testing"
"time"

"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/querier/queryrange"
"github.com/cortexproject/cortex/pkg/util"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/logql/marshal"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/promql"
"github.com/stretchr/testify/require"
"github.com/weaveworks/common/user"
)

func TestLimits(t *testing.T) {
@@ -44,3 +55,92 @@ func TestLimits(t *testing.T) {
cacheKeyLimits{wrapped}.GenerateCacheKey("a", r),
)
}

func Test_seriesLimiter(t *testing.T) {
cfg := testConfig
cfg.SplitQueriesByInterval = time.Hour
cfg.CacheResults = false
// splits into 6 sub-queries with at most 4 executed in parallel.
tpw, stopper, err := NewTripperware(cfg, util.Logger, fakeLimits{maxSeries: 1, maxQueryParallelism: 4}, chunk.SchemaConfig{}, 0, nil)
if stopper != nil {
defer stopper.Stop()
}
require.NoError(t, err)

lreq := &LokiRequest{
Query: `rate({app="foo"} |= "foo"[1m])`,
Limit: 1000,
Step: 30000, //30sec
StartTs: testTime.Add(-6 * time.Hour),
EndTs: testTime,
Direction: logproto.FORWARD,
Path: "/query_range",
}

ctx := user.InjectOrgID(context.Background(), "1")
req, err := lokiCodec.EncodeRequest(ctx, lreq)
require.NoError(t, err)

req = req.WithContext(ctx)
err = user.InjectOrgIDIntoHTTPRequest(ctx, req)
require.NoError(t, err)

rt, err := newfakeRoundTripper()
require.NoError(t, err)
defer rt.Close()

count, h := promqlResult(matrix)
rt.setHandler(h)

_, err = tpw(rt).RoundTrip(req)
require.NoError(t, err)
require.Equal(t, 6, *count)

// 2 series should not be allowed.
c := new(int)
m := &sync.Mutex{}
h = http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
m.Lock()
defer m.Unlock()
defer func() {
*c++
}()
// first time returns a single series
if *c == 0 {
if err := marshal.WriteQueryResponseJSON(logql.Result{Data: matrix}, rw); err != nil {
panic(err)
}
return
}
// second time returns a different series.
if err := marshal.WriteQueryResponseJSON(logql.Result{
Data: promql.Matrix{
{
Points: []promql.Point{
{
T: toMs(testTime.Add(-4 * time.Hour)),
V: 0.013333333333333334,
},
},
Metric: []labels.Label{
{
Name: "filename",
Value: `/var/hostlog/apport.log`,
},
{
Name: "job",
Value: "anotherjob",
},
},
},
},
}, rw); err != nil {
panic(err)
}
})
rt.setHandler(h)

_, err = tpw(rt).RoundTrip(req)
require.Error(t, err)
require.LessOrEqual(t, *c, 4)
}
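
A note on the final assertion above: the query is split into six sub-requests with a parallelism of four and a series limit of one, so once the mocked handler has returned a second, different series, the limiter trips and the remaining sub-requests are rejected by its pre-flight isLimitReached check before they ever reach the handler. Only the sub-requests already dispatched can still hit the handler, hence require.LessOrEqual(t, *c, 4).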
5 changes: 3 additions & 2 deletions pkg/querier/queryrange/roundtrip_test.go
@@ -4,6 +4,7 @@ import (
"bytes"
"context"
"io/ioutil"
"math"
"net/http"
"net/http/httptest"
"net/url"
@@ -92,7 +93,7 @@ var (
// those tests are mostly for testing the glue between all components and making sure they activate correctly.
func TestMetricsTripperware(t *testing.T) {

tpw, stopper, err := NewTripperware(testConfig, util.Logger, fakeLimits{}, chunk.SchemaConfig{}, 0, nil)
tpw, stopper, err := NewTripperware(testConfig, util.Logger, fakeLimits{maxSeries: math.MaxInt32}, chunk.SchemaConfig{}, 0, nil)
if stopper != nil {
defer stopper.Stop()
}
@@ -520,7 +521,7 @@ func (f fakeLimits) MaxEntriesLimitPerQuery(string) int {
}

func (f fakeLimits) MaxQuerySeries(string) int {
return f.maxEntriesLimitPerQuery
return f.maxSeries
}

func (f fakeLimits) MaxCacheFreshness(string) time.Duration {
17 changes: 7 additions & 10 deletions pkg/querier/queryrange/split_by_interval.go
@@ -83,6 +83,7 @@ func (h *splitByInterval) Process(
parallelism int,
threshold int64,
input []*lokiResult,
userID string,
) ([]queryrange.Response, error) {
var responses []queryrange.Response
ctx, cancel := context.WithCancel(ctx)
@@ -102,8 +103,10 @@ func (h *splitByInterval) Process(
p = len(input)
}

// per-request wrapped handler for limiting the number of series.
next := newSeriesLimiter(h.limits.MaxQuerySeries(userID)).Wrap(h.next)
for i := 0; i < p; i++ {
go h.loop(ctx, ch)
go h.loop(ctx, ch, next)
}

for _, x := range input {
@@ -134,21 +137,15 @@ func (h *splitByInterval) Process(
return responses, nil
}

func (h *splitByInterval) loop(ctx context.Context, ch <-chan *lokiResult) {
func (h *splitByInterval) loop(ctx context.Context, ch <-chan *lokiResult, next queryrange.Handler) {

for data := range ch {

sp, ctx := opentracing.StartSpanFromContext(ctx, "interval")
data.req.LogToSpan(sp)

resp, err := h.next.Do(ctx, data.req)
resp, err := next.Do(ctx, data.req)

// check that we're not going over the series budget.
// if err == nil {
// if promRes, ok := resp.(*LokiPromResponse); ok {

// }
// }
select {
case <-ctx.Done():
sp.Finish()
@@ -209,7 +206,7 @@ func (h *splitByInterval) Do(ctx context.Context, r queryrange.Request) (queryra
})
}

resps, err := h.Process(ctx, h.limits.MaxQueryParallelism(userid), limit, input)
resps, err := h.Process(ctx, h.limits.MaxQueryParallelism(userid), limit, input, userid)
if err != nil {
return nil, err
}
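
The noteworthy design choice here is that newSeriesLimiter(...).Wrap(h.next) runs once per incoming request, just before the workers are started, so the series budget is shared by all of that request's time-split sub-queries but not by unrelated requests. The sketch below shows the same wiring pattern with simplified, hypothetical types (request, handler, middleware, callBudget, process) standing in for the Cortex queryrange ones; the toy callBudget middleware only counts calls, but its per-Wrap state is scoped exactly the way the series limiter's hash set is.

package main

import (
	"context"
	"fmt"
	"sync"
)

// Hypothetical stand-ins for the Cortex queryrange Request/Response/Handler
// and Middleware interfaces used in the diff above.
type request struct{ interval string }
type response struct{}

type handler interface {
	Do(ctx context.Context, r request) (response, error)
}

type handlerFunc func(ctx context.Context, r request) (response, error)

func (f handlerFunc) Do(ctx context.Context, r request) (response, error) { return f(ctx, r) }

type middleware interface {
	Wrap(next handler) handler
}

// callBudget is a toy middleware standing in for the series limiter: each
// handler returned by Wrap carries its own counter, so the budget is scoped
// to the request whose process call created it.
type callBudget int

func (b callBudget) Wrap(next handler) handler {
	var (
		mtx   sync.Mutex
		calls int
	)
	return handlerFunc(func(ctx context.Context, r request) (response, error) {
		mtx.Lock()
		calls++
		over := calls > int(b)
		mtx.Unlock()
		if over {
			return response{}, fmt.Errorf("budget of %d sub-requests exceeded", int(b))
		}
		return next.Do(ctx, r)
	})
}

// process mirrors the shape of splitByInterval.Process: wrap the shared
// downstream handler once, then let every parallel worker use that same
// wrapped handler for this request's sub-queries.
func process(ctx context.Context, parallelism int, limiter middleware, downstream handler, subReqs []request) {
	next := limiter.Wrap(downstream) // one limited handler per incoming request

	ch := make(chan request)
	var wg sync.WaitGroup
	for i := 0; i < parallelism; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for r := range ch {
				if _, err := next.Do(ctx, r); err != nil {
					fmt.Printf("sub-request %s rejected: %v\n", r.interval, err)
				}
			}
		}()
	}
	for _, r := range subReqs {
		ch <- r
	}
	close(ch)
	wg.Wait()
}

func main() {
	downstream := handlerFunc(func(context.Context, request) (response, error) {
		return response{}, nil
	})
	subReqs := []request{{"00-01h"}, {"01-02h"}, {"02-03h"}, {"03-04h"}}
	// Each call to process wraps downstream afresh, so each incoming request
	// gets its own budget, much like newSeriesLimiter(...).Wrap(h.next) above.
	process(context.Background(), 2, callBudget(2), downstream, subReqs)
}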