Skip to content

Commit 7c922b2

Browse files
committed
Added integration tests on query-scheduler
Signed-off-by: Marco Pracucci <marco@pracucci.com>
1 parent cc785f0 commit 7c922b2

File tree

6 files changed

+249
-117
lines changed

6 files changed

+249
-117
lines changed

integration/asserts.go

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ const (
1919
Ingester
2020
Querier
2121
QueryFrontend
22+
QueryScheduler
2223
TableManager
2324
AlertManager
2425
Ruler
@@ -29,15 +30,16 @@ const (
2930
var (
3031
// Service-specific metrics prefixes which shouldn't be used by any other service.
3132
serviceMetricsPrefixes = map[ServiceType][]string{
32-
Distributor: {},
33-
Ingester: {"!cortex_ingester_client", "cortex_ingester"}, // The metrics prefix cortex_ingester_client may be used by other components so we ignore it.
34-
Querier: {"cortex_querier"},
35-
QueryFrontend: {"cortex_frontend", "cortex_query_frontend"},
36-
TableManager: {},
37-
AlertManager: {"cortex_alertmanager"},
38-
Ruler: {},
39-
StoreGateway: {"!cortex_storegateway_client", "cortex_storegateway"}, // The metrics prefix cortex_storegateway_client may be used by other components so we ignore it.
40-
Purger: {"cortex_purger"},
33+
Distributor: {},
34+
Ingester: {"!cortex_ingester_client", "cortex_ingester"}, // The metrics prefix cortex_ingester_client may be used by other components so we ignore it.
35+
Querier: {"cortex_querier"},
36+
QueryFrontend: {"cortex_frontend", "cortex_query_frontend"},
37+
QueryScheduler: {"cortex_query_scheduler"},
38+
TableManager: {},
39+
AlertManager: {"cortex_alertmanager"},
40+
Ruler: {},
41+
StoreGateway: {"!cortex_storegateway_client", "cortex_storegateway"}, // The metrics prefix cortex_storegateway_client may be used by other components so we ignore it.
42+
Purger: {"cortex_purger"},
4143
}
4244

4345
// Blacklisted metrics prefixes across any Cortex service.
@@ -48,6 +50,10 @@ var (
4850
)
4951

5052
func assertServiceMetricsPrefixes(t *testing.T, serviceType ServiceType, service *e2ecortex.CortexService) {
53+
if service == nil {
54+
return
55+
}
56+
5157
metrics, err := service.Metrics()
5258
require.NoError(t, err)
5359

@@ -67,7 +73,7 @@ func assertServiceMetricsPrefixes(t *testing.T, serviceType ServiceType, service
6773
break
6874
}
6975

70-
assert.NotRegexp(t, "^"+prefix, metricLine, "service: %s", service.Name())
76+
assert.NotRegexp(t, "^"+prefix, metricLine, "service: %s endpoint: %s", service.Name(), service.HTTPEndpoint())
7177
}
7278
}
7379
}

integration/backward_compatibility_test.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ var (
2424
"quay.io/cortexproject/cortex:v1.1.0": preCortex14Flags,
2525
"quay.io/cortexproject/cortex:v1.2.0": preCortex14Flags,
2626
"quay.io/cortexproject/cortex:v1.3.0": preCortex14Flags,
27-
"quay.io/cortexproject/cortex:v1.4.0": nil,
27+
"quay.io/cortexproject/cortex:v1.4.0": preCortex16Flags,
2828
}
2929
)
3030

@@ -35,6 +35,17 @@ func preCortex14Flags(flags map[string]string) map[string]string {
3535
"-store-gateway.sharding-ring.store": "",
3636
"-store-gateway.sharding-ring.consul.hostname": "",
3737
"-store-gateway.sharding-ring.replication-factor": "",
38+
// The query-scheduler was introduced in 1.6.0, so its flags must be removed for older versions.
39+
"-frontend.scheduler-dns-lookup-period": "",
40+
"-querier.scheduler-dns-lookup-period": "",
41+
})
42+
}
43+
44+
func preCortex16Flags(flags map[string]string) map[string]string {
45+
return e2e.MergeFlagsWithoutRemovingEmpty(flags, map[string]string{
46+
// The query-scheduler was introduced in 1.6.0, so its flags must be removed for older versions.
47+
"-frontend.scheduler-dns-lookup-period": "",
48+
"-querier.scheduler-dns-lookup-period": "",
3849
})
3950
}
4051

integration/e2ecortex/services.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,8 @@ func NewQuerierWithConfigFile(name, consulAddress, configFile string, flags map[
8282
"-querier.frontend-client.backoff-max-period": "100ms",
8383
"-querier.frontend-client.backoff-retries": "1",
8484
"-querier.worker-parallelism": "1",
85+
// Quickly detect query-scheduler when running it.
86+
"-querier.scheduler-dns-lookup-period": "1s",
8587
// Store-gateway ring backend.
8688
"-store-gateway.sharding-enabled": "true",
8789
"-store-gateway.sharding-ring.store": "consul",
@@ -204,6 +206,34 @@ func NewQueryFrontendWithConfigFile(name, configFile string, flags map[string]st
204206
e2e.NewCommandWithoutEntrypoint("cortex", e2e.BuildArgs(e2e.MergeFlags(map[string]string{
205207
"-target": "query-frontend",
206208
"-log.level": "warn",
209+
// Quickly detect query-scheduler when running it.
210+
"-frontend.scheduler-dns-lookup-period": "1s",
211+
}, flags))...),
212+
e2e.NewHTTPReadinessProbe(httpPort, "/ready", 200, 299),
213+
httpPort,
214+
grpcPort,
215+
)
216+
}
217+
218+
func NewQueryScheduler(name string, flags map[string]string, image string) *CortexService {
219+
return NewQuerySchedulerWithConfigFile(name, "", flags, image)
220+
}
221+
222+
func NewQuerySchedulerWithConfigFile(name, configFile string, flags map[string]string, image string) *CortexService {
223+
if configFile != "" {
224+
flags["-config.file"] = filepath.Join(e2e.ContainerSharedDir, configFile)
225+
}
226+
227+
if image == "" {
228+
image = GetDefaultImage()
229+
}
230+
231+
return NewCortexService(
232+
name,
233+
image,
234+
e2e.NewCommandWithoutEntrypoint("cortex", e2e.BuildArgs(e2e.MergeFlags(map[string]string{
235+
"-target": "query-scheduler",
236+
"-log.level": "warn",
207237
}, flags))...),
208238
e2e.NewHTTPReadinessProbe(httpPort, "/ready", 200, 299),
209239
httpPort,

integration/querier_sharding_test.go

Lines changed: 59 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,41 @@ import (
1919
"github.com/cortexproject/cortex/integration/e2ecortex"
2020
)
2121

22-
func TestQuerierSharding(t *testing.T) {
23-
runQuerierShardingTest(t, true)
22+
type querierShardingTestConfig struct {
23+
shuffleShardingEnabled bool
24+
querySchedulerEnabled bool
2425
}
2526

26-
func TestQuerierNoSharding(t *testing.T) {
27-
runQuerierShardingTest(t, false)
27+
func TestQuerierShuffleShardingWithoutQueryScheduler(t *testing.T) {
28+
runQuerierShardingTest(t, querierShardingTestConfig{
29+
shuffleShardingEnabled: true,
30+
querySchedulerEnabled: false,
31+
})
32+
}
33+
34+
func TestQuerierShuffleShardingWithQueryScheduler(t *testing.T) {
35+
runQuerierShardingTest(t, querierShardingTestConfig{
36+
shuffleShardingEnabled: true,
37+
querySchedulerEnabled: true,
38+
})
2839
}
2940

30-
func runQuerierShardingTest(t *testing.T, sharding bool) {
31-
// Going to high starts hitting filedescriptor limit, since we run all queriers concurrently.
41+
func TestQuerierNoShardingWithoutQueryScheduler(t *testing.T) {
42+
runQuerierShardingTest(t, querierShardingTestConfig{
43+
shuffleShardingEnabled: false,
44+
querySchedulerEnabled: false,
45+
})
46+
}
47+
48+
func TestQuerierNoShardingWithQueryScheduler(t *testing.T) {
49+
runQuerierShardingTest(t, querierShardingTestConfig{
50+
shuffleShardingEnabled: false,
51+
querySchedulerEnabled: true,
52+
})
53+
}
54+
55+
func runQuerierShardingTest(t *testing.T, cfg querierShardingTestConfig) {
56+
// Going too high starts hitting the file descriptor limit, since we run all queries concurrently.
3257
const numQueries = 100
3358

3459
s, err := e2e.NewScenario(networkName)
@@ -50,24 +75,33 @@ func runQuerierShardingTest(t *testing.T, sharding bool) {
5075
minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"])
5176
require.NoError(t, s.StartAndWaitReady(minio))
5277

53-
if sharding {
78+
if cfg.shuffleShardingEnabled {
5479
// Use only single querier for each user.
5580
flags["-frontend.max-queriers-per-tenant"] = "1"
5681
}
5782

58-
// Start Cortex components.
59-
queryFrontend := e2ecortex.NewQueryFrontendWithConfigFile("query-frontend", "", flags, "")
60-
ingester := e2ecortex.NewIngesterWithConfigFile("ingester", consul.NetworkHTTPEndpoint(), "", flags, "")
61-
distributor := e2ecortex.NewDistributorWithConfigFile("distributor", consul.NetworkHTTPEndpoint(), "", flags, "")
83+
// Start the query-scheduler if enabled.
84+
var queryScheduler *e2ecortex.CortexService
85+
if cfg.querySchedulerEnabled {
86+
queryScheduler = e2ecortex.NewQueryScheduler("query-scheduler", flags, "")
87+
require.NoError(t, s.StartAndWaitReady(queryScheduler))
88+
flags["-frontend.scheduler-address"] = queryScheduler.NetworkGRPCEndpoint()
89+
flags["-querier.scheduler-address"] = queryScheduler.NetworkGRPCEndpoint()
90+
}
6291

92+
// Start the query-frontend.
93+
queryFrontend := e2ecortex.NewQueryFrontend("query-frontend", flags, "")
6394
require.NoError(t, s.Start(queryFrontend))
6495

65-
querier1 := e2ecortex.NewQuerierWithConfigFile("querier-1", consul.NetworkHTTPEndpoint(), "", mergeFlags(flags, map[string]string{
66-
"-querier.frontend-address": queryFrontend.NetworkGRPCEndpoint(),
67-
}), "")
68-
querier2 := e2ecortex.NewQuerierWithConfigFile("querier-2", consul.NetworkHTTPEndpoint(), "", mergeFlags(flags, map[string]string{
69-
"-querier.frontend-address": queryFrontend.NetworkGRPCEndpoint(),
70-
}), "")
96+
if !cfg.querySchedulerEnabled {
97+
flags["-querier.frontend-address"] = queryFrontend.NetworkGRPCEndpoint()
98+
}
99+
100+
// Start all other services.
101+
ingester := e2ecortex.NewIngester("ingester", consul.NetworkHTTPEndpoint(), flags, "")
102+
distributor := e2ecortex.NewDistributor("distributor", consul.NetworkHTTPEndpoint(), flags, "")
103+
querier1 := e2ecortex.NewQuerier("querier-1", consul.NetworkHTTPEndpoint(), flags, "")
104+
querier2 := e2ecortex.NewQuerier("querier-2", consul.NetworkHTTPEndpoint(), flags, "")
71105

72106
require.NoError(t, s.StartAndWaitReady(querier1, querier2, ingester, distributor))
73107
require.NoError(t, s.WaitReady(queryFrontend))
@@ -99,8 +133,12 @@ func runQuerierShardingTest(t *testing.T, sharding bool) {
99133
require.NoError(t, err)
100134
}
101135

102-
// Wait until both workers connect to the query frontend
103-
require.NoError(t, queryFrontend.WaitSumMetrics(e2e.Equals(2), "cortex_query_frontend_connected_clients"))
136+
// Wait until both workers connect to the query-frontend or query-scheduler
137+
if cfg.querySchedulerEnabled {
138+
require.NoError(t, queryScheduler.WaitSumMetrics(e2e.Equals(2), "cortex_query_scheduler_connected_workers"))
139+
} else {
140+
require.NoError(t, queryFrontend.WaitSumMetrics(e2e.Equals(2), "cortex_query_frontend_connected_clients"))
141+
}
104142

105143
wg := sync.WaitGroup{}
106144

@@ -141,7 +179,7 @@ func runQuerierShardingTest(t *testing.T, sharding bool) {
141179

142180
require.Equal(t, float64(numQueries), total-2) // Remove 2 requests used for metrics initialization.
143181

144-
if sharding {
182+
if cfg.shuffleShardingEnabled {
145183
require.Equal(t, float64(numQueries), diff)
146184
} else {
147185
require.InDelta(t, 0, diff, numQueries*0.20) // Both queriers should have roughly equal number of requests, with possible delta.
@@ -153,4 +191,5 @@ func runQuerierShardingTest(t *testing.T, sharding bool) {
153191
assertServiceMetricsPrefixes(t, Querier, querier1)
154192
assertServiceMetricsPrefixes(t, Querier, querier2)
155193
assertServiceMetricsPrefixes(t, QueryFrontend, queryFrontend)
194+
assertServiceMetricsPrefixes(t, QueryScheduler, queryScheduler)
156195
}

0 commit comments

Comments
 (0)