Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support limits in multi-tenant queries. #5626

Merged
merged 13 commits into from
Mar 17, 2022
Merged
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
## Main
* [5626](https://github.com/grafana/loki/pull/5626) **jeschkies** Support multi-tenant select logs and samples queries.
* [5622](https://github.com/grafana/loki/pull/5622) **chaudum**: Fix bug in query splitter that caused `interval` query parameter to be ignored and therefore returning more logs than expected.
* [5521](https://github.com/grafana/loki/pull/5521) **cstyan**: Move stream lag configuration to top level clients config struct and refactor stream lag metric, this resolves a bug with duplicate metric collection when a single Promtail binary is running multiple Promtail clients.
* [5568](https://github.com/grafana/loki/pull/5568) **afayngelerindbx**: Fix canary panics due to concurrent execution of `confirmMissing`
Expand Down
12 changes: 6 additions & 6 deletions pkg/logql/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/grafana/loki/pkg/util"
"github.com/grafana/loki/pkg/util/httpreq"
"github.com/grafana/loki/pkg/util/spanlogger"
"github.com/grafana/loki/pkg/util/validation"
)

var (
Expand Down Expand Up @@ -243,24 +244,23 @@ func (q *query) evalSample(ctx context.Context, expr syntax.SampleExpr) (promql_
return q.evalLiteral(ctx, lit)
}

userID, err := tenant.TenantID(ctx)
expr, err := optimizeSampleExpr(expr)
if err != nil {
return nil, err
}

expr, err = optimizeSampleExpr(expr)
stepEvaluator, err := q.evaluator.StepEvaluator(ctx, q.evaluator, expr, q.params)
if err != nil {
return nil, err
}
defer util.LogErrorWithContext(ctx, "closing SampleExpr", stepEvaluator.Close)

stepEvaluator, err := q.evaluator.StepEvaluator(ctx, q.evaluator, expr, q.params)
tenantIDs, err := tenant.TenantIDs(ctx)
if err != nil {
return nil, err
}
defer util.LogErrorWithContext(ctx, "closing SampleExpr", stepEvaluator.Close)

maxSeries := validation.SmallestPositiveIntPerTenant(tenantIDs, q.limits.MaxQuerySeries)
seriesIndex := map[uint64]*promql.Series{}
maxSeries := q.limits.MaxQuerySeries(userID)

next, ts, vec := stepEvaluator.Next()
if stepEvaluator.Error() != nil {
Expand Down
2 changes: 2 additions & 0 deletions pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

"github.com/grafana/loki/pkg/storage/stores/shipper/compactor/deletion"
"github.com/grafana/loki/pkg/tenant"

"github.com/NYTimes/gziphandler"
"github.com/go-kit/log"
Expand Down Expand Up @@ -235,6 +236,7 @@ func (t *Loki) initQuerier() (services.Service, error) {

if t.Cfg.Querier.MultiTenantQueriesEnabled {
t.Querier = querier.NewMultiTenantQuerier(q, util_log.Logger)
tenant.WithDefaultResolver(tenant.NewMultiResolver())
} else {
t.Querier = q
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/querier/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/grafana/loki/pkg/util/marshal"
marshal_legacy "github.com/grafana/loki/pkg/util/marshal/legacy"
serverutil "github.com/grafana/loki/pkg/util/server"
util_validation "github.com/grafana/loki/pkg/util/validation"
"github.com/grafana/loki/pkg/validation"
)

Expand Down Expand Up @@ -380,7 +381,7 @@ func parseRegexQuery(httpRequest *http.Request) (string, error) {
}

func (q *QuerierAPI) validateEntriesLimits(ctx context.Context, query string, limit uint32) error {
userID, err := tenant.TenantID(ctx)
tenantIDs, err := tenant.TenantIDs(ctx)
if err != nil {
return httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}
Expand All @@ -395,7 +396,7 @@ func (q *QuerierAPI) validateEntriesLimits(ctx context.Context, query string, li
return nil
}

maxEntriesLimit := q.limits.MaxEntriesLimitPerQuery(userID)
maxEntriesLimit := util_validation.SmallestPositiveNonZeroIntPerTenant(tenantIDs, q.limits.MaxEntriesLimitPerQuery)
if int(limit) > maxEntriesLimit && maxEntriesLimit != 0 {
return httpgrpc.Errorf(http.StatusBadRequest,
"max entries limit per query exceeded, limit > max_entries_limit (%d > %d)", limit, maxEntriesLimit)
Expand Down
8 changes: 3 additions & 5 deletions pkg/querier/multi_tenant_querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,17 @@ const (
// MultiTenantQuerier is able to query across different tenants.
type MultiTenantQuerier struct {
Querier
resolver tenant.Resolver
}

// NewMultiTenantQuerier returns a new querier able to query across different tenants.
func NewMultiTenantQuerier(querier Querier, logger log.Logger) *MultiTenantQuerier {
return &MultiTenantQuerier{
Querier: querier,
resolver: tenant.NewMultiResolver(),
Querier: querier,
}
}

func (q *MultiTenantQuerier) SelectLogs(ctx context.Context, params logql.SelectLogParams) (iter.EntryIterator, error) {
tenantIDs, err := q.resolver.TenantIDs(ctx)
tenantIDs, err := tenant.TenantIDs(ctx)
if err != nil {
return nil, err
}
Expand All @@ -56,7 +54,7 @@ func (q *MultiTenantQuerier) SelectLogs(ctx context.Context, params logql.Select
}

func (q *MultiTenantQuerier) SelectSamples(ctx context.Context, params logql.SelectSampleParams) (iter.SampleIterator, error) {
tenantIDs, err := q.resolver.TenantIDs(ctx)
tenantIDs, err := tenant.TenantIDs(ctx)
if err != nil {
return nil, err
}
Expand Down
9 changes: 9 additions & 0 deletions pkg/querier/multi_tenant_querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,14 @@ import (
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/logql/syntax"
"github.com/grafana/loki/pkg/tenant"
)

func TestMultiTenantQuerier_SelectLogs(t *testing.T) {
original := tenant.DefaultResolver
tenant.WithDefaultResolver(tenant.NewMultiResolver())
defer tenant.WithDefaultResolver(original)

for _, tc := range []struct {
desc string
orgID string
Expand Down Expand Up @@ -74,6 +79,10 @@ func TestMultiTenantQuerier_SelectLogs(t *testing.T) {
}

func TestMultiTenantQuerier_SelectSamples(t *testing.T) {
original := tenant.DefaultResolver
tenant.WithDefaultResolver(tenant.NewMultiResolver())
defer tenant.WithDefaultResolver(original)

for _, tc := range []struct {
desc string
orgID string
Expand Down
4 changes: 2 additions & 2 deletions pkg/querier/queryrange/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,12 +259,12 @@ func (rt limitedRoundTripper) RoundTrip(r *http.Request) (*http.Response, error)
if span := opentracing.SpanFromContext(ctx); span != nil {
request.LogToSpan(span)
}
userid, err := tenant.TenantID(ctx)
tenantIDs, err := tenant.TenantIDs(ctx)
if err != nil {
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}

parallelism := rt.limits.MaxQueryParallelism(userid)
parallelism := validation.SmallestPositiveIntPerTenant(tenantIDs, rt.limits.MaxQueryParallelism)
if parallelism < 1 {
return nil, httpgrpc.Errorf(http.StatusTooManyRequests, ErrMaxQueryParalellism.Error())
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/querier/queryrange/querysharding.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/grafana/loki/pkg/util"
util_log "github.com/grafana/loki/pkg/util/log"
"github.com/grafana/loki/pkg/util/marshal"
"github.com/grafana/loki/pkg/util/validation"
)

var errInvalidShardingRange = errors.New("Query does not fit in a single sharding configuration")
Expand Down Expand Up @@ -191,11 +192,11 @@ type shardSplitter struct {
}

func (splitter *shardSplitter) Do(ctx context.Context, r queryrangebase.Request) (queryrangebase.Response, error) {
userid, err := tenant.TenantID(ctx)
tenantIDs, err := tenant.TenantIDs(ctx)
if err != nil {
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}
minShardingLookback := splitter.limits.MinShardingLookback(userid)
minShardingLookback := validation.SmallestPositiveNonZeroDurationPerTenant(tenantIDs, splitter.limits.MinShardingLookback)
if minShardingLookback == 0 {
return splitter.shardingware.Do(ctx, r)
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/querier/queryrange/roundtrip.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/grafana/loki/pkg/storage/chunk"
"github.com/grafana/loki/pkg/storage/chunk/cache"
"github.com/grafana/loki/pkg/tenant"
"github.com/grafana/loki/pkg/util/validation"
)

// Config is the configuration for the queryrange tripperware
Expand Down Expand Up @@ -203,12 +204,12 @@ func transformRegexQuery(req *http.Request, expr syntax.LogSelectorExpr) (syntax

// validates log entries limits
func validateLimits(req *http.Request, reqLimit uint32, limits Limits) error {
userID, err := tenant.TenantID(req.Context())
tenantIDs, err := tenant.TenantIDs(req.Context())
if err != nil {
return httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}

maxEntriesLimit := limits.MaxEntriesLimitPerQuery(userID)
maxEntriesLimit := validation.SmallestPositiveNonZeroIntPerTenant(tenantIDs, limits.MaxEntriesLimitPerQuery)
if int(reqLimit) > maxEntriesLimit && maxEntriesLimit != 0 {
return httpgrpc.Errorf(http.StatusBadRequest,
"max entries limit per query exceeded, limit > max_entries_limit (%d > %d)", reqLimit, maxEntriesLimit)
Expand Down
13 changes: 8 additions & 5 deletions pkg/querier/queryrange/split_by_interval.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/grafana/loki/pkg/querier/queryrange/queryrangebase"
"github.com/grafana/loki/pkg/tenant"
"github.com/grafana/loki/pkg/util"
"github.com/grafana/loki/pkg/util/validation"
)

type lokiResult struct {
Expand Down Expand Up @@ -89,7 +90,7 @@ func (h *splitByInterval) Process(
parallelism int,
threshold int64,
input []*lokiResult,
userID string,
maxSeries int,
) ([]queryrangebase.Response, error) {
var responses []queryrangebase.Response
ctx, cancel := context.WithCancel(ctx)
Expand All @@ -110,7 +111,7 @@ func (h *splitByInterval) Process(
}

// per request wrapped handler for limiting the amount of series.
next := newSeriesLimiter(h.limits.MaxQuerySeries(userID)).Wrap(h.next)
next := newSeriesLimiter(maxSeries).Wrap(h.next)
for i := 0; i < p; i++ {
go h.loop(ctx, ch, next)
}
Expand Down Expand Up @@ -161,12 +162,12 @@ func (h *splitByInterval) loop(ctx context.Context, ch <-chan *lokiResult, next
}

func (h *splitByInterval) Do(ctx context.Context, r queryrangebase.Request) (queryrangebase.Response, error) {
userid, err := tenant.TenantID(ctx)
tenantIDs, err := tenant.TenantIDs(ctx)
if err != nil {
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}

interval := h.limits.QuerySplitDuration(userid)
interval := validation.MaxDurationOrZeroPerTenant(tenantIDs, h.limits.QuerySplitDuration)
jeschkies marked this conversation as resolved.
Show resolved Hide resolved
// skip split by if unset
if interval == 0 {
return h.next.Do(ctx, r)
Expand Down Expand Up @@ -215,7 +216,9 @@ func (h *splitByInterval) Do(ctx context.Context, r queryrangebase.Request) (que
})
}

resps, err := h.Process(ctx, h.limits.MaxQueryParallelism(userid), limit, input, userid)
maxSeries := validation.SmallestPositiveIntPerTenant(tenantIDs, h.limits.MaxQuerySeries)
maxParallelism := validation.SmallestPositiveIntPerTenant(tenantIDs, h.limits.MaxQueryParallelism)
resps, err := h.Process(ctx, maxParallelism, limit, input, maxSeries)
if err != nil {
return nil, err
}
Expand Down
19 changes: 19 additions & 0 deletions pkg/util/validation/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -698,3 +698,22 @@ func MaxDurationPerTenant(tenantIDs []string, f func(string) time.Duration) time
}
return result
}

// MaxDurationOrDisabledPerTenant is returning the maximum duration per tenant or zero if one tenant has time.Duration(0).
func MaxDurationOrZeroPerTenant(tenantIDs []string, f func(string) time.Duration) time.Duration {
var result *time.Duration
for _, tenantID := range tenantIDs {
v := f(tenantID)
if v == 0 {
return v
}

if v > 0 && (result == nil || v > *result) {
result = &v
}
}
if result == nil {
return 0
}
return *result
}
Loading