Skip to content

Add integration test on query-scheduler #3429

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 16 additions & 10 deletions integration/asserts.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ const (
Ingester
Querier
QueryFrontend
QueryScheduler
TableManager
AlertManager
Ruler
Expand All @@ -29,15 +30,16 @@ const (
var (
// Service-specific metrics prefixes which shouldn't be used by any other service.
serviceMetricsPrefixes = map[ServiceType][]string{
Distributor: {},
Ingester: {"!cortex_ingester_client", "cortex_ingester"}, // The metrics prefix cortex_ingester_client may be used by other components so we ignore it.
Querier: {"cortex_querier"},
QueryFrontend: {"cortex_frontend", "cortex_query_frontend"},
TableManager: {},
AlertManager: {"cortex_alertmanager"},
Ruler: {},
StoreGateway: {"!cortex_storegateway_client", "cortex_storegateway"}, // The metrics prefix cortex_storegateway_client may be used by other components so we ignore it.
Purger: {"cortex_purger"},
Distributor: {},
Ingester: {"!cortex_ingester_client", "cortex_ingester"}, // The metrics prefix cortex_ingester_client may be used by other components so we ignore it.
Querier: {"cortex_querier"},
QueryFrontend: {"cortex_frontend", "cortex_query_frontend"},
QueryScheduler: {"cortex_query_scheduler"},
TableManager: {},
AlertManager: {"cortex_alertmanager"},
Ruler: {},
StoreGateway: {"!cortex_storegateway_client", "cortex_storegateway"}, // The metrics prefix cortex_storegateway_client may be used by other components so we ignore it.
Purger: {"cortex_purger"},
}

// Blacklisted metrics prefixes across any Cortex service.
Expand All @@ -48,6 +50,10 @@ var (
)

func assertServiceMetricsPrefixes(t *testing.T, serviceType ServiceType, service *e2ecortex.CortexService) {
if service == nil {
return
}

metrics, err := service.Metrics()
require.NoError(t, err)

Expand All @@ -67,7 +73,7 @@ func assertServiceMetricsPrefixes(t *testing.T, serviceType ServiceType, service
break
}

assert.NotRegexp(t, "^"+prefix, metricLine, "service: %s", service.Name())
assert.NotRegexp(t, "^"+prefix, metricLine, "service: %s endpoint: %s", service.Name(), service.HTTPEndpoint())
}
}
}
Expand Down
13 changes: 12 additions & 1 deletion integration/backward_compatibility_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ var (
"quay.io/cortexproject/cortex:v1.1.0": preCortex14Flags,
"quay.io/cortexproject/cortex:v1.2.0": preCortex14Flags,
"quay.io/cortexproject/cortex:v1.3.0": preCortex14Flags,
"quay.io/cortexproject/cortex:v1.4.0": nil,
"quay.io/cortexproject/cortex:v1.4.0": preCortex16Flags,
}
)

Expand All @@ -35,6 +35,17 @@ func preCortex14Flags(flags map[string]string) map[string]string {
"-store-gateway.sharding-ring.store": "",
"-store-gateway.sharding-ring.consul.hostname": "",
"-store-gateway.sharding-ring.replication-factor": "",
// Query-scheduler has been introduced in 1.6.0
"-frontend.scheduler-dns-lookup-period": "",
"-querier.scheduler-dns-lookup-period": "",
})
}

// preCortex16Flags returns flags overlaid, via e2e.MergeFlagsWithoutRemovingEmpty,
// with empty values for the query-scheduler DNS lookup period flags.
func preCortex16Flags(flags map[string]string) map[string]string {
	// Query-scheduler has been introduced in 1.6.0
	schedulerOverrides := map[string]string{
		"-frontend.scheduler-dns-lookup-period": "",
		"-querier.scheduler-dns-lookup-period":  "",
	}

	return e2e.MergeFlagsWithoutRemovingEmpty(flags, schedulerOverrides)
}

Expand Down
30 changes: 30 additions & 0 deletions integration/e2ecortex/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ func NewQuerierWithConfigFile(name, consulAddress, configFile string, flags map[
"-querier.frontend-client.backoff-max-period": "100ms",
"-querier.frontend-client.backoff-retries": "1",
"-querier.worker-parallelism": "1",
// Quickly detect query-scheduler when running it.
"-querier.scheduler-dns-lookup-period": "1s",
// Store-gateway ring backend.
"-store-gateway.sharding-enabled": "true",
"-store-gateway.sharding-ring.store": "consul",
Expand Down Expand Up @@ -204,6 +206,34 @@ func NewQueryFrontendWithConfigFile(name, configFile string, flags map[string]st
e2e.NewCommandWithoutEntrypoint("cortex", e2e.BuildArgs(e2e.MergeFlags(map[string]string{
"-target": "query-frontend",
"-log.level": "warn",
// Quickly detect query-scheduler when running it.
"-frontend.scheduler-dns-lookup-period": "1s",
}, flags))...),
e2e.NewHTTPReadinessProbe(httpPort, "/ready", 200, 299),
httpPort,
grpcPort,
)
}

// NewQueryScheduler returns the service built by NewQuerySchedulerWithConfigFile
// when it is given an empty config file name.
func NewQueryScheduler(name string, flags map[string]string, image string) *CortexService {
	const noConfigFile = ""

	return NewQuerySchedulerWithConfigFile(name, noConfigFile, flags, image)
}

func NewQuerySchedulerWithConfigFile(name, configFile string, flags map[string]string, image string) *CortexService {
if configFile != "" {
flags["-config.file"] = filepath.Join(e2e.ContainerSharedDir, configFile)
}

if image == "" {
image = GetDefaultImage()
}

return NewCortexService(
name,
image,
e2e.NewCommandWithoutEntrypoint("cortex", e2e.BuildArgs(e2e.MergeFlags(map[string]string{
"-target": "query-scheduler",
"-log.level": "warn",
}, flags))...),
e2e.NewHTTPReadinessProbe(httpPort, "/ready", 200, 299),
httpPort,
Expand Down
79 changes: 59 additions & 20 deletions integration/querier_sharding_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,41 @@ import (
"github.com/cortexproject/cortex/integration/e2ecortex"
)

func TestQuerierSharding(t *testing.T) {
runQuerierShardingTest(t, true)
// querierShardingTestConfig selects which variant of the querier sharding
// integration test runQuerierShardingTest executes.
type querierShardingTestConfig struct {
// shuffleShardingEnabled, when true, makes the test set
// -frontend.max-queriers-per-tenant to "1" and expect every query to land
// on the same querier; when false the two queriers are expected to receive
// a roughly equal share.
shuffleShardingEnabled bool
// querySchedulerEnabled, when true, makes the test start a query-scheduler
// and point the query-frontend and queriers at it; when false the queriers
// are pointed directly at the query-frontend.
querySchedulerEnabled bool
}

func TestQuerierNoSharding(t *testing.T) {
runQuerierShardingTest(t, false)
func TestQuerierShuffleShardingWithoutQueryScheduler(t *testing.T) {
runQuerierShardingTest(t, querierShardingTestConfig{
shuffleShardingEnabled: true,
querySchedulerEnabled: false,
})
}

func TestQuerierShuffleShardingWithQueryScheduler(t *testing.T) {
runQuerierShardingTest(t, querierShardingTestConfig{
shuffleShardingEnabled: true,
querySchedulerEnabled: true,
})
}

func runQuerierShardingTest(t *testing.T, sharding bool) {
// Going to high starts hitting filedescriptor limit, since we run all queriers concurrently.
func TestQuerierNoShardingWithoutQueryScheduler(t *testing.T) {
runQuerierShardingTest(t, querierShardingTestConfig{
shuffleShardingEnabled: false,
querySchedulerEnabled: false,
})
}

func TestQuerierNoShardingWithQueryScheduler(t *testing.T) {
runQuerierShardingTest(t, querierShardingTestConfig{
shuffleShardingEnabled: false,
querySchedulerEnabled: true,
})
}

func runQuerierShardingTest(t *testing.T, cfg querierShardingTestConfig) {
// Going too high starts hitting the file descriptor limit, since we run all queriers concurrently.
const numQueries = 100

s, err := e2e.NewScenario(networkName)
Expand All @@ -50,24 +75,33 @@ func runQuerierShardingTest(t *testing.T, sharding bool) {
minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"])
require.NoError(t, s.StartAndWaitReady(minio))

if sharding {
if cfg.shuffleShardingEnabled {
// Use only single querier for each user.
flags["-frontend.max-queriers-per-tenant"] = "1"
}

// Start Cortex components.
queryFrontend := e2ecortex.NewQueryFrontendWithConfigFile("query-frontend", "", flags, "")
ingester := e2ecortex.NewIngesterWithConfigFile("ingester", consul.NetworkHTTPEndpoint(), "", flags, "")
distributor := e2ecortex.NewDistributorWithConfigFile("distributor", consul.NetworkHTTPEndpoint(), "", flags, "")
// Start the query-scheduler if enabled.
var queryScheduler *e2ecortex.CortexService
if cfg.querySchedulerEnabled {
queryScheduler = e2ecortex.NewQueryScheduler("query-scheduler", flags, "")
require.NoError(t, s.StartAndWaitReady(queryScheduler))
flags["-frontend.scheduler-address"] = queryScheduler.NetworkGRPCEndpoint()
flags["-querier.scheduler-address"] = queryScheduler.NetworkGRPCEndpoint()
}

// Start the query-frontend.
queryFrontend := e2ecortex.NewQueryFrontend("query-frontend", flags, "")
require.NoError(t, s.Start(queryFrontend))

querier1 := e2ecortex.NewQuerierWithConfigFile("querier-1", consul.NetworkHTTPEndpoint(), "", mergeFlags(flags, map[string]string{
"-querier.frontend-address": queryFrontend.NetworkGRPCEndpoint(),
}), "")
querier2 := e2ecortex.NewQuerierWithConfigFile("querier-2", consul.NetworkHTTPEndpoint(), "", mergeFlags(flags, map[string]string{
"-querier.frontend-address": queryFrontend.NetworkGRPCEndpoint(),
}), "")
if !cfg.querySchedulerEnabled {
flags["-querier.frontend-address"] = queryFrontend.NetworkGRPCEndpoint()
}

// Start all other services.
ingester := e2ecortex.NewIngester("ingester", consul.NetworkHTTPEndpoint(), flags, "")
distributor := e2ecortex.NewDistributor("distributor", consul.NetworkHTTPEndpoint(), flags, "")
querier1 := e2ecortex.NewQuerier("querier-1", consul.NetworkHTTPEndpoint(), flags, "")
querier2 := e2ecortex.NewQuerier("querier-2", consul.NetworkHTTPEndpoint(), flags, "")

require.NoError(t, s.StartAndWaitReady(querier1, querier2, ingester, distributor))
require.NoError(t, s.WaitReady(queryFrontend))
Expand Down Expand Up @@ -99,8 +133,12 @@ func runQuerierShardingTest(t *testing.T, sharding bool) {
require.NoError(t, err)
}

// Wait until both workers connect to the query frontend
require.NoError(t, queryFrontend.WaitSumMetrics(e2e.Equals(2), "cortex_query_frontend_connected_clients"))
// Wait until both workers connect to the query-frontend or query-scheduler
if cfg.querySchedulerEnabled {
require.NoError(t, queryScheduler.WaitSumMetrics(e2e.Equals(2), "cortex_query_scheduler_connected_querier_clients"))
} else {
require.NoError(t, queryFrontend.WaitSumMetrics(e2e.Equals(2), "cortex_query_frontend_connected_clients"))
}

wg := sync.WaitGroup{}

Expand Down Expand Up @@ -141,7 +179,7 @@ func runQuerierShardingTest(t *testing.T, sharding bool) {

require.Equal(t, float64(numQueries), total-2) // Remove 2 requests used for metrics initialization.

if sharding {
if cfg.shuffleShardingEnabled {
require.Equal(t, float64(numQueries), diff)
} else {
require.InDelta(t, 0, diff, numQueries*0.20) // Both queriers should have roughly equal number of requests, with possible delta.
Expand All @@ -153,4 +191,5 @@ func runQuerierShardingTest(t *testing.T, sharding bool) {
assertServiceMetricsPrefixes(t, Querier, querier1)
assertServiceMetricsPrefixes(t, Querier, querier2)
assertServiceMetricsPrefixes(t, QueryFrontend, queryFrontend)
assertServiceMetricsPrefixes(t, QueryScheduler, queryScheduler)
}
Loading