Skip to content

Commit 1189bab

Browse files
committed
Allow shard sizes to be percent of instances
Signed-off-by: 🌲 Harry 🌊 John 🏔 <johrry@amazon.com>
1 parent 6669540 commit 1189bab

15 files changed

+113
-40
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
* [FEATURE] Added 2 flags `-alertmanager.alertmanager-client.grpc-max-send-msg-size` and ` -alertmanager.alertmanager-client.grpc-max-recv-msg-size` to configure alert manager grpc client message size limits. #5338
1414
* [FEATURE] Query Frontend: Add `cortex_rejected_queries_total` metric for throttled queries. #5356
1515
* [FEATURE] Querier: Log query stats when querying store gateway. #5376
16+
* [FEATURE] Querier/StoreGateway: Allow the tenant shard sizes to be a percent of total instances. #5393
1617
* [ENHANCEMENT] Distributor/Ingester: Add span on push path #5319
1718
* [ENHANCEMENT] Support object storage backends for runtime configuration file. #5292
1819
* [ENHANCEMENT] Query Frontend: Reject subquery with too small step size. #5323

docs/configuration/config-file-reference.md

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2938,13 +2938,14 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s
29382938

29392939
# Maximum number of queriers that can handle requests for a single tenant. If
29402940
# set to 0 or value higher than number of available queriers, *all* queriers
2941-
# will handle requests for the tenant. Each frontend (or query-scheduler, if
2942-
# used) will select the same set of queriers for the same tenant (given that all
2943-
# queriers are connected to all frontends / query-schedulers). This option only
2944-
# works with queriers connecting to the query-frontend / query-scheduler, not
2945-
# when using downstream URL.
2941+
# will handle requests for the tenant. If the value is < 1, it will be treated
2942+
# as a percentage and the tenant gets a percentage of the total queriers. Each frontend
2943+
# (or query-scheduler, if used) will select the same set of queriers for the
2944+
# same tenant (given that all queriers are connected to all frontends /
2945+
# query-schedulers). This option only works with queriers connecting to the
2946+
# query-frontend / query-scheduler, not when using downstream URL.
29462947
# CLI flag: -frontend.max-queriers-per-tenant
2947-
[max_queriers_per_tenant: <int> | default = 0]
2948+
[max_queriers_per_tenant: <float> | default = 0]
29482949

29492950
# Maximum number of outstanding requests per tenant per request queue (either
29502951
# query frontend or query scheduler); requests beyond this error with HTTP 429.
@@ -2973,9 +2974,10 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s
29732974
# The default tenant's shard size when the shuffle-sharding strategy is used.
29742975
# Must be set when the store-gateway sharding is enabled with the
29752976
# shuffle-sharding strategy. When this setting is specified in the per-tenant
2976-
# overrides, a value of 0 disables shuffle sharding for the tenant.
2977+
# overrides, a value of 0 disables shuffle sharding for the tenant. If the value
2978+
# is < 1, the shard size will be a percentage of the total store-gateways.
29772979
# CLI flag: -store-gateway.tenant-shard-size
2978-
[store_gateway_tenant_shard_size: <int> | default = 0]
2980+
[store_gateway_tenant_shard_size: <float> | default = 0]
29792981

29802982
# The maximum number of data bytes to download per gRPC request in Store
29812983
# Gateway, including Series/LabelNames/LabelValues requests. 0 to disable.

pkg/frontend/v1/frontend.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,18 +43,18 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
4343

4444
type Limits interface {
4545
// Returns max queriers to use per tenant, or 0 if shuffle sharding is disabled.
46-
MaxQueriersPerUser(user string) int
46+
MaxQueriersPerUser(user string) float64
4747

4848
queue.Limits
4949
}
5050

5151
// MockLimits implements the Limits interface. Used in tests only.
5252
type MockLimits struct {
53-
Queriers int
53+
Queriers float64
5454
queue.MockLimits
5555
}
5656

57-
func (l MockLimits) MaxQueriersPerUser(_ string) int {
57+
func (l MockLimits) MaxQueriersPerUser(_ string) float64 {
5858
return l.Queriers
5959
}
6060

@@ -338,7 +338,7 @@ func (f *Frontend) queueRequest(ctx context.Context, req *request) error {
338338
req.queueSpan, _ = opentracing.StartSpanFromContext(ctx, "queued")
339339

340340
// aggregate the max queriers limit in the case of a multi tenant query
341-
maxQueriers := validation.SmallestPositiveNonZeroIntPerTenant(tenantIDs, f.limits.MaxQueriersPerUser)
341+
maxQueriers := validation.SmallestPositiveNonZeroFloat64PerTenant(tenantIDs, f.limits.MaxQueriersPerUser)
342342

343343
joinedTenantID := tenant.JoinTenantIDs(tenantIDs)
344344
f.activeUsers.UpdateUserTimestamp(joinedTenantID, now)

pkg/querier/blocks_store_queryable.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ type BlocksStoreLimits interface {
9797
bucket.TenantConfigProvider
9898

9999
MaxChunksPerQueryFromStore(userID string) int
100-
StoreGatewayTenantShardSize(userID string) int
100+
StoreGatewayTenantShardSize(userID string) float64
101101
}
102102

103103
type blocksStoreQueryableMetrics struct {

pkg/querier/blocks_store_queryable_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1558,14 +1558,14 @@ func (m *storeGatewaySeriesClientMock) Recv() (*storepb.SeriesResponse, error) {
15581558

15591559
type blocksStoreLimitsMock struct {
15601560
maxChunksPerQuery int
1561-
storeGatewayTenantShardSize int
1561+
storeGatewayTenantShardSize float64
15621562
}
15631563

15641564
func (m *blocksStoreLimitsMock) MaxChunksPerQueryFromStore(_ string) int {
15651565
return m.maxChunksPerQuery
15661566
}
15671567

1568-
func (m *blocksStoreLimitsMock) StoreGatewayTenantShardSize(_ string) int {
1568+
func (m *blocksStoreLimitsMock) StoreGatewayTenantShardSize(_ string) float64 {
15691569
return m.storeGatewayTenantShardSize
15701570
}
15711571

pkg/querier/blocks_store_replicated_set_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ func TestBlocksStoreReplicationSet_GetClientsFor(t *testing.T) {
4343

4444
tests := map[string]struct {
4545
shardingStrategy string
46-
tenantShardSize int
46+
tenantShardSize float64
4747
replicationFactor int
4848
setup func(*ring.Desc)
4949
queryBlocks []ulid.ULID

pkg/scheduler/queue/queue.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"github.com/prometheus/client_golang/prometheus/promauto"
1111
"go.uber.org/atomic"
1212

13+
"github.com/cortexproject/cortex/pkg/util"
1314
"github.com/cortexproject/cortex/pkg/util/services"
1415
)
1516

@@ -86,15 +87,16 @@ func NewRequestQueue(maxOutstandingPerTenant int, forgetDelay time.Duration, que
8687
// between calls.
8788
//
8889
// If request is successfully enqueued, successFn is called with the lock held, before any querier can receive the request.
89-
func (q *RequestQueue) EnqueueRequest(userID string, req Request, maxQueriers int, successFn func()) error {
90+
func (q *RequestQueue) EnqueueRequest(userID string, req Request, maxQueriers float64, successFn func()) error {
9091
q.mtx.Lock()
9192
defer q.mtx.Unlock()
9293

9394
if q.stopped {
9495
return ErrStopped
9596
}
9697

97-
queue := q.queues.getOrAddQueue(userID, maxQueriers)
98+
shardSize := util.DynamicShardSize(maxQueriers, len(q.queues.queriers))
99+
queue := q.queues.getOrAddQueue(userID, shardSize)
98100
if queue == nil {
99101
// This can only happen if userID is "".
100102
return errors.New("no queue found")

pkg/scheduler/scheduler.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ func NewScheduler(cfg Config, limits Limits, log log.Logger, registerer promethe
143143
// Limits needed for the Query Scheduler - interface used for decoupling.
144144
type Limits interface {
145145
// MaxQueriersPerUser returns max queriers to use per tenant, or 0 if shuffle sharding is disabled.
146-
MaxQueriersPerUser(user string) int
146+
MaxQueriersPerUser(user string) float64
147147

148148
queue.Limits
149149
}
@@ -307,7 +307,7 @@ func (s *Scheduler) enqueueRequest(frontendContext context.Context, frontendAddr
307307
if err != nil {
308308
return err
309309
}
310-
maxQueriers := validation.SmallestPositiveNonZeroIntPerTenant(tenantIDs, s.limits.MaxQueriersPerUser)
310+
maxQueriers := validation.SmallestPositiveNonZeroFloat64PerTenant(tenantIDs, s.limits.MaxQueriersPerUser)
311311

312312
s.activeUsers.UpdateUserTimestamp(userID, now)
313313
return s.requestQueue.EnqueueRequest(userID, req, maxQueriers, func() {

pkg/storegateway/gateway_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -241,7 +241,7 @@ func TestStoreGateway_InitialSyncWithWaitRingStability(t *testing.T) {
241241

242242
tests := map[string]struct {
243243
shardingStrategy string
244-
tenantShardSize int // Used only when the sharding strategy is shuffle-sharding.
244+
tenantShardSize float64 // Used only when the sharding strategy is shuffle-sharding.
245245
replicationFactor int
246246
numGateways int
247247
expectedBlocksLoaded int
@@ -361,8 +361,8 @@ func TestStoreGateway_InitialSyncWithWaitRingStability(t *testing.T) {
361361
assert.Equal(t, float64(2*testData.numGateways), metrics.GetSumOfGauges("cortex_bucket_stores_tenants_discovered"))
362362

363363
if testData.shardingStrategy == util.ShardingStrategyShuffle {
364-
assert.Equal(t, float64(testData.tenantShardSize*numBlocks), metrics.GetSumOfGauges("cortex_blocks_meta_synced"))
365-
assert.Equal(t, float64(testData.tenantShardSize*numUsers), metrics.GetSumOfGauges("cortex_bucket_stores_tenants_synced"))
364+
assert.Equal(t, float64(int(testData.tenantShardSize)*numBlocks), metrics.GetSumOfGauges("cortex_blocks_meta_synced"))
365+
assert.Equal(t, float64(int(testData.tenantShardSize)*numUsers), metrics.GetSumOfGauges("cortex_bucket_stores_tenants_synced"))
366366
} else {
367367
assert.Equal(t, float64(testData.numGateways*numBlocks), metrics.GetSumOfGauges("cortex_blocks_meta_synced"))
368368
assert.Equal(t, float64(testData.numGateways*numUsers), metrics.GetSumOfGauges("cortex_bucket_stores_tenants_synced"))

pkg/storegateway/sharding_strategy.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212

1313
"github.com/cortexproject/cortex/pkg/ring"
1414
cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb"
15+
"github.com/cortexproject/cortex/pkg/util"
1516
)
1617

1718
const (
@@ -32,7 +33,7 @@ type ShardingStrategy interface {
3233
// ShardingLimits is the interface that should be implemented by the limits provider,
3334
// limiting the scope of the limits to the ones required by sharding strategies.
3435
type ShardingLimits interface {
35-
StoreGatewayTenantShardSize(userID string) int
36+
StoreGatewayTenantShardSize(userID string) float64
3637
}
3738

3839
// NoShardingStrategy is a no-op strategy. When this strategy is used, no tenant/block is filtered out.
@@ -173,7 +174,7 @@ func filterBlocksByRingSharding(r ring.ReadRing, instanceAddr string, metas map[
173174
// GetShuffleShardingSubring returns the subring to be used for a given user. This function
174175
// should be used both by store-gateway and querier in order to guarantee the same logic is used.
175176
func GetShuffleShardingSubring(ring *ring.Ring, userID string, limits ShardingLimits) ring.ReadRing {
176-
shardSize := limits.StoreGatewayTenantShardSize(userID)
177+
shardSize := util.DynamicShardSize(limits.StoreGatewayTenantShardSize(userID), ring.InstancesCount())
177178

178179
// A shard size of 0 means shuffle sharding is disabled for this specific user,
179180
// so we just return the full ring so that blocks will be sharded across all store-gateways.

0 commit comments

Comments
 (0)