Skip to content

Commit fa7fd17

Browse files
committed
Add max tenant config to tenant federation
Signed-off-by: SungJin1212 <tjdwls1201@gmail.com>
1 parent 68a2993 commit fa7fd17

File tree

8 files changed

+140
-12
lines changed

8 files changed

+140
-12
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
* [FEATURE] Query Frontend: Support an exemplar federated query when `-tenant-federation.enabled=true`. #6455
2424
* [FEATURE] Ingester/StoreGateway: Add support for cache regex query matchers via `-ingester.matchers-cache-max-items` and `-blocks-storage.bucket-store.matchers-cache-max-items`. #6477 #6491
2525
* [ENHANCEMENT] Query Frontend: Add a `source` label to query stat metrics. #6470
26+
* [ENHANCEMENT] Query Frontend: Add a flag `-tenant-federation.max-tenant` to limit the number of tenants for federated query. #6493
2627
* [ENHANCEMENT] Querier: Add a `-tenant-federation.max-concurrent` flags to configure the number of worker processing federated query and add a `cortex_querier_federated_tenants_per_query` histogram to track the number of tenants per query. #6449
2728
* [ENHANCEMENT] Query Frontend: Add a number of series in the query response to the query stat log. #6423
2829
* [ENHANCEMENT] Store Gateway: Add a hedged request to reduce the tail latency. #6388

docs/configuration/config-file-reference.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,10 @@ tenant_federation:
161161
# CLI flag: -tenant-federation.max-concurrent
162162
[max_concurrent: <int> | default = 16]
163163

164+
# A maximum number of tenants to query at once. 0 means no limit.
165+
# CLI flag: -tenant-federation.max-tenant
166+
[max_tenant: <int> | default = 0]
167+
164168
# The ruler_config configures the Cortex ruler.
165169
[ruler: <ruler_config>]
166170

pkg/cortex/modules.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -536,7 +536,7 @@ func (t *Cortex) initQueryFrontend() (serv services.Service, err error) {
536536
// Wrap roundtripper into Tripperware.
537537
roundTripper = t.QueryFrontendTripperware(roundTripper)
538538

539-
handler := transport.NewHandler(t.Cfg.Frontend.Handler, roundTripper, util_log.Logger, prometheus.DefaultRegisterer)
539+
handler := transport.NewHandler(t.Cfg.Frontend.Handler, t.Cfg.TenantFederation, roundTripper, util_log.Logger, prometheus.DefaultRegisterer)
540540
t.API.RegisterQueryFrontendHandler(handler)
541541

542542
if frontendV1 != nil {

pkg/frontend/frontend_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"github.com/cortexproject/cortex/pkg/frontend/transport"
2828
frontendv1 "github.com/cortexproject/cortex/pkg/frontend/v1"
2929
"github.com/cortexproject/cortex/pkg/frontend/v1/frontendv1pb"
30+
"github.com/cortexproject/cortex/pkg/querier/tenantfederation"
3031
querier_worker "github.com/cortexproject/cortex/pkg/querier/worker"
3132
"github.com/cortexproject/cortex/pkg/util/concurrency"
3233
"github.com/cortexproject/cortex/pkg/util/flagext"
@@ -279,7 +280,7 @@ func testFrontend(t *testing.T, config CombinedFrontendConfig, handler http.Hand
279280
r.PathPrefix("/").Handler(middleware.Merge(
280281
middleware.AuthenticateUser,
281282
middleware.Tracer{},
282-
).Wrap(transport.NewHandler(config.Handler, rt, logger, nil)))
283+
).Wrap(transport.NewHandler(config.Handler, tenantfederation.Config{}, rt, logger, nil)))
283284

284285
httpServer := http.Server{
285286
Handler: r,

pkg/frontend/transport/handler.go

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"syscall"
1515
"time"
1616

17+
"github.com/cortexproject/cortex/pkg/querier/tenantfederation"
1718
"github.com/go-kit/log"
1819
"github.com/go-kit/log/level"
1920
"github.com/prometheus/client_golang/prometheus"
@@ -33,6 +34,8 @@ const (
3334
// StatusClientClosedRequest is the status code for when a client request cancellation of a http request
3435
StatusClientClosedRequest = 499
3536
ServiceTimingHeaderName = "Server-Timing"
37+
38+
errTooManyTenants = "too many tenants, max: %d, actual: %d"
3639
)
3740

3841
var (
@@ -84,9 +87,10 @@ func (cfg *HandlerConfig) RegisterFlags(f *flag.FlagSet) {
8487
// Handler accepts queries and forwards them to RoundTripper. It can log slow queries,
8588
// but all other logic is inside the RoundTripper.
8689
type Handler struct {
87-
cfg HandlerConfig
88-
log log.Logger
89-
roundTripper http.RoundTripper
90+
cfg HandlerConfig
91+
tenantFederationCfg tenantfederation.Config
92+
log log.Logger
93+
roundTripper http.RoundTripper
9094

9195
// Metrics.
9296
querySeconds *prometheus.CounterVec
@@ -101,11 +105,12 @@ type Handler struct {
101105
}
102106

103107
// NewHandler creates a new frontend handler.
104-
func NewHandler(cfg HandlerConfig, roundTripper http.RoundTripper, log log.Logger, reg prometheus.Registerer) *Handler {
108+
func NewHandler(cfg HandlerConfig, tenantFederationCfg tenantfederation.Config, roundTripper http.RoundTripper, log log.Logger, reg prometheus.Registerer) *Handler {
105109
h := &Handler{
106-
cfg: cfg,
107-
log: log,
108-
roundTripper: roundTripper,
110+
cfg: cfg,
111+
tenantFederationCfg: tenantFederationCfg,
112+
log: log,
113+
roundTripper: roundTripper,
109114
}
110115

111116
if cfg.QueryStatsEnabled {
@@ -185,6 +190,15 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
185190
if err != nil {
186191
return
187192
}
193+
194+
if f.tenantFederationCfg.Enabled {
195+
maxTenant := f.tenantFederationCfg.MaxTenant
196+
if maxTenant > 0 && len(tenantIDs) > maxTenant {
197+
http.Error(w, fmt.Errorf(errTooManyTenants, maxTenant, len(tenantIDs)).Error(), http.StatusBadRequest)
198+
return
199+
}
200+
}
201+
188202
userID := tenant.JoinTenantIDs(tenantIDs)
189203

190204
// Initialise the stats in the context and make sure it's propagated

pkg/frontend/transport/handler_test.go

Lines changed: 104 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,14 @@ import (
2020
"github.com/stretchr/testify/assert"
2121
"github.com/stretchr/testify/require"
2222
"github.com/weaveworks/common/httpgrpc"
23+
"github.com/weaveworks/common/middleware"
2324
"github.com/weaveworks/common/user"
2425
"google.golang.org/grpc/codes"
2526

2627
querier_stats "github.com/cortexproject/cortex/pkg/querier/stats"
28+
"github.com/cortexproject/cortex/pkg/querier/tenantfederation"
2729
"github.com/cortexproject/cortex/pkg/querier/tripperware"
30+
"github.com/cortexproject/cortex/pkg/tenant"
2831
util_api "github.com/cortexproject/cortex/pkg/util/api"
2932
util_log "github.com/cortexproject/cortex/pkg/util/log"
3033
)
@@ -178,6 +181,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
178181
}, nil
179182
})
180183
userID := "12345"
184+
tenantFederationCfg := tenantfederation.Config{}
181185
for _, tt := range []struct {
182186
name string
183187
cfg HandlerConfig
@@ -379,7 +383,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
379383
} {
380384
t.Run(tt.name, func(t *testing.T) {
381385
reg := prometheus.NewPedanticRegistry()
382-
handler := NewHandler(tt.cfg, tt.roundTripperFunc, log.NewNopLogger(), reg)
386+
handler := NewHandler(tt.cfg, tenantFederationCfg, tt.roundTripperFunc, log.NewNopLogger(), reg)
383387

384388
ctx := user.InjectOrgID(context.Background(), userID)
385389
req := httptest.NewRequest("GET", "/", nil)
@@ -413,7 +417,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
413417
func TestReportQueryStatsFormat(t *testing.T) {
414418
outputBuf := bytes.NewBuffer(nil)
415419
logger := log.NewSyncLogger(log.NewLogfmtLogger(outputBuf))
416-
handler := NewHandler(HandlerConfig{QueryStatsEnabled: true}, http.DefaultTransport, logger, nil)
420+
handler := NewHandler(HandlerConfig{QueryStatsEnabled: true}, tenantfederation.Config{}, http.DefaultTransport, logger, nil)
417421
userID := "fake"
418422
req, _ := http.NewRequest(http.MethodGet, "http://localhost:8080/prometheus/api/v1/query", nil)
419423
resp := &http.Response{ContentLength: 1000}
@@ -506,3 +510,101 @@ func TestReportQueryStatsFormat(t *testing.T) {
506510
})
507511
}
508512
}
513+
514+
func Test_TenantFederation_MaxTenant(t *testing.T) {
515+
// set a multi tenant resolver
516+
tenant.WithDefaultResolver(tenant.NewMultiResolver())
517+
518+
roundTripper := roundTripperFunc(func(req *http.Request) (*http.Response, error) {
519+
return &http.Response{
520+
StatusCode: http.StatusOK,
521+
Body: io.NopCloser(strings.NewReader("{}")),
522+
}, nil
523+
})
524+
525+
tests := []struct {
526+
name string
527+
cfg tenantfederation.Config
528+
orgId string
529+
expectedStatusCode int
530+
expectedErrMsg string
531+
}{
532+
{
533+
name: "one tenant",
534+
cfg: tenantfederation.Config{
535+
Enabled: true,
536+
MaxTenant: 0,
537+
},
538+
orgId: "org1",
539+
expectedStatusCode: http.StatusOK,
540+
},
541+
{
542+
name: "less than max tenant",
543+
cfg: tenantfederation.Config{
544+
Enabled: true,
545+
MaxTenant: 3,
546+
},
547+
orgId: "org1|org2",
548+
expectedStatusCode: http.StatusOK,
549+
},
550+
{
551+
name: "equal to max tenant",
552+
cfg: tenantfederation.Config{
553+
Enabled: true,
554+
MaxTenant: 2,
555+
},
556+
orgId: "org1|org2",
557+
expectedStatusCode: http.StatusOK,
558+
},
559+
{
560+
name: "exceeds max tenant",
561+
cfg: tenantfederation.Config{
562+
Enabled: true,
563+
MaxTenant: 2,
564+
},
565+
orgId: "org1|org2|org3",
566+
expectedStatusCode: http.StatusBadRequest,
567+
expectedErrMsg: "too many tenants, max: 2, actual: 3",
568+
},
569+
{
570+
name: "no org Id",
571+
cfg: tenantfederation.Config{
572+
Enabled: true,
573+
MaxTenant: 0,
574+
},
575+
orgId: "",
576+
expectedStatusCode: http.StatusUnauthorized,
577+
expectedErrMsg: "no org id",
578+
},
579+
{
580+
name: "no limit",
581+
cfg: tenantfederation.Config{
582+
Enabled: true,
583+
MaxTenant: 0,
584+
},
585+
orgId: "org1|org2|org3",
586+
expectedStatusCode: http.StatusOK,
587+
},
588+
}
589+
590+
for _, test := range tests {
591+
t.Run(test.name, func(t *testing.T) {
592+
handler := NewHandler(HandlerConfig{}, test.cfg, roundTripper, log.NewNopLogger(), nil)
593+
handlerWithAuth := middleware.Merge(middleware.AuthenticateUser).Wrap(handler)
594+
595+
req := httptest.NewRequest("GET", "http://fake", nil)
596+
req.Header.Set("X-Scope-OrgId", test.orgId)
597+
resp := httptest.NewRecorder()
598+
599+
handlerWithAuth.ServeHTTP(resp, req)
600+
601+
body, err := io.ReadAll(resp.Body)
602+
require.NoError(t, err)
603+
require.Equal(t, test.expectedStatusCode, resp.Code)
604+
605+
if test.expectedErrMsg != "" {
606+
require.Contains(t, string(body), test.expectedErrMsg)
607+
}
608+
})
609+
}
610+
}

pkg/frontend/v1/frontend_test.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929

3030
"github.com/cortexproject/cortex/pkg/frontend/transport"
3131
"github.com/cortexproject/cortex/pkg/frontend/v1/frontendv1pb"
32+
"github.com/cortexproject/cortex/pkg/querier/tenantfederation"
3233
querier_worker "github.com/cortexproject/cortex/pkg/querier/worker"
3334
"github.com/cortexproject/cortex/pkg/scheduler/queue"
3435
"github.com/cortexproject/cortex/pkg/util/flagext"
@@ -264,14 +265,16 @@ func testFrontend(t *testing.T, config Config, handler http.Handler, test func(a
264265

265266
// Default HTTP handler config.
266267
handlerCfg := transport.HandlerConfig{}
268+
tenantFederationCfg := tenantfederation.Config{}
269+
267270
flagext.DefaultValues(&handlerCfg)
268271

269272
rt := transport.AdaptGrpcRoundTripperToHTTPRoundTripper(v1)
270273
r := mux.NewRouter()
271274
r.PathPrefix("/").Handler(middleware.Merge(
272275
middleware.AuthenticateUser,
273276
middleware.Tracer{},
274-
).Wrap(transport.NewHandler(handlerCfg, rt, logger, nil)))
277+
).Wrap(transport.NewHandler(handlerCfg, tenantFederationCfg, rt, logger, nil)))
275278

276279
httpServer := http.Server{
277280
Handler: r,

pkg/querier/tenantfederation/tenant_federation.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,12 @@ type Config struct {
99
Enabled bool `yaml:"enabled"`
1010
// MaxConcurrent The number of workers used for processing federated query.
1111
MaxConcurrent int `yaml:"max_concurrent"`
12+
// MaxTenant A maximum number of tenants to query at once.
13+
MaxTenant int `yaml:"max_tenant"`
1214
}
1315

1416
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
1517
f.BoolVar(&cfg.Enabled, "tenant-federation.enabled", false, "If enabled on all Cortex services, queries can be federated across multiple tenants. The tenant IDs involved need to be specified separated by a `|` character in the `X-Scope-OrgID` header (experimental).")
1618
f.IntVar(&cfg.MaxConcurrent, "tenant-federation.max-concurrent", defaultMaxConcurrency, "The number of workers used to process each federated query.")
19+
f.IntVar(&cfg.MaxTenant, "tenant-federation.max-tenant", 0, "A maximum number of tenants to query at once. 0 means no limit.")
1720
}

0 commit comments

Comments
 (0)