Add multi tenant query federation #3250


Merged · 15 commits · Dec 23, 2020
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -5,6 +5,7 @@
* [CHANGE] Querier: it's not required to set `-frontend.query-stats-enabled=true` in the querier anymore to enable query statistics logging in the query-frontend. The flag is now required to be configured only in the query-frontend and it will be propagated to the queriers. #3595
* [CHANGE] Blocks storage: compactor is now required when running a Cortex cluster with the blocks storage, because it also keeps the bucket index updated. #3583
* [CHANGE] Blocks storage: block deletion marks are now stored in a per-tenant global markers/ location too, other than within the block location. The compactor, at startup, will copy deletion marks from the block location to the global location. This migration is required only once, so you can safely disable it via `-compactor.block-deletion-marks-migration-enabled=false` once new compactor has successfully started once in your cluster. #3583
* [FEATURE] Querier: Queries can be federated across multiple tenants. The tenant IDs involved need to be specified separated by a `|` character in the `X-Scope-OrgID` request header. This is an experimental feature, which can be enabled by setting `-tenant-federation.enabled=true` on all Cortex services. #3250
Contributor:

Why all? Let's be specific to avoid confusing users.

Contributor (author):

I think it needs to be enabled on all Cortex services, because otherwise e.g. the write path would interpret multiple tenants as a single new tenant. Once it is enabled the write path fails as it only supports a single tenant. I have aligned the CHANGELOG with the flags description.
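The `|`-separated `X-Scope-OrgID` convention discussed in this thread is simple to illustrate. Below is a minimal, hypothetical sketch of splitting and re-joining a federated header value; the helper names are invented for illustration and are not the actual API of Cortex's `tenant` package:

```go
package main

import (
	"fmt"
	"strings"
)

// tenantIDsFromOrgID splits a federated X-Scope-OrgID value into its
// individual tenant IDs.
func tenantIDsFromOrgID(orgID string) []string {
	return strings.Split(orgID, "|")
}

// joinTenantIDs is the inverse, e.g. for building per-user metric
// label values out of the involved tenants.
func joinTenantIDs(ids []string) string {
	return strings.Join(ids, "|")
}

func main() {
	ids := tenantIDsFromOrgID("team-a|team-b")
	fmt.Println(ids)                // [team-a team-b]
	fmt.Println(joinTenantIDs(ids)) // team-a|team-b
}
```

Note that with this scheme a non-federated request (no `|` in the header) degenerates to a single-element slice, which is why the write path can reject anything longer than one element.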

* [ENHANCEMENT] Blocks storage: introduced a per-tenant bucket index, periodically updated by the compactor, used to avoid full bucket scanning done by queriers and store-gateways. The bucket index is updated by the compactor during blocks cleanup, on every `-compactor.cleanup-interval`. #3553 #3555 #3561 #3583
* [ENHANCEMENT] Blocks storage: introduced an option `-blocks-storage.bucket-store.bucket-index.enabled` to enable the usage of the bucket index in the querier. When enabled, the querier will use the bucket index to find a tenant's blocks instead of running the periodic bucket scan. The following new metrics have been added: #3614
* `cortex_bucket_index_loads_total`
11 changes: 10 additions & 1 deletion Makefile
@@ -158,7 +158,8 @@ lint:
GOFLAGS="-tags=requires_docker" faillint -paths "github.com/bmizerany/assert=github.com/stretchr/testify/assert,\
golang.org/x/net/context=context,\
sync/atomic=go.uber.org/atomic,\
github.com/weaveworks/common/user.{ExtractOrgID}=github.com/cortexproject/cortex/pkg/tenant.{TenantID},\
github.com/prometheus/client_golang/prometheus.{MultiError}=github.com/prometheus/prometheus/tsdb/errors.{NewMulti},\
github.com/weaveworks/common/user.{ExtractOrgID}=github.com/cortexproject/cortex/pkg/tenant.{TenantID,TenantIDs},\
github.com/weaveworks/common/user.{ExtractOrgIDFromHTTPRequest}=github.com/cortexproject/cortex/pkg/tenant.{ExtractTenantIDFromHTTPRequest}" ./pkg/... ./cmd/... ./tools/... ./integration/...

# Ensure clean pkg structure.
@@ -172,6 +173,14 @@ lint:
faillint -paths "github.com/cortexproject/cortex/pkg/querier/..." ./pkg/scheduler/...
faillint -paths "github.com/cortexproject/cortex/pkg/storage/tsdb/..." ./pkg/storage/bucket/...

# Ensure the query path is supporting multiple tenants
faillint -paths "\
github.com/cortexproject/cortex/pkg/tenant.{TenantID}=github.com/cortexproject/cortex/pkg/tenant.{TenantIDs}" \
./pkg/scheduler/... \
./pkg/frontend/... \
./pkg/querier/tenantfederation/... \
./pkg/querier/queryrange/...

# Validate Kubernetes spec files. Requires:
# https://kubeval.instrumenta.dev
kubeval ./k8s/*
7 changes: 7 additions & 0 deletions docs/configuration/config-file-reference.md
@@ -143,6 +143,13 @@ api:
# The purger_config configures the purger which takes care of delete requests
[purger: <purger_config>]

tenant_federation:
# If enabled on all Cortex services, queries can be federated across multiple
# tenants. The tenant IDs involved need to be specified separated by a `|`
# character in the `X-Scope-OrgID` header (experimental).
# CLI flag: -tenant-federation.enabled
[enabled: <boolean> | default = false]

# The ruler_config configures the Cortex ruler.
[ruler: <ruler_config>]

1 change: 1 addition & 0 deletions docs/configuration/v1-guarantees.md
@@ -66,3 +66,4 @@ Currently experimental features are:
- Blocks storage bucket index
- The bucket index support in the querier (enabled via `-blocks-storage.bucket-store.bucket-index.enabled=true`) is experimental
- The block deletion marks migration support in the compactor (`-compactor.block-deletion-marks-migration-enabled`) is temporary and will be removed in future versions
- Querier: tenant federation
4 changes: 4 additions & 0 deletions docs/guides/limitations.md
@@ -43,3 +43,7 @@ The Cortex chunks storage doesn't support queries without a metric name, like `c
## Query series and labels

When running queries against the `/api/v1/series`, `/api/v1/labels` and `/api/v1/label/{name}/values` endpoints, the query's time range is ignored and the data is always fetched from ingesters. There is experimental support for querying the long-term store with the *blocks* storage engine when `-querier.query-store-for-labels-enabled` is set.

## Tenant federation

When tenant federation is enabled on a Cortex cluster, result caching is disabled for queries spanning more than a single tenant. Result caching is planned to be implemented before general availability of this feature.
181 changes: 181 additions & 0 deletions integration/querier_tenant_federation_test.go
@@ -0,0 +1,181 @@
// +build requires_docker

package integration

import (
"fmt"
"strings"
"testing"
"time"

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/prompb"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/cortexproject/cortex/integration/e2e"
e2ecache "github.com/cortexproject/cortex/integration/e2e/cache"
e2edb "github.com/cortexproject/cortex/integration/e2e/db"
"github.com/cortexproject/cortex/integration/e2ecortex"
)

type querierTenantFederationConfig struct {
querySchedulerEnabled bool
shuffleShardingEnabled bool
}

func TestQuerierTenantFederation(t *testing.T) {
runQuerierTenantFederationTest(t, querierTenantFederationConfig{})
}

func TestQuerierTenantFederationWithQueryScheduler(t *testing.T) {
runQuerierTenantFederationTest(t, querierTenantFederationConfig{
querySchedulerEnabled: true,
})
}

func TestQuerierTenantFederationWithShuffleSharding(t *testing.T) {
runQuerierTenantFederationTest(t, querierTenantFederationConfig{
shuffleShardingEnabled: true,
})
}

func TestQuerierTenantFederationWithQuerySchedulerAndShuffleSharding(t *testing.T) {
runQuerierTenantFederationTest(t, querierTenantFederationConfig{
querySchedulerEnabled: true,
shuffleShardingEnabled: true,
})
}

func runQuerierTenantFederationTest(t *testing.T, cfg querierTenantFederationConfig) {
const numUsers = 10

s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
defer s.Close()

memcached := e2ecache.NewMemcached()
consul := e2edb.NewConsul()
require.NoError(t, s.StartAndWaitReady(consul, memcached))

flags := mergeFlags(BlocksStorageFlags(), map[string]string{
"-querier.cache-results": "true",
"-querier.split-queries-by-interval": "24h",
"-querier.query-ingesters-within": "12h", // Required by the test on query /series out of ingesters time range
"-frontend.memcached.addresses": "dns+" + memcached.NetworkEndpoint(e2ecache.MemcachedPort),
"-tenant-federation.enabled": "true",
Contributor:

thought: We should be able to enable this feature for every e2e test and not have any failures. Doing so could provide a good smoke test for the feature.

Contributor (author):

@jtlisi I'm not entirely sure if you meant to do this locally once, or whether you want to have it as part of the integration tests run by CI?

Contributor (author):

This is still a TODO, will look into this tomorrow

Contributor:

No need to commit any changes here. Just smoke testing that the tests pass if you enable it globally on your local setup should be enough.

Contributor (author):

I have done that today and it didn't come up with any unexpected failures (apart from the Backward_compatibility tests, which is expected as the feature flag is not recognised by them).

})

// Start the query-scheduler if enabled.
var queryScheduler *e2ecortex.CortexService
if cfg.querySchedulerEnabled {
queryScheduler = e2ecortex.NewQueryScheduler("query-scheduler", flags, "")
require.NoError(t, s.StartAndWaitReady(queryScheduler))
flags["-frontend.scheduler-address"] = queryScheduler.NetworkGRPCEndpoint()
flags["-querier.scheduler-address"] = queryScheduler.NetworkGRPCEndpoint()
}

if cfg.shuffleShardingEnabled {
// Use only single querier for each user.
flags["-frontend.max-queriers-per-tenant"] = "1"
}

minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"])
require.NoError(t, s.StartAndWaitReady(minio))

// Start the query-frontend.
queryFrontend := e2ecortex.NewQueryFrontend("query-frontend", flags, "")
require.NoError(t, s.Start(queryFrontend))

if !cfg.querySchedulerEnabled {
flags["-querier.frontend-address"] = queryFrontend.NetworkGRPCEndpoint()
}

// Start all other services.
ingester := e2ecortex.NewIngester("ingester", consul.NetworkHTTPEndpoint(), flags, "")
distributor := e2ecortex.NewDistributor("distributor", consul.NetworkHTTPEndpoint(), flags, "")
querier := e2ecortex.NewQuerier("querier", consul.NetworkHTTPEndpoint(), flags, "")

var querier2 *e2ecortex.CortexService
if cfg.shuffleShardingEnabled {
querier2 = e2ecortex.NewQuerier("querier-2", consul.NetworkHTTPEndpoint(), flags, "")
}

require.NoError(t, s.StartAndWaitReady(querier, ingester, distributor))
require.NoError(t, s.WaitReady(queryFrontend))
if cfg.shuffleShardingEnabled {
require.NoError(t, s.StartAndWaitReady(querier2))
}

// Wait until distributor and queriers have updated the ring.
require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
require.NoError(t, querier.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
if cfg.shuffleShardingEnabled {
require.NoError(t, querier2.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
}

// Push a series for each user to Cortex.
now := time.Now()
expectedVectors := make([]model.Vector, numUsers)
tenantIDs := make([]string, numUsers)

for u := 0; u < numUsers; u++ {
tenantIDs[u] = fmt.Sprintf("user-%d", u)
c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), "", "", "", tenantIDs[u])
require.NoError(t, err)

var series []prompb.TimeSeries
series, expectedVectors[u] = generateSeries("series_1", now)

res, err := c.Push(series)
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)
}

// query all tenants
c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), queryFrontend.HTTPEndpoint(), "", "", strings.Join(tenantIDs, "|"))
require.NoError(t, err)

result, err := c.Query("series_1", now)
require.NoError(t, err)

assert.Equal(t, mergeResults(tenantIDs, expectedVectors), result.(model.Vector))

// ensure a push to multiple tenants is failing
series, _ := generateSeries("series_1", now)
res, err := c.Push(series)
require.NoError(t, err)
require.Equal(t, 500, res.StatusCode)

// check metric label values for total queries in the query frontend
require.NoError(t, queryFrontend.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_query_frontend_queries_total"}, e2e.WithLabelMatchers(
labels.MustNewMatcher(labels.MatchEqual, "user", strings.Join(tenantIDs, "|")),
labels.MustNewMatcher(labels.MatchEqual, "op", "query"))))

// check metric label values for query queue length in either query frontend or query scheduler
queueComponent := queryFrontend
queueMetricName := "cortex_query_frontend_queue_length"
if cfg.querySchedulerEnabled {
queueComponent = queryScheduler
queueMetricName = "cortex_query_scheduler_queue_length"
}
require.NoError(t, queueComponent.WaitSumMetricsWithOptions(e2e.Equals(0), []string{queueMetricName}, e2e.WithLabelMatchers(
labels.MustNewMatcher(labels.MatchEqual, "user", strings.Join(tenantIDs, "|")))))

// TODO: check cache invalidation on tombstone cache gen increase
// TODO: check fairness in queryfrontend
}

func mergeResults(tenantIDs []string, resultsPerTenant []model.Vector) model.Vector {
var v model.Vector
for pos, tenantID := range tenantIDs {
for _, r := range resultsPerTenant[pos] {
var s model.Sample = *r
s.Metric = r.Metric.Clone()
s.Metric[model.LabelName("__tenant_id__")] = model.LabelValue(tenantID)
v = append(v, &s)
}
}
return v
}
6 changes: 4 additions & 2 deletions pkg/api/middlewares.go
@@ -14,13 +14,15 @@ import (
func getHTTPCacheGenNumberHeaderSetterMiddleware(cacheGenNumbersLoader *purger.TombstonesLoader) middleware.Interface {
return middleware.Func(func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
userID, err := tenant.TenantID(r.Context())
tenantIDs, err := tenant.TenantIDs(r.Context())
if err != nil {
http.Error(w, err.Error(), http.StatusUnauthorized)
return
}

cacheGenNumber := cacheGenNumbersLoader.GetResultsCacheGenNumber(userID)
// len(tenantIDs) will always be > 0, as it otherwise errors
// TODO: Handle multiple tenants by creating reproducible aggregation of all individual cacheGenNumbers
Contributor:

So, this means series deletions (chunks storage) don't correctly invalidate the cache. I think you should mention it in the doc at guides/limitations.md.

Contributor (author):

Can you take a look at 891e692f79a1a65fe6a33961b6bb8f13121a5253

cacheGenNumber := cacheGenNumbersLoader.GetResultsCacheGenNumber(tenantIDs[0])

w.Header().Set(queryrange.ResultsCacheGenNumberHeaderName, cacheGenNumber)
next.ServeHTTP(w, r)
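The TODO in the middlewares.go hunk above (a reproducible aggregation of per-tenant cache generation numbers) could, for example, sort the values and hash them, making the result independent of tenant order while still changing whenever any tenant's generation number changes. A hypothetical sketch of that approach, not what Cortex implements:

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"sort"
	"strings"
)

// aggregateCacheGenNumbers reduces per-tenant cache generation numbers
// to a single value: sorted first (so the result is order-independent),
// then hashed (so a change to any tenant's number changes the result).
func aggregateCacheGenNumbers(genNumbers []string) string {
	sorted := append([]string(nil), genNumbers...)
	sort.Strings(sorted)
	sum := sha256.Sum256([]byte(strings.Join(sorted, "|")))
	return hex.EncodeToString(sum[:8])
}

func main() {
	// Same tenants in a different order yield the same aggregate.
	fmt.Println(aggregateCacheGenNumbers([]string{"3", "7"}) ==
		aggregateCacheGenNumbers([]string{"7", "3"})) // true
}
```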
52 changes: 31 additions & 21 deletions pkg/cortex/cortex.go
@@ -39,6 +39,7 @@ import (
"github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/cortexproject/cortex/pkg/querier"
"github.com/cortexproject/cortex/pkg/querier/queryrange"
"github.com/cortexproject/cortex/pkg/querier/tenantfederation"
querier_worker "github.com/cortexproject/cortex/pkg/querier/worker"
"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/ring/kv/memberlist"
@@ -47,6 +48,7 @@
"github.com/cortexproject/cortex/pkg/scheduler"
"github.com/cortexproject/cortex/pkg/storage/tsdb"
"github.com/cortexproject/cortex/pkg/storegateway"
"github.com/cortexproject/cortex/pkg/tenant"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/fakeauth"
"github.com/cortexproject/cortex/pkg/util/flagext"
@@ -82,27 +84,28 @@ type Config struct {
PrintConfig bool `yaml:"-"`
HTTPPrefix string `yaml:"http_prefix"`

API api.Config `yaml:"api"`
Server server.Config `yaml:"server"`
Distributor distributor.Config `yaml:"distributor"`
Querier querier.Config `yaml:"querier"`
IngesterClient client.Config `yaml:"ingester_client"`
Ingester ingester.Config `yaml:"ingester"`
Flusher flusher.Config `yaml:"flusher"`
Storage storage.Config `yaml:"storage"`
ChunkStore chunk.StoreConfig `yaml:"chunk_store"`
Schema chunk.SchemaConfig `yaml:"schema" doc:"hidden"` // Doc generation tool doesn't support it because part of the SchemaConfig doesn't support CLI flags (needs manual documentation)
LimitsConfig validation.Limits `yaml:"limits"`
Prealloc client.PreallocConfig `yaml:"prealloc" doc:"hidden"`
Worker querier_worker.Config `yaml:"frontend_worker"`
Frontend frontend.CombinedFrontendConfig `yaml:"frontend"`
QueryRange queryrange.Config `yaml:"query_range"`
TableManager chunk.TableManagerConfig `yaml:"table_manager"`
Encoding encoding.Config `yaml:"-"` // No yaml for this, it only works with flags.
BlocksStorage tsdb.BlocksStorageConfig `yaml:"blocks_storage"`
Compactor compactor.Config `yaml:"compactor"`
StoreGateway storegateway.Config `yaml:"store_gateway"`
PurgerConfig purger.Config `yaml:"purger"`
API api.Config `yaml:"api"`
Server server.Config `yaml:"server"`
Distributor distributor.Config `yaml:"distributor"`
Querier querier.Config `yaml:"querier"`
IngesterClient client.Config `yaml:"ingester_client"`
Ingester ingester.Config `yaml:"ingester"`
Flusher flusher.Config `yaml:"flusher"`
Storage storage.Config `yaml:"storage"`
ChunkStore chunk.StoreConfig `yaml:"chunk_store"`
Schema chunk.SchemaConfig `yaml:"schema" doc:"hidden"` // Doc generation tool doesn't support it because part of the SchemaConfig doesn't support CLI flags (needs manual documentation)
LimitsConfig validation.Limits `yaml:"limits"`
Prealloc client.PreallocConfig `yaml:"prealloc" doc:"hidden"`
Worker querier_worker.Config `yaml:"frontend_worker"`
Frontend frontend.CombinedFrontendConfig `yaml:"frontend"`
QueryRange queryrange.Config `yaml:"query_range"`
TableManager chunk.TableManagerConfig `yaml:"table_manager"`
Encoding encoding.Config `yaml:"-"` // No yaml for this, it only works with flags.
BlocksStorage tsdb.BlocksStorageConfig `yaml:"blocks_storage"`
Compactor compactor.Config `yaml:"compactor"`
StoreGateway storegateway.Config `yaml:"store_gateway"`
PurgerConfig purger.Config `yaml:"purger"`
TenantFederation tenantfederation.Config `yaml:"tenant_federation"`

Ruler ruler.Config `yaml:"ruler"`
Configs configs.Config `yaml:"configs"`
@@ -149,6 +152,7 @@ func (c *Config) RegisterFlags(f *flag.FlagSet) {
c.Compactor.RegisterFlags(f)
c.StoreGateway.RegisterFlags(f)
c.PurgerConfig.RegisterFlags(f)
c.TenantFederation.RegisterFlags(f)

c.Ruler.RegisterFlags(f)
c.Configs.RegisterFlags(f)
@@ -304,6 +308,12 @@ func New(cfg Config) (*Cortex, error) {
os.Exit(0)
}

// Swap out the default resolver to support multiple tenant IDs separated by a '|'
if cfg.TenantFederation.Enabled {
util.WarnExperimentalUse("tenant-federation")
tenant.WithDefaultResolver(tenant.NewMultiResolver())
}
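The resolver swap above follows a simple pluggable-default pattern: a single-tenant resolver is the default, and enabling federation replaces it process-wide. A self-contained sketch of the idea — the types and names here are illustrative, not the real `tenant` package API:

```go
package main

import (
	"fmt"
	"strings"
)

// Resolver turns the raw X-Scope-OrgID value into tenant IDs.
type Resolver interface {
	TenantIDs(orgID string) ([]string, error)
}

// singleResolver is the default: the whole value is one tenant.
type singleResolver struct{}

func (singleResolver) TenantIDs(orgID string) ([]string, error) {
	return []string{orgID}, nil
}

// multiResolver splits on '|', enabling federation.
type multiResolver struct{}

func (multiResolver) TenantIDs(orgID string) ([]string, error) {
	return strings.Split(orgID, "|"), nil
}

var defaultResolver Resolver = singleResolver{}

// WithDefaultResolver swaps the process-wide resolver, mirroring the
// call made when tenant federation is enabled.
func WithDefaultResolver(r Resolver) { defaultResolver = r }

func main() {
	WithDefaultResolver(multiResolver{})
	ids, _ := defaultResolver.TenantIDs("team-a|team-b")
	fmt.Println(ids) // [team-a team-b]
}
```

Because every caller goes through the default resolver, components that only support a single tenant can simply reject a multi-element result, which matches the write-path behaviour described in the review thread.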

// Don't check auth header on TransferChunks, as we weren't originally
// sending it and this could cause transfers to fail on update.
cfg.API.HTTPAuthMiddleware = fakeauth.SetupAuthMiddleware(&cfg.Server, cfg.AuthEnabled,
11 changes: 10 additions & 1 deletion pkg/cortex/modules.go
@@ -30,6 +30,7 @@ import (
"github.com/cortexproject/cortex/pkg/ingester"
"github.com/cortexproject/cortex/pkg/querier"
"github.com/cortexproject/cortex/pkg/querier/queryrange"
"github.com/cortexproject/cortex/pkg/querier/tenantfederation"
querier_worker "github.com/cortexproject/cortex/pkg/querier/worker"
"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/ring/kv/codec"
@@ -203,7 +204,15 @@ func (t *Cortex) initQueryable() (serv services.Service, err error) {
querierRegisterer := prometheus.WrapRegistererWith(prometheus.Labels{"engine": "querier"}, prometheus.DefaultRegisterer)

// Create a querier queryable and PromQL engine
t.QuerierQueryable, t.QuerierEngine = querier.New(t.Cfg.Querier, t.Overrides, t.Distributor, t.StoreQueryables, t.TombstonesLoader, querierRegisterer)
var queryable prom_storage.SampleAndChunkQueryable
queryable, t.QuerierEngine = querier.New(t.Cfg.Querier, t.Overrides, t.Distributor, t.StoreQueryables, t.TombstonesLoader, querierRegisterer)

// Enable merge querier if multi tenant query federation is enabled
if t.Cfg.TenantFederation.Enabled {
queryable = querier.NewSampleAndChunkQueryable(tenantfederation.NewQueryable(queryable))
Contributor:

May you confirm we don't need to do this for the ruler as well, because the ruler will always run on a single tenant?

Contributor (author):

The proposal lists that under future work; right now I think we should not enable it just yet.

}

t.QuerierQueryable = queryable

// Register the default endpoints that are always enabled for the querier module
t.API.RegisterQueryable(t.QuerierQueryable, t.Distributor)
3 changes: 2 additions & 1 deletion pkg/frontend/transport/handler.go
@@ -146,10 +146,11 @@ func (f *Handler) reportSlowQuery(r *http.Request, queryString url.Values, query
}

func (f *Handler) reportQueryStats(r *http.Request, queryString url.Values, queryResponseTime time.Duration, stats *querier_stats.Stats) {
userID, err := tenant.TenantID(r.Context())
tenantIDs, err := tenant.TenantIDs(r.Context())
if err != nil {
return
}
userID := tenant.JoinTenantIDs(tenantIDs)

// Track stats.
f.querySeconds.WithLabelValues(userID).Add(stats.LoadWallTime().Seconds())