Add multi tenant query federation #3250
@@ -0,0 +1,181 @@
// +build requires_docker

package integration

import (
	"fmt"
	"strings"
	"testing"
	"time"

	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/pkg/labels"
	"github.com/prometheus/prometheus/prompb"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"

	"github.com/cortexproject/cortex/integration/e2e"
	e2ecache "github.com/cortexproject/cortex/integration/e2e/cache"
	e2edb "github.com/cortexproject/cortex/integration/e2e/db"
	"github.com/cortexproject/cortex/integration/e2ecortex"
)

type querierTenantFederationConfig struct {
	querySchedulerEnabled  bool
	shuffleShardingEnabled bool
}

func TestQuerierTenantFederation(t *testing.T) {
	runQuerierTenantFederationTest(t, querierTenantFederationConfig{})
}

func TestQuerierTenantFederationWithQueryScheduler(t *testing.T) {
	runQuerierTenantFederationTest(t, querierTenantFederationConfig{
		querySchedulerEnabled: true,
	})
}

func TestQuerierTenantFederationWithShuffleSharding(t *testing.T) {
	runQuerierTenantFederationTest(t, querierTenantFederationConfig{
		shuffleShardingEnabled: true,
	})
}

func TestQuerierTenantFederationWithQuerySchedulerAndShuffleSharding(t *testing.T) {
	runQuerierTenantFederationTest(t, querierTenantFederationConfig{
		querySchedulerEnabled:  true,
		shuffleShardingEnabled: true,
	})
}

func runQuerierTenantFederationTest(t *testing.T, cfg querierTenantFederationConfig) {
	const numUsers = 10

	s, err := e2e.NewScenario(networkName)
	require.NoError(t, err)
	defer s.Close()

	memcached := e2ecache.NewMemcached()
	consul := e2edb.NewConsul()
	require.NoError(t, s.StartAndWaitReady(consul, memcached))

	flags := mergeFlags(BlocksStorageFlags(), map[string]string{
		"-querier.cache-results":             "true",
		"-querier.split-queries-by-interval": "24h",
		"-querier.query-ingesters-within":    "12h", // Required by the test on query /series out of ingesters time range
		"-frontend.memcached.addresses":      "dns+" + memcached.NetworkEndpoint(e2ecache.MemcachedPort),
		"-tenant-federation.enabled":         "true",
Review discussion on this line:

- thought: We should be able to enable this feature for every e2e test and not have any failures. Doing so could provide a good smoke test for the feature.
- @jtlisi Not entirely sure if you meant to do this locally once, or whether you want that as part of the integration tests run by CI?
- This is still a TODO, will look into this tomorrow.
- No need to commit any changes here. Just smoke testing that the tests pass if you enable it globally on your local should be enough.
- I have done that today and it didn't come up with any unexpected failure (apart from the …
	})

	// Start the query-scheduler if enabled.
	var queryScheduler *e2ecortex.CortexService
	if cfg.querySchedulerEnabled {
		queryScheduler = e2ecortex.NewQueryScheduler("query-scheduler", flags, "")
		require.NoError(t, s.StartAndWaitReady(queryScheduler))
		flags["-frontend.scheduler-address"] = queryScheduler.NetworkGRPCEndpoint()
		flags["-querier.scheduler-address"] = queryScheduler.NetworkGRPCEndpoint()
	}

	if cfg.shuffleShardingEnabled {
		// Use only single querier for each user.
		flags["-frontend.max-queriers-per-tenant"] = "1"
	}

	minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"])
	require.NoError(t, s.StartAndWaitReady(minio))

	// Start the query-frontend.
	queryFrontend := e2ecortex.NewQueryFrontend("query-frontend", flags, "")
	require.NoError(t, s.Start(queryFrontend))

	if !cfg.querySchedulerEnabled {
		flags["-querier.frontend-address"] = queryFrontend.NetworkGRPCEndpoint()
	}

	// Start all other services.
	ingester := e2ecortex.NewIngester("ingester", consul.NetworkHTTPEndpoint(), flags, "")
	distributor := e2ecortex.NewDistributor("distributor", consul.NetworkHTTPEndpoint(), flags, "")
	querier := e2ecortex.NewQuerier("querier", consul.NetworkHTTPEndpoint(), flags, "")

	var querier2 *e2ecortex.CortexService
	if cfg.shuffleShardingEnabled {
		querier2 = e2ecortex.NewQuerier("querier-2", consul.NetworkHTTPEndpoint(), flags, "")
	}

	require.NoError(t, s.StartAndWaitReady(querier, ingester, distributor))
	require.NoError(t, s.WaitReady(queryFrontend))
	if cfg.shuffleShardingEnabled {
		require.NoError(t, s.StartAndWaitReady(querier2))
	}

	// Wait until distributor and queriers have updated the ring.
	require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
	require.NoError(t, querier.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
	if cfg.shuffleShardingEnabled {
		require.NoError(t, querier2.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
	}

	// Push a series for each user to Cortex.
	now := time.Now()
	expectedVectors := make([]model.Vector, numUsers)
	tenantIDs := make([]string, numUsers)

	for u := 0; u < numUsers; u++ {
		tenantIDs[u] = fmt.Sprintf("user-%d", u)
		c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), "", "", "", tenantIDs[u])
		require.NoError(t, err)

		var series []prompb.TimeSeries
		series, expectedVectors[u] = generateSeries("series_1", now)

		res, err := c.Push(series)
		require.NoError(t, err)
		require.Equal(t, 200, res.StatusCode)
	}

	// query all tenants
	c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), queryFrontend.HTTPEndpoint(), "", "", strings.Join(tenantIDs, "|"))
	require.NoError(t, err)

	result, err := c.Query("series_1", now)
	require.NoError(t, err)

	assert.Equal(t, mergeResults(tenantIDs, expectedVectors), result.(model.Vector))

	// ensure a push to multiple tenants is failing
	series, _ := generateSeries("series_1", now)
	res, err := c.Push(series)
	require.NoError(t, err)
	require.Equal(t, 500, res.StatusCode)

	// check metric label values for total queries in the query frontend
	require.NoError(t, queryFrontend.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_query_frontend_queries_total"}, e2e.WithLabelMatchers(
		labels.MustNewMatcher(labels.MatchEqual, "user", strings.Join(tenantIDs, "|")),
		labels.MustNewMatcher(labels.MatchEqual, "op", "query"))))

	// check metric label values for query queue length in either query frontend or query scheduler
	queueComponent := queryFrontend
	queueMetricName := "cortex_query_frontend_queue_length"
	if cfg.querySchedulerEnabled {
		queueComponent = queryScheduler
		queueMetricName = "cortex_query_scheduler_queue_length"
	}
	require.NoError(t, queueComponent.WaitSumMetricsWithOptions(e2e.Equals(0), []string{queueMetricName}, e2e.WithLabelMatchers(
		labels.MustNewMatcher(labels.MatchEqual, "user", strings.Join(tenantIDs, "|")))))

	// TODO: check cache invalidation on tombstone cache gen increase
	// TODO: check fairness in queryfrontend
}

func mergeResults(tenantIDs []string, resultsPerTenant []model.Vector) model.Vector {
	var v model.Vector
	for pos, tenantID := range tenantIDs {
		for _, r := range resultsPerTenant[pos] {
			var s model.Sample = *r
			s.Metric = r.Metric.Clone()
			s.Metric[model.LabelName("__tenant_id__")] = model.LabelValue(tenantID)
			v = append(v, &s)
		}
	}
	return v
}
@@ -14,13 +14,15 @@ import (
func getHTTPCacheGenNumberHeaderSetterMiddleware(cacheGenNumbersLoader *purger.TombstonesLoader) middleware.Interface {
	return middleware.Func(func(next http.Handler) http.Handler {
		return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
-			userID, err := tenant.TenantID(r.Context())
+			tenantIDs, err := tenant.TenantIDs(r.Context())
			if err != nil {
				http.Error(w, err.Error(), http.StatusUnauthorized)
				return
			}

-			cacheGenNumber := cacheGenNumbersLoader.GetResultsCacheGenNumber(userID)
+			// len(tenantIDs) will always be > 0, as it otherwise errors
+			// TODO: Handle multiple tenants by creating reproducible aggregation of all individual cacheGenNumbers
Review discussion on this line:

- So this means series deletions (chunks storage) don't correctly invalidate the cache. I think you should mention it in the doc at …
- Can you take a look at 891e692f79a1a65fe6a33961b6bb8f13121a5253?
+			cacheGenNumber := cacheGenNumbersLoader.GetResultsCacheGenNumber(tenantIDs[0])

			w.Header().Set(queryrange.ResultsCacheGenNumberHeaderName, cacheGenNumber)
			next.ServeHTTP(w, r)
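The TODO above is the open piece of this hunk: only the first tenant's cache generation number is used, so a tombstone update for any other tenant in a federated query would not invalidate cached results. A minimal sketch of one possible aggregation follows; it assumes the single-tenant GetResultsCacheGenNumber signature used above, and the fakeLoader type and the comma-joined format are illustrative only, not part of this PR.

```go
package main

import (
	"fmt"
	"strings"
)

// fakeLoader stands in for the tombstones loader, reduced to the one method
// the middleware uses (assumed single-tenant signature, as in the diff above).
type fakeLoader map[string]string

func (f fakeLoader) GetResultsCacheGenNumber(tenantID string) string { return f[tenantID] }

// aggregateCacheGenNumbers joins the per-tenant generation numbers in the
// order of the resolved tenant IDs, so the same tenant set always produces
// the same value and a bump for any single tenant changes the aggregate.
func aggregateCacheGenNumbers(loader interface{ GetResultsCacheGenNumber(string) string }, tenantIDs []string) string {
	genNumbers := make([]string, 0, len(tenantIDs))
	for _, id := range tenantIDs {
		genNumbers = append(genNumbers, loader.GetResultsCacheGenNumber(id))
	}
	return strings.Join(genNumbers, ",")
}

func main() {
	loader := fakeLoader{"user-0": "3", "user-1": "7"}
	fmt.Println(aggregateCacheGenNumbers(loader, []string{"user-0", "user-1"})) // prints "3,7"
}
```

Whether the aggregate should instead be a hash or a maximum depends on how the results cache compares generation numbers; the sketch only shows one way to make the value reproducible for a given tenant set.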
@@ -30,6 +30,7 @@ import (
	"github.com/cortexproject/cortex/pkg/ingester"
	"github.com/cortexproject/cortex/pkg/querier"
	"github.com/cortexproject/cortex/pkg/querier/queryrange"
+	"github.com/cortexproject/cortex/pkg/querier/tenantfederation"
	querier_worker "github.com/cortexproject/cortex/pkg/querier/worker"
	"github.com/cortexproject/cortex/pkg/ring"
	"github.com/cortexproject/cortex/pkg/ring/kv/codec"
@@ -203,7 +204,15 @@ func (t *Cortex) initQueryable() (serv services.Service, err error) {
	querierRegisterer := prometheus.WrapRegistererWith(prometheus.Labels{"engine": "querier"}, prometheus.DefaultRegisterer)

	// Create a querier queryable and PromQL engine
-	t.QuerierQueryable, t.QuerierEngine = querier.New(t.Cfg.Querier, t.Overrides, t.Distributor, t.StoreQueryables, t.TombstonesLoader, querierRegisterer)
+	var queryable prom_storage.SampleAndChunkQueryable
+	queryable, t.QuerierEngine = querier.New(t.Cfg.Querier, t.Overrides, t.Distributor, t.StoreQueryables, t.TombstonesLoader, querierRegisterer)

+	// Enable merge querier if multi tenant query federation is enabled
+	if t.Cfg.TenantFederation.Enabled {
+		queryable = querier.NewSampleAndChunkQueryable(tenantfederation.NewQueryable(queryable))
Review discussion on this line:

- Can you confirm we don't need to do this for the ruler as well, because the ruler will always run on a single tenant?
- The proposal lists that under future work; right now I think we should not enable it just yet.
+	}

+	t.QuerierQueryable = queryable

	// Register the default endpoints that are always enabled for the querier module
	t.API.RegisterQueryable(t.QuerierQueryable, t.Distributor)
Review discussion:

- Why all? Let's be specific to avoid confusing users.
- I think it needs to be enabled on all Cortex services, because otherwise e.g. the write path would interpret multiple tenants as a single new tenant. Once it is enabled, the write path fails as it only supports a single tenant. I have aligned the CHANGELOG with the flags description.
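For context on how the feature is exercised once -tenant-federation.enabled=true is set: the integration test above addresses several tenants in one query by joining their IDs with "|" in the tenant header, and the merge queryable combines the per-tenant results. A rough client-side sketch of the same call follows; the host, port, tenant names, and the /prometheus API prefix are placeholder assumptions, not values taken from this PR.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/url"
)

func main() {
	// Placeholder query-frontend address and API prefix.
	endpoint := "http://localhost:8080/prometheus/api/v1/query"

	params := url.Values{}
	params.Set("query", "series_1")

	req, err := http.NewRequest(http.MethodGet, endpoint+"?"+params.Encode(), nil)
	if err != nil {
		panic(err)
	}
	// Address several tenants in one request by joining their IDs with "|",
	// mirroring strings.Join(tenantIDs, "|") in the integration test above.
	req.Header.Set("X-Scope-OrgID", "user-0|user-1|user-2")

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	body, _ := io.ReadAll(resp.Body)
	// Each series in the merged result carries a __tenant_id__ label that
	// identifies which tenant it came from (see mergeResults in the test).
	fmt.Println(resp.Status, string(body))
}
```

Pushes sent with the same multi-tenant header are expected to fail, which is what the 500-status assertion in the integration test checks.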