Skip to content

Commit 6614def

Browse files
authored
Add multi tenant query federation (#3250)
* Add tenant query federation This experimental feature allows queries to cover data from more than a single Cortex tenant and can be activated by supplying `-tenant-federation.enabled` to all cortex instances. To query multiple tenants a `|` separated tenant list can be specified in the `X-Scope-OrgID` header. The source tenant of a metric will be exposed in the label `__tenant_id__`. Signed-off-by: Christian Simon <simon@swine.de> * Aggregate the limit of maxQueriers correctly This ensures the limit is aggregated correctly of the setting of each individual tenant. It also implements the logic for the v2 query frontend. Signed-off-by: Christian Simon <simon@swine.de> * Fix tenant labels and make LabelNames more efficient Signed-off-by: Christian Simon <simon@swine.de> * Use tsdb_errors for error handling Signed-off-by: Christian Simon <simon@swine.de> * Fix uninitialized label matcher Regexp matcher need to be initialized, this adds a test for regexp matcher and fixes the underlying issue. Signed-off-by: Christian Simon <simon@swine.de> * Use map for filterValuesByMatchers to reduce allocations Signed-off-by: Christian Simon <simon@swine.de> * Address review suggestions Signed-off-by: Christian Simon <simon@swine.de> * Add validation.SmallestPositiveNonZeroIntPerTenant to avoid code duplication Signed-off-by: Christian Simon <simon@swine.de> * Add limitations and experimental status to docs Signed-off-by: Christian Simon <simon@swine.de> * Remove unnecessary cast of defaultTenantLabel Signed-off-by: Christian Simon <simon@swine.de> * Handle query-range limits for multi tenant queries Signed-off-by: Christian Simon <simon@swine.de> * Skip results cache, if multi tenants query Signed-off-by: Christian Simon <simon@swine.de> * Add failint to ensure query path supports multiple tenants To avoid any future regressions in the multi tenant support within the query path, a faillint command tests if TenantID is used and if it finds one, it suggestest using TenantIDs instead> Signed-off-by: Christian Simon <simon@swine.de> * Align CHANGELOG line with the flag description Signed-off-by: Christian Simon <simon@swine.de> * Add a limitation about missing results cache Signed-off-by: Christian Simon <simon@swine.de>
1 parent 2c22662 commit 6614def

File tree

24 files changed

+1123
-41
lines changed

24 files changed

+1123
-41
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
* [CHANGE] Querier: it's not required to set `-frontend.query-stats-enabled=true` in the querier anymore to enable query statistics logging in the query-frontend. The flag is now required to be configured only in the query-frontend and it will be propagated to the queriers. #3595
66
* [CHANGE] Blocks storage: compactor is now required when running a Cortex cluster with the blocks storage, because it also keeps the bucket index updated. #3583
77
* [CHANGE] Blocks storage: block deletion marks are now stored in a per-tenant global markers/ location too, other than within the block location. The compactor, at startup, will copy deletion marks from the block location to the global location. This migration is required only once, so you can safely disable it via `-compactor.block-deletion-marks-migration-enabled=false` once new compactor has successfully started once in your cluster. #3583
8+
* [FEATURE] Querier: Queries can be federated across multiple tenants. The tenants IDs involved need to be specified separated by a `|` character in the `X-Scope-OrgID` request header. This is an experimental feature, which can be enabled by setting `-tenant-federation.enabled=true` on all Cortex services. #3250
89
* [ENHANCEMENT] Blocks storage: introduced a per-tenant bucket index, periodically updated by the compactor, used to avoid full bucket scanning done by queriers and store-gateways. The bucket index is updated by the compactor during blocks cleanup, on every `-compactor.cleanup-interval`. #3553 #3555 #3561 #3583
910
* [ENHANCEMENT] Blocks storage: introduced an option `-blocks-storage.bucket-store.bucket-index.enabled` to enable the usage of the bucket index in the querier. When enabled, the querier will use the bucket index to find a tenant's blocks instead of running the periodic bucket scan. The following new metrics have been added: #3614
1011
* `cortex_bucket_index_loads_total`

Makefile

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,8 @@ lint:
158158
GOFLAGS="-tags=requires_docker" faillint -paths "github.com/bmizerany/assert=github.com/stretchr/testify/assert,\
159159
golang.org/x/net/context=context,\
160160
sync/atomic=go.uber.org/atomic,\
161-
github.com/weaveworks/common/user.{ExtractOrgID}=github.com/cortexproject/cortex/pkg/tenant.{TenantID},\
161+
github.com/prometheus/client_golang/prometheus.{MultiError}=github.com/prometheus/prometheus/tsdb/errors.{NewMulti},\
162+
github.com/weaveworks/common/user.{ExtractOrgID}=github.com/cortexproject/cortex/pkg/tenant.{TenantID,TenantIDs},\
162163
github.com/weaveworks/common/user.{ExtractOrgIDFromHTTPRequest}=github.com/cortexproject/cortex/pkg/tenant.{ExtractTenantIDFromHTTPRequest}" ./pkg/... ./cmd/... ./tools/... ./integration/...
163164

164165
# Ensure clean pkg structure.
@@ -172,6 +173,14 @@ lint:
172173
faillint -paths "github.com/cortexproject/cortex/pkg/querier/..." ./pkg/scheduler/...
173174
faillint -paths "github.com/cortexproject/cortex/pkg/storage/tsdb/..." ./pkg/storage/bucket/...
174175

176+
# Ensure the query path is supporting multiple tenants
177+
faillint -paths "\
178+
github.com/cortexproject/cortex/pkg/tenant.{TenantID}=github.com/cortexproject/cortex/pkg/tenant.{TenantIDs}" \
179+
./pkg/scheduler/... \
180+
./pkg/frontend/... \
181+
./pkg/querier/tenantfederation/... \
182+
./pkg/querier/queryrange/...
183+
175184
# Validate Kubernetes spec files. Requires:
176185
# https://kubeval.instrumenta.dev
177186
kubeval ./k8s/*

docs/configuration/config-file-reference.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,13 @@ api:
143143
# The purger_config configures the purger which takes care of delete requests
144144
[purger: <purger_config>]
145145

146+
tenant_federation:
147+
# If enabled on all Cortex services, queries can be federated across multiple
148+
# tenants. The tenant IDs involved need to be specified separated by a `|`
149+
# character in the `X-Scope-OrgID` header (experimental).
150+
# CLI flag: -tenant-federation.enabled
151+
[enabled: <boolean> | default = false]
152+
146153
# The ruler_config configures the Cortex ruler.
147154
[ruler: <ruler_config>]
148155

docs/configuration/v1-guarantees.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,3 +66,4 @@ Currently experimental features are:
6666
- Blocks storage bucket index
6767
- The bucket index support in the querier (enabled via `-blocks-storage.bucket-store.bucket-index.enabled=true`) is experimental
6868
- The block deletion marks migration support in the compactor (`-compactor.block-deletion-marks-migration-enabled`) is temporarily and will be removed in future versions
69+
- Querier: tenant federation

docs/guides/limitations.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,3 +43,7 @@ The Cortex chunks storage doesn't support queries without a metric name, like `c
4343
## Query series and labels
4444

4545
When running queries to the `/api/v1/series`, `/api/v1/labels` and `/api/v1/label/{name}/values` endpoints, query's time range is ignored and the data is always fetched from ingesters. There is experimental support to query the long-term store with the *blocks* storage engine when `-querier.query-store-for-labels-enabled` is set.
46+
47+
## Tenant federation
48+
49+
When tenant federation is enabled on a Cortex cluster, result caching is disabled for queries spanning more than a single tenant. Result caching is planned to be implemented before general availability of this feature.
Lines changed: 181 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,181 @@
1+
// +build requires_docker
2+
3+
package integration
4+
5+
import (
6+
"fmt"
7+
"strings"
8+
"testing"
9+
"time"
10+
11+
"github.com/prometheus/common/model"
12+
"github.com/prometheus/prometheus/pkg/labels"
13+
"github.com/prometheus/prometheus/prompb"
14+
"github.com/stretchr/testify/assert"
15+
"github.com/stretchr/testify/require"
16+
17+
"github.com/cortexproject/cortex/integration/e2e"
18+
e2ecache "github.com/cortexproject/cortex/integration/e2e/cache"
19+
e2edb "github.com/cortexproject/cortex/integration/e2e/db"
20+
"github.com/cortexproject/cortex/integration/e2ecortex"
21+
)
22+
23+
type querierTenantFederationConfig struct {
24+
querySchedulerEnabled bool
25+
shuffleShardingEnabled bool
26+
}
27+
28+
func TestQuerierTenantFederation(t *testing.T) {
29+
runQuerierTenantFederationTest(t, querierTenantFederationConfig{})
30+
}
31+
32+
func TestQuerierTenantFederationWithQueryScheduler(t *testing.T) {
33+
runQuerierTenantFederationTest(t, querierTenantFederationConfig{
34+
querySchedulerEnabled: true,
35+
})
36+
}
37+
38+
func TestQuerierTenantFederationWithShuffleSharding(t *testing.T) {
39+
runQuerierTenantFederationTest(t, querierTenantFederationConfig{
40+
shuffleShardingEnabled: true,
41+
})
42+
}
43+
44+
func TestQuerierTenantFederationWithQuerySchedulerAndShuffleSharding(t *testing.T) {
45+
runQuerierTenantFederationTest(t, querierTenantFederationConfig{
46+
querySchedulerEnabled: true,
47+
shuffleShardingEnabled: true,
48+
})
49+
}
50+
51+
func runQuerierTenantFederationTest(t *testing.T, cfg querierTenantFederationConfig) {
52+
const numUsers = 10
53+
54+
s, err := e2e.NewScenario(networkName)
55+
require.NoError(t, err)
56+
defer s.Close()
57+
58+
memcached := e2ecache.NewMemcached()
59+
consul := e2edb.NewConsul()
60+
require.NoError(t, s.StartAndWaitReady(consul, memcached))
61+
62+
flags := mergeFlags(BlocksStorageFlags(), map[string]string{
63+
"-querier.cache-results": "true",
64+
"-querier.split-queries-by-interval": "24h",
65+
"-querier.query-ingesters-within": "12h", // Required by the test on query /series out of ingesters time range
66+
"-frontend.memcached.addresses": "dns+" + memcached.NetworkEndpoint(e2ecache.MemcachedPort),
67+
"-tenant-federation.enabled": "true",
68+
})
69+
70+
// Start the query-scheduler if enabled.
71+
var queryScheduler *e2ecortex.CortexService
72+
if cfg.querySchedulerEnabled {
73+
queryScheduler = e2ecortex.NewQueryScheduler("query-scheduler", flags, "")
74+
require.NoError(t, s.StartAndWaitReady(queryScheduler))
75+
flags["-frontend.scheduler-address"] = queryScheduler.NetworkGRPCEndpoint()
76+
flags["-querier.scheduler-address"] = queryScheduler.NetworkGRPCEndpoint()
77+
}
78+
79+
if cfg.shuffleShardingEnabled {
80+
// Use only single querier for each user.
81+
flags["-frontend.max-queriers-per-tenant"] = "1"
82+
}
83+
84+
minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"])
85+
require.NoError(t, s.StartAndWaitReady(minio))
86+
87+
// Start the query-frontend.
88+
queryFrontend := e2ecortex.NewQueryFrontend("query-frontend", flags, "")
89+
require.NoError(t, s.Start(queryFrontend))
90+
91+
if !cfg.querySchedulerEnabled {
92+
flags["-querier.frontend-address"] = queryFrontend.NetworkGRPCEndpoint()
93+
}
94+
95+
// Start all other services.
96+
ingester := e2ecortex.NewIngester("ingester", consul.NetworkHTTPEndpoint(), flags, "")
97+
distributor := e2ecortex.NewDistributor("distributor", consul.NetworkHTTPEndpoint(), flags, "")
98+
querier := e2ecortex.NewQuerier("querier", consul.NetworkHTTPEndpoint(), flags, "")
99+
100+
var querier2 *e2ecortex.CortexService
101+
if cfg.shuffleShardingEnabled {
102+
querier2 = e2ecortex.NewQuerier("querier-2", consul.NetworkHTTPEndpoint(), flags, "")
103+
}
104+
105+
require.NoError(t, s.StartAndWaitReady(querier, ingester, distributor))
106+
require.NoError(t, s.WaitReady(queryFrontend))
107+
if cfg.shuffleShardingEnabled {
108+
require.NoError(t, s.StartAndWaitReady(querier2))
109+
}
110+
111+
// Wait until distributor and queriers have updated the ring.
112+
require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
113+
require.NoError(t, querier.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
114+
if cfg.shuffleShardingEnabled {
115+
require.NoError(t, querier2.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
116+
}
117+
118+
// Push a series for each user to Cortex.
119+
now := time.Now()
120+
expectedVectors := make([]model.Vector, numUsers)
121+
tenantIDs := make([]string, numUsers)
122+
123+
for u := 0; u < numUsers; u++ {
124+
tenantIDs[u] = fmt.Sprintf("user-%d", u)
125+
c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), "", "", "", tenantIDs[u])
126+
require.NoError(t, err)
127+
128+
var series []prompb.TimeSeries
129+
series, expectedVectors[u] = generateSeries("series_1", now)
130+
131+
res, err := c.Push(series)
132+
require.NoError(t, err)
133+
require.Equal(t, 200, res.StatusCode)
134+
}
135+
136+
// query all tenants
137+
c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), queryFrontend.HTTPEndpoint(), "", "", strings.Join(tenantIDs, "|"))
138+
require.NoError(t, err)
139+
140+
result, err := c.Query("series_1", now)
141+
require.NoError(t, err)
142+
143+
assert.Equal(t, mergeResults(tenantIDs, expectedVectors), result.(model.Vector))
144+
145+
// ensure a push to multiple tenants is failing
146+
series, _ := generateSeries("series_1", now)
147+
res, err := c.Push(series)
148+
require.NoError(t, err)
149+
require.Equal(t, 500, res.StatusCode)
150+
151+
// check metric label values for total queries in the query frontend
152+
require.NoError(t, queryFrontend.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_query_frontend_queries_total"}, e2e.WithLabelMatchers(
153+
labels.MustNewMatcher(labels.MatchEqual, "user", strings.Join(tenantIDs, "|")),
154+
labels.MustNewMatcher(labels.MatchEqual, "op", "query"))))
155+
156+
// check metric label values for query queue length in either query frontend or query scheduler
157+
queueComponent := queryFrontend
158+
queueMetricName := "cortex_query_frontend_queue_length"
159+
if cfg.querySchedulerEnabled {
160+
queueComponent = queryScheduler
161+
queueMetricName = "cortex_query_scheduler_queue_length"
162+
}
163+
require.NoError(t, queueComponent.WaitSumMetricsWithOptions(e2e.Equals(0), []string{queueMetricName}, e2e.WithLabelMatchers(
164+
labels.MustNewMatcher(labels.MatchEqual, "user", strings.Join(tenantIDs, "|")))))
165+
166+
// TODO: check cache invalidation on tombstone cache gen increase
167+
// TODO: check fairness in queryfrontend
168+
}
169+
170+
func mergeResults(tenantIDs []string, resultsPerTenant []model.Vector) model.Vector {
171+
var v model.Vector
172+
for pos, tenantID := range tenantIDs {
173+
for _, r := range resultsPerTenant[pos] {
174+
var s model.Sample = *r
175+
s.Metric = r.Metric.Clone()
176+
s.Metric[model.LabelName("__tenant_id__")] = model.LabelValue(tenantID)
177+
v = append(v, &s)
178+
}
179+
}
180+
return v
181+
}

pkg/api/middlewares.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,15 @@ import (
1414
func getHTTPCacheGenNumberHeaderSetterMiddleware(cacheGenNumbersLoader *purger.TombstonesLoader) middleware.Interface {
1515
return middleware.Func(func(next http.Handler) http.Handler {
1616
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
17-
userID, err := tenant.TenantID(r.Context())
17+
tenantIDs, err := tenant.TenantIDs(r.Context())
1818
if err != nil {
1919
http.Error(w, err.Error(), http.StatusUnauthorized)
2020
return
2121
}
2222

23-
cacheGenNumber := cacheGenNumbersLoader.GetResultsCacheGenNumber(userID)
23+
// len(tenantIDs) will always be > 0, as it otherwise errors
24+
// TODO: Handle multiple tenants by creating reproducible aggregation of all individual cacheGenNumbers
25+
cacheGenNumber := cacheGenNumbersLoader.GetResultsCacheGenNumber(tenantIDs[0])
2426

2527
w.Header().Set(queryrange.ResultsCacheGenNumberHeaderName, cacheGenNumber)
2628
next.ServeHTTP(w, r)

pkg/cortex/cortex.go

Lines changed: 31 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ import (
3939
"github.com/cortexproject/cortex/pkg/ingester/client"
4040
"github.com/cortexproject/cortex/pkg/querier"
4141
"github.com/cortexproject/cortex/pkg/querier/queryrange"
42+
"github.com/cortexproject/cortex/pkg/querier/tenantfederation"
4243
querier_worker "github.com/cortexproject/cortex/pkg/querier/worker"
4344
"github.com/cortexproject/cortex/pkg/ring"
4445
"github.com/cortexproject/cortex/pkg/ring/kv/memberlist"
@@ -47,6 +48,7 @@ import (
4748
"github.com/cortexproject/cortex/pkg/scheduler"
4849
"github.com/cortexproject/cortex/pkg/storage/tsdb"
4950
"github.com/cortexproject/cortex/pkg/storegateway"
51+
"github.com/cortexproject/cortex/pkg/tenant"
5052
"github.com/cortexproject/cortex/pkg/util"
5153
"github.com/cortexproject/cortex/pkg/util/fakeauth"
5254
"github.com/cortexproject/cortex/pkg/util/flagext"
@@ -82,27 +84,28 @@ type Config struct {
8284
PrintConfig bool `yaml:"-"`
8385
HTTPPrefix string `yaml:"http_prefix"`
8486

85-
API api.Config `yaml:"api"`
86-
Server server.Config `yaml:"server"`
87-
Distributor distributor.Config `yaml:"distributor"`
88-
Querier querier.Config `yaml:"querier"`
89-
IngesterClient client.Config `yaml:"ingester_client"`
90-
Ingester ingester.Config `yaml:"ingester"`
91-
Flusher flusher.Config `yaml:"flusher"`
92-
Storage storage.Config `yaml:"storage"`
93-
ChunkStore chunk.StoreConfig `yaml:"chunk_store"`
94-
Schema chunk.SchemaConfig `yaml:"schema" doc:"hidden"` // Doc generation tool doesn't support it because part of the SchemaConfig doesn't support CLI flags (needs manual documentation)
95-
LimitsConfig validation.Limits `yaml:"limits"`
96-
Prealloc client.PreallocConfig `yaml:"prealloc" doc:"hidden"`
97-
Worker querier_worker.Config `yaml:"frontend_worker"`
98-
Frontend frontend.CombinedFrontendConfig `yaml:"frontend"`
99-
QueryRange queryrange.Config `yaml:"query_range"`
100-
TableManager chunk.TableManagerConfig `yaml:"table_manager"`
101-
Encoding encoding.Config `yaml:"-"` // No yaml for this, it only works with flags.
102-
BlocksStorage tsdb.BlocksStorageConfig `yaml:"blocks_storage"`
103-
Compactor compactor.Config `yaml:"compactor"`
104-
StoreGateway storegateway.Config `yaml:"store_gateway"`
105-
PurgerConfig purger.Config `yaml:"purger"`
87+
API api.Config `yaml:"api"`
88+
Server server.Config `yaml:"server"`
89+
Distributor distributor.Config `yaml:"distributor"`
90+
Querier querier.Config `yaml:"querier"`
91+
IngesterClient client.Config `yaml:"ingester_client"`
92+
Ingester ingester.Config `yaml:"ingester"`
93+
Flusher flusher.Config `yaml:"flusher"`
94+
Storage storage.Config `yaml:"storage"`
95+
ChunkStore chunk.StoreConfig `yaml:"chunk_store"`
96+
Schema chunk.SchemaConfig `yaml:"schema" doc:"hidden"` // Doc generation tool doesn't support it because part of the SchemaConfig doesn't support CLI flags (needs manual documentation)
97+
LimitsConfig validation.Limits `yaml:"limits"`
98+
Prealloc client.PreallocConfig `yaml:"prealloc" doc:"hidden"`
99+
Worker querier_worker.Config `yaml:"frontend_worker"`
100+
Frontend frontend.CombinedFrontendConfig `yaml:"frontend"`
101+
QueryRange queryrange.Config `yaml:"query_range"`
102+
TableManager chunk.TableManagerConfig `yaml:"table_manager"`
103+
Encoding encoding.Config `yaml:"-"` // No yaml for this, it only works with flags.
104+
BlocksStorage tsdb.BlocksStorageConfig `yaml:"blocks_storage"`
105+
Compactor compactor.Config `yaml:"compactor"`
106+
StoreGateway storegateway.Config `yaml:"store_gateway"`
107+
PurgerConfig purger.Config `yaml:"purger"`
108+
TenantFederation tenantfederation.Config `yaml:"tenant_federation"`
106109

107110
Ruler ruler.Config `yaml:"ruler"`
108111
Configs configs.Config `yaml:"configs"`
@@ -149,6 +152,7 @@ func (c *Config) RegisterFlags(f *flag.FlagSet) {
149152
c.Compactor.RegisterFlags(f)
150153
c.StoreGateway.RegisterFlags(f)
151154
c.PurgerConfig.RegisterFlags(f)
155+
c.TenantFederation.RegisterFlags(f)
152156

153157
c.Ruler.RegisterFlags(f)
154158
c.Configs.RegisterFlags(f)
@@ -304,6 +308,12 @@ func New(cfg Config) (*Cortex, error) {
304308
os.Exit(0)
305309
}
306310

311+
// Swap out the default resolver to support multiple tenant IDs separated by a '|'
312+
if cfg.TenantFederation.Enabled {
313+
util.WarnExperimentalUse("tenant-federation")
314+
tenant.WithDefaultResolver(tenant.NewMultiResolver())
315+
}
316+
307317
// Don't check auth header on TransferChunks, as we weren't originally
308318
// sending it and this could cause transfers to fail on update.
309319
cfg.API.HTTPAuthMiddleware = fakeauth.SetupAuthMiddleware(&cfg.Server, cfg.AuthEnabled,

pkg/cortex/modules.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
"github.com/cortexproject/cortex/pkg/ingester"
3131
"github.com/cortexproject/cortex/pkg/querier"
3232
"github.com/cortexproject/cortex/pkg/querier/queryrange"
33+
"github.com/cortexproject/cortex/pkg/querier/tenantfederation"
3334
querier_worker "github.com/cortexproject/cortex/pkg/querier/worker"
3435
"github.com/cortexproject/cortex/pkg/ring"
3536
"github.com/cortexproject/cortex/pkg/ring/kv/codec"
@@ -203,7 +204,15 @@ func (t *Cortex) initQueryable() (serv services.Service, err error) {
203204
querierRegisterer := prometheus.WrapRegistererWith(prometheus.Labels{"engine": "querier"}, prometheus.DefaultRegisterer)
204205

205206
// Create a querier queryable and PromQL engine
206-
t.QuerierQueryable, t.QuerierEngine = querier.New(t.Cfg.Querier, t.Overrides, t.Distributor, t.StoreQueryables, t.TombstonesLoader, querierRegisterer)
207+
var queryable prom_storage.SampleAndChunkQueryable
208+
queryable, t.QuerierEngine = querier.New(t.Cfg.Querier, t.Overrides, t.Distributor, t.StoreQueryables, t.TombstonesLoader, querierRegisterer)
209+
210+
// Enable merge querier if multi tenant query federation is enabled
211+
if t.Cfg.TenantFederation.Enabled {
212+
queryable = querier.NewSampleAndChunkQueryable(tenantfederation.NewQueryable(queryable))
213+
}
214+
215+
t.QuerierQueryable = queryable
207216

208217
// Register the default endpoints that are always enabled for the querier module
209218
t.API.RegisterQueryable(t.QuerierQueryable, t.Distributor)

pkg/frontend/transport/handler.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,10 +146,11 @@ func (f *Handler) reportSlowQuery(r *http.Request, queryString url.Values, query
146146
}
147147

148148
func (f *Handler) reportQueryStats(r *http.Request, queryString url.Values, queryResponseTime time.Duration, stats *querier_stats.Stats) {
149-
userID, err := tenant.TenantID(r.Context())
149+
tenantIDs, err := tenant.TenantIDs(r.Context())
150150
if err != nil {
151151
return
152152
}
153+
userID := tenant.JoinTenantIDs(tenantIDs)
153154

154155
// Track stats.
155156
f.querySeconds.WithLabelValues(userID).Add(stats.LoadWallTime().Seconds())

0 commit comments

Comments
 (0)