Skip to content

Commit 51ad6df

Browse files
authored
Make max outstanding queries per tenant config in limits (#4991)
* make max outstanding req per tenant config Signed-off-by: Ben Ye <benye@amazon.com> * add changelog Signed-off-by: Ben Ye <benye@amazon.com> Signed-off-by: Ben Ye <benye@amazon.com>
1 parent a5476bb commit 51ad6df

File tree

13 files changed

+95
-49
lines changed

13 files changed

+95
-49
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
## master / unreleased
44
* [ENHANCEMENT] Update Go version to 1.19.3. #4988
55
* [ENHANCEMENT] Querier: limit series query to only ingesters if `start` param is not specified. #4976
6+
* [ENHANCEMENT] Query-frontend/scheduler: add a new limit `frontend.max-outstanding-requests-per-tenant` for configuring queue size per tenant. The two flags `-query-scheduler.max-outstanding-requests-per-tenant` and `-querier.max-outstanding-requests-per-tenant` are now deprecated, and their default value has changed to 0. If both an old flag and the new flag are specified, the old flag's queue size takes precedence. #5005
67
* [FEATURE] Querier/Query Frontend: support Prometheus /api/v1/status/buildinfo API. #4978
78
* [FEATURE] Ingester: Add active series to all_user_stats page. #4972
89
* [FEATURE] Ingester: Added `-blocks-storage.tsdb.head-chunks-write-queue-size`, which configures the size of the in-memory queue used before flushing chunks to the disk. #5000

docs/configuration/config-file-reference.md

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -172,11 +172,12 @@ runtime_config:
172172
[memberlist: <memberlist_config>]
173173

174174
query_scheduler:
175-
# Maximum number of outstanding requests per tenant per query-scheduler.
176-
# In-flight requests above this limit will fail with HTTP response status code
177-
# 429.
175+
# Deprecated (use frontend.max-outstanding-requests-per-tenant instead) and
176+
# will be removed in v1.17.0: Maximum number of outstanding requests per
177+
# tenant per query-scheduler. In-flight requests above this limit will fail
178+
# with HTTP response status code 429.
178179
# CLI flag: -query-scheduler.max-outstanding-requests-per-tenant
179-
[max_outstanding_requests_per_tenant: <int> | default = 100]
180+
[max_outstanding_requests_per_tenant: <int> | default = 0]
180181

181182
# If a querier disconnects without sending notification about graceful
182183
# shutdown, the query-scheduler will keep the querier in the tenant's shard
@@ -916,10 +917,11 @@ The `query_frontend_config` configures the Cortex query-frontend.
916917
# CLI flag: -frontend.query-stats-enabled
917918
[query_stats_enabled: <boolean> | default = false]
918919
919-
# Maximum number of outstanding requests per tenant per frontend; requests
920-
# beyond this error with HTTP 429.
920+
# Deprecated (use frontend.max-outstanding-requests-per-tenant instead) and will
921+
# be removed in v1.17.0: Maximum number of outstanding requests per tenant per
922+
# frontend; requests beyond this error with HTTP 429.
921923
# CLI flag: -querier.max-outstanding-requests-per-tenant
922-
[max_outstanding_per_tenant: <int> | default = 100]
924+
[max_outstanding_per_tenant: <int> | default = 0]
923925
924926
# If a querier disconnects without sending notification about graceful shutdown,
925927
# the query-frontend will keep the querier in the tenant's shard until the
@@ -2724,6 +2726,11 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s
27242726
# CLI flag: -frontend.max-queriers-per-tenant
27252727
[max_queriers_per_tenant: <int> | default = 0]
27262728

2729+
# Maximum number of outstanding requests per tenant per request queue (either
2730+
# query frontend or query scheduler); requests beyond this error with HTTP 429.
2731+
# CLI flag: -frontend.max-outstanding-requests-per-tenant
2732+
[max_outstanding_requests_per_tenant: <int> | default = 100]
2733+
27272734
# Duration to delay the evaluation of rules to ensure the underlying metrics
27282735
# have been pushed to Cortex.
27292736
# CLI flag: -ruler.evaluation-delay-duration

pkg/frontend/frontend_test.go

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"google.golang.org/grpc"
2626

2727
"github.com/cortexproject/cortex/pkg/frontend/transport"
28+
frontendv1 "github.com/cortexproject/cortex/pkg/frontend/v1"
2829
"github.com/cortexproject/cortex/pkg/frontend/v1/frontendv1pb"
2930
querier_worker "github.com/cortexproject/cortex/pkg/querier/worker"
3031
"github.com/cortexproject/cortex/pkg/util/concurrency"
@@ -253,7 +254,7 @@ func testFrontend(t *testing.T, config CombinedFrontendConfig, handler http.Hand
253254
httpListen, err := net.Listen("tcp", "localhost:0")
254255
require.NoError(t, err)
255256

256-
rt, v1, v2, err := InitFrontend(config, limits{}, 0, logger, nil)
257+
rt, v1, v2, err := InitFrontend(config, frontendv1.MockLimits{}, 0, logger, nil)
257258
require.NoError(t, err)
258259
require.NotNil(t, rt)
259260
// v1 will be nil if DownstreamURL is defined.
@@ -306,11 +307,3 @@ func defaultFrontendConfig() CombinedFrontendConfig {
306307
flagext.DefaultValues(&config.FrontendV2)
307308
return config
308309
}
309-
310-
type limits struct {
311-
queriers int
312-
}
313-
314-
func (l limits) MaxQueriersPerUser(_ string) int {
315-
return l.queriers
316-
}

pkg/frontend/v1/frontend.go

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,13 +37,25 @@ type Config struct {
3737

3838
// RegisterFlags adds the flags required to config this to the given FlagSet.
3939
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
40-
f.IntVar(&cfg.MaxOutstandingPerTenant, "querier.max-outstanding-requests-per-tenant", 100, "Maximum number of outstanding requests per tenant per frontend; requests beyond this error with HTTP 429.")
40+
f.IntVar(&cfg.MaxOutstandingPerTenant, "querier.max-outstanding-requests-per-tenant", 0, "Deprecated (use frontend.max-outstanding-requests-per-tenant instead) and will be removed in v1.17.0: Maximum number of outstanding requests per tenant per frontend; requests beyond this error with HTTP 429.")
4141
f.DurationVar(&cfg.QuerierForgetDelay, "query-frontend.querier-forget-delay", 0, "If a querier disconnects without sending notification about graceful shutdown, the query-frontend will keep the querier in the tenant's shard until the forget delay has passed. This feature is useful to reduce the blast radius when shuffle-sharding is enabled.")
4242
}
4343

4444
type Limits interface {
4545
// Returns max queriers to use per tenant, or 0 if shuffle sharding is disabled.
4646
MaxQueriersPerUser(user string) int
47+
48+
queue.Limits
49+
}
50+
51+
// MockLimits implements the Limits interface. Used in tests only.
52+
type MockLimits struct {
53+
Queriers int
54+
queue.MockLimits
55+
}
56+
57+
func (l MockLimits) MaxQueriersPerUser(_ string) int {
58+
return l.Queriers
4759
}
4860

4961
// Frontend queues HTTP requests, dispatches them to backends, and handles retries
@@ -100,7 +112,7 @@ func New(cfg Config, limits Limits, log log.Logger, registerer prometheus.Regist
100112
}),
101113
}
102114

103-
f.requestQueue = queue.NewRequestQueue(cfg.MaxOutstandingPerTenant, cfg.QuerierForgetDelay, f.queueLength, f.discardedRequests)
115+
f.requestQueue = queue.NewRequestQueue(cfg.MaxOutstandingPerTenant, cfg.QuerierForgetDelay, f.queueLength, f.discardedRequests, f.limits)
104116
f.activeUsers = util.NewActiveUsersCleanupWithDefaultValues(f.cleanupInactiveUserMetrics)
105117

106118
var err error

pkg/frontend/v1/frontend_test.go

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@ func TestFrontendPropagateTrace(t *testing.T) {
116116
}
117117

118118
func TestFrontendCheckReady(t *testing.T) {
119+
limits := MockLimits{MockLimits: queue.MockLimits{MaxOutstanding: 100}}
119120
for _, tt := range []struct {
120121
name string
121122
connectedClients int
@@ -131,6 +132,7 @@ func TestFrontendCheckReady(t *testing.T) {
131132
requestQueue: queue.NewRequestQueue(5, 0,
132133
prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
133134
prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"user"}),
135+
limits,
134136
),
135137
}
136138
for i := 0; i < tt.connectedClients; i++ {
@@ -243,7 +245,8 @@ func testFrontend(t *testing.T, config Config, handler http.Handler, test func(a
243245
httpListen, err := net.Listen("tcp", "localhost:0")
244246
require.NoError(t, err)
245247

246-
v1, err := New(config, limits{}, logger, reg)
248+
limits := MockLimits{MockLimits: queue.MockLimits{MaxOutstanding: 100}}
249+
v1, err := New(config, limits, logger, reg)
247250
require.NoError(t, err)
248251
require.NotNil(t, v1)
249252
require.NoError(t, services.StartAndAwaitRunning(context.Background(), v1))
@@ -292,11 +295,3 @@ func defaultFrontendConfig() Config {
292295
flagext.DefaultValues(&config)
293296
return config
294297
}
295-
296-
type limits struct {
297-
queriers int
298-
}
299-
300-
func (l limits) MaxQueriersPerUser(_ string) int {
301-
return l.queriers
302-
}

pkg/frontend/v1/queue_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,16 @@ import (
1616
"google.golang.org/grpc/metadata"
1717

1818
"github.com/cortexproject/cortex/pkg/frontend/v1/frontendv1pb"
19+
"github.com/cortexproject/cortex/pkg/scheduler/queue"
1920
"github.com/cortexproject/cortex/pkg/util/flagext"
2021
"github.com/cortexproject/cortex/pkg/util/services"
2122
)
2223

2324
func setupFrontend(t *testing.T, config Config) (*Frontend, error) {
2425
logger := log.NewNopLogger()
2526

26-
frontend, err := New(config, limits{queriers: 3}, logger, nil)
27+
limits := MockLimits{Queriers: 3, MockLimits: queue.MockLimits{MaxOutstanding: 100}}
28+
frontend, err := New(config, limits, logger, nil)
2729
require.NoError(t, err)
2830

2931
t.Cleanup(func() {

pkg/scheduler/queue/queue.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,9 +61,9 @@ type RequestQueue struct {
6161
discardedRequests *prometheus.CounterVec // Per user.
6262
}
6363

64-
func NewRequestQueue(maxOutstandingPerTenant int, forgetDelay time.Duration, queueLength *prometheus.GaugeVec, discardedRequests *prometheus.CounterVec) *RequestQueue {
64+
func NewRequestQueue(maxOutstandingPerTenant int, forgetDelay time.Duration, queueLength *prometheus.GaugeVec, discardedRequests *prometheus.CounterVec, limits Limits) *RequestQueue {
6565
q := &RequestQueue{
66-
queues: newUserQueues(maxOutstandingPerTenant, forgetDelay),
66+
queues: newUserQueues(maxOutstandingPerTenant, forgetDelay, limits),
6767
connectedQuerierWorkers: atomic.NewInt32(0),
6868
queueLength: queueLength,
6969
discardedRequests: discardedRequests,

pkg/scheduler/queue/queue_test.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ func BenchmarkGetNextRequest(b *testing.B) {
2626
queue := NewRequestQueue(maxOutstandingPerTenant, 0,
2727
prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
2828
prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"user"}),
29+
MockLimits{MaxOutstanding: 100},
2930
)
3031
queues = append(queues, queue)
3132

@@ -83,6 +84,7 @@ func BenchmarkQueueRequest(b *testing.B) {
8384
q := NewRequestQueue(maxOutstandingPerTenant, 0,
8485
prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
8586
prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"user"}),
87+
MockLimits{MaxOutstanding: 100},
8688
)
8789

8890
for ix := 0; ix < queriers; ix++ {
@@ -115,7 +117,9 @@ func TestRequestQueue_GetNextRequestForQuerier_ShouldGetRequestAfterReshardingBe
115117

116118
queue := NewRequestQueue(1, forgetDelay,
117119
prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
118-
prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"user"}))
120+
prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"user"}),
121+
MockLimits{MaxOutstanding: 100},
122+
)
119123

120124
// Start the queue service.
121125
ctx := context.Background()

pkg/scheduler/queue/user_queues.go

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,13 @@ import (
88
"github.com/cortexproject/cortex/pkg/util"
99
)
1010

11+
// Limits needed for the Query Scheduler - interface used for decoupling.
12+
type Limits interface {
13+
// MaxOutstandingPerTenant returns the limit to the maximum number
14+
// of outstanding requests per tenant per request queue.
15+
MaxOutstandingPerTenant(user string) int
16+
}
17+
1118
// querier holds information about a querier registered in the queue.
1219
type querier struct {
1320
// Number of active connections.
@@ -41,6 +48,8 @@ type queues struct {
4148

4249
// Sorted list of querier names, used when creating per-user shard.
4350
sortedQueriers []string
51+
52+
limits Limits
4453
}
4554

4655
type userQueue struct {
@@ -59,14 +68,15 @@ type userQueue struct {
5968
index int
6069
}
6170

62-
func newUserQueues(maxUserQueueSize int, forgetDelay time.Duration) *queues {
71+
func newUserQueues(maxUserQueueSize int, forgetDelay time.Duration, limits Limits) *queues {
6372
return &queues{
6473
userQueues: map[string]*userQueue{},
6574
users: nil,
6675
maxUserQueueSize: maxUserQueueSize,
6776
forgetDelay: forgetDelay,
6877
queriers: map[string]*querier{},
6978
sortedQueriers: nil,
79+
limits: limits,
7080
}
7181
}
7282

@@ -106,8 +116,14 @@ func (q *queues) getOrAddQueue(userID string, maxQueriers int) chan Request {
106116
uq := q.userQueues[userID]
107117

108118
if uq == nil {
119+
queueSize := q.limits.MaxOutstandingPerTenant(userID)
120+
// 0 is the default value of the flag. If the old flag is set
121+
// then we use its value for compatibility reason.
122+
if q.maxUserQueueSize != 0 {
123+
queueSize = q.maxUserQueueSize
124+
}
109125
uq = &userQueue{
110-
ch: make(chan Request, q.maxUserQueueSize),
126+
ch: make(chan Request, queueSize),
111127
seed: util.ShuffleShardSeed(userID, ""),
112128
index: -1,
113129
}
@@ -303,3 +319,12 @@ func shuffleQueriersForUser(userSeed int64, queriersToSelect int, allSortedQueri
303319

304320
return result
305321
}
322+
323+
// MockLimits implements the Limits interface. Used in tests only.
324+
type MockLimits struct {
325+
MaxOutstanding int
326+
}
327+
328+
func (l MockLimits) MaxOutstandingPerTenant(_ string) int {
329+
return l.MaxOutstanding
330+
}

pkg/scheduler/queue/user_queues_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ import (
1313
)
1414

1515
func TestQueues(t *testing.T) {
16-
uq := newUserQueues(0, 0)
16+
uq := newUserQueues(0, 0, MockLimits{})
1717
assert.NotNil(t, uq)
1818
assert.NoError(t, isConsistent(uq))
1919

@@ -68,7 +68,7 @@ func TestQueues(t *testing.T) {
6868
}
6969

7070
func TestQueuesWithQueriers(t *testing.T) {
71-
uq := newUserQueues(0, 0)
71+
uq := newUserQueues(0, 0, MockLimits{})
7272
assert.NotNil(t, uq)
7373
assert.NoError(t, isConsistent(uq))
7474

@@ -145,7 +145,7 @@ func TestQueuesConsistency(t *testing.T) {
145145

146146
for testName, testData := range tests {
147147
t.Run(testName, func(t *testing.T) {
148-
uq := newUserQueues(0, testData.forgetDelay)
148+
uq := newUserQueues(0, testData.forgetDelay, MockLimits{})
149149
assert.NotNil(t, uq)
150150
assert.NoError(t, isConsistent(uq))
151151

@@ -194,7 +194,7 @@ func TestQueues_ForgetDelay(t *testing.T) {
194194
)
195195

196196
now := time.Now()
197-
uq := newUserQueues(0, forgetDelay)
197+
uq := newUserQueues(0, forgetDelay, MockLimits{})
198198
assert.NotNil(t, uq)
199199
assert.NoError(t, isConsistent(uq))
200200

@@ -286,7 +286,7 @@ func TestQueues_ForgetDelay_ShouldCorrectlyHandleQuerierReconnectingBeforeForget
286286
)
287287

288288
now := time.Now()
289-
uq := newUserQueues(0, forgetDelay)
289+
uq := newUserQueues(0, forgetDelay, MockLimits{})
290290
assert.NotNil(t, uq)
291291
assert.NoError(t, isConsistent(uq))
292292

0 commit comments

Comments
 (0)