
Commit 3783b39

Replaced Subring() with ShuffleShard()
Signed-off-by: Marco Pracucci <marco@pracucci.com>
1 parent ea4c258 commit 3783b39
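At its core, the commit is an interface swap on the ring: callers stop hashing the tenant ID into a 32-bit token themselves and instead hand the ring the tenant ID and a shard size, letting the ring derive the subring deterministically. A minimal sketch of the before/after method shapes (the `ringBefore`/`ringAfter` names are illustrative stand-ins, not Cortex types):

```go
package sketch

// Before: the caller hashed the tenant ID and asked for a token-based subring.
type ringBefore interface {
	Subring(key uint32, n int) ringBefore
}

// After: hashing is the ring's own concern; callers pass the tenant ID directly.
type ringAfter interface {
	ShuffleShard(identifier string, size int) ringAfter
}
```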

File tree

9 files changed: +74 -57 lines changed

docs/configuration/config-file-reference.md

Lines changed: 10 additions & 3 deletions
```diff
@@ -393,6 +393,10 @@ ha_tracker:
 # CLI flag: -distributor.extra-query-delay
 [extra_queue_delay: <duration> | default = 0s]
 
+# The sharding strategy to use. Supported values are: default, shuffle-sharding.
+# CLI flag: -distributor.sharding-strategy
+[sharding_strategy: <string> | default = "default"]
+
 # Distribute samples based on all labels, as opposed to solely by user and
 # metric name.
 # CLI flag: -distributor.shard-by-all-labels
@@ -2714,9 +2718,12 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s
 # CLI flag: -validation.enforce-metric-name
 [enforce_metric_name: <boolean> | default = true]
 
-# Per-user subring to shard metrics to ingesters. 0 is disabled.
-# CLI flag: -experimental.distributor.user-subring-size
-[user_subring_size: <int> | default = 0]
+# The default tenant's shard size when the shuffle-sharding strategy is used.
+# Must be set both on ingesters and distributors. When this setting is specified
+# in the per-tenant overrides, a value of 0 disables shuffle sharding for the
+# tenant.
+# CLI flag: -distribution.ingestion-tenant-shard-size
+[ingestion_tenant_shard_size: <int> | default = 0]
 
 # The maximum number of series for which a query can fetch samples from each
 # ingester. This limit is enforced only in the ingesters (when querying samples
```

pkg/cortex/cortex.go

Lines changed: 1 addition & 1 deletion
```diff
@@ -172,7 +172,7 @@ func (c *Config) Validate(log log.Logger) error {
 	if err := c.LimitsConfig.Validate(c.Distributor.ShardByAllLabels); err != nil {
 		return errors.Wrap(err, "invalid limits config")
 	}
-	if err := c.Distributor.Validate(); err != nil {
+	if err := c.Distributor.Validate(c.LimitsConfig); err != nil {
 		return errors.Wrap(err, "invalid distributor config")
 	}
 	if err := c.Querier.Validate(); err != nil {
```

pkg/distributor/distributor.go

Lines changed: 27 additions & 8 deletions
```diff
@@ -3,6 +3,7 @@ package distributor
 import (
 	"context"
 	"flag"
+	"fmt"
 	"net/http"
 	"sort"
 	"strings"
@@ -104,11 +105,21 @@ var (
 		Help: "Unix timestamp of latest received sample per user.",
 	}, []string{"user"})
 	emptyPreallocSeries = ingester_client.PreallocTimeseries{}
+
+	supportedShardingStrategies = []string{ShardingStrategyDefault, ShardingStrategyShuffle}
+
+	// Validation errors.
+	errInvalidShardingStrategy = errors.New("invalid sharding strategy")
+	errInvalidTenantShardSize  = errors.New("invalid tenant shard size, the value must be greater than 0")
 )
 
 const (
 	typeSamples  = "samples"
 	typeMetadata = "metadata"
+
+	// Supported sharding strategies.
+	ShardingStrategyDefault = "default"
+	ShardingStrategyShuffle = "shuffle-sharding"
 )
 
 // Distributor is a storage.SampleAppender and a client.Querier which
@@ -147,7 +158,8 @@ type Config struct {
 	RemoteTimeout   time.Duration `yaml:"remote_timeout"`
 	ExtraQueryDelay time.Duration `yaml:"extra_queue_delay"`
 
-	ShardByAllLabels bool `yaml:"shard_by_all_labels"`
+	ShardingStrategy string `yaml:"sharding_strategy"`
+	ShardByAllLabels bool   `yaml:"shard_by_all_labels"`
 
 	// Distributors ring
 	DistributorRing RingConfig `yaml:"ring"`
@@ -170,10 +182,19 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
 	f.DurationVar(&cfg.RemoteTimeout, "distributor.remote-timeout", 2*time.Second, "Timeout for downstream ingesters.")
 	f.DurationVar(&cfg.ExtraQueryDelay, "distributor.extra-query-delay", 0, "Time to wait before sending more than the minimum successful query requests.")
 	f.BoolVar(&cfg.ShardByAllLabels, "distributor.shard-by-all-labels", false, "Distribute samples based on all labels, as opposed to solely by user and metric name.")
+	f.StringVar(&cfg.ShardingStrategy, "distributor.sharding-strategy", ShardingStrategyDefault, fmt.Sprintf("The sharding strategy to use. Supported values are: %s.", strings.Join(supportedShardingStrategies, ", ")))
 }
 
 // Validate config and returns error on failure
-func (cfg *Config) Validate() error {
+func (cfg *Config) Validate(limits validation.Limits) error {
+	if !util.StringsContain(supportedShardingStrategies, cfg.ShardingStrategy) {
+		return errInvalidShardingStrategy
+	}
+
+	if cfg.ShardingStrategy == ShardingStrategyShuffle && limits.IngestionTenantShardSize <= 0 {
+		return errInvalidTenantShardSize
+	}
+
 	return cfg.HATrackerConfig.Validate()
 }
 
@@ -508,13 +529,11 @@ func (d *Distributor) Push(ctx context.Context, req *client.WriteRequest) (*clie
 		return nil, httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (%v) exceeded while adding %d samples and %d metadata", d.ingestionRateLimiter.Limit(now, userID), validatedSamples, len(validatedMetadata))
 	}
 
-	var subRing ring.ReadRing
-	subRing = d.ingestersRing
+	subRing := d.ingestersRing.(ring.ReadRing)
 
-	// Obtain a subring if required
-	if size := d.limits.SubringSize(userID); size > 0 {
-		h := client.HashAdd32a(client.HashNew32a(), userID)
-		subRing = d.ingestersRing.Subring(h, size)
+	// Obtain a subring if required.
+	if d.cfg.ShardingStrategy == ShardingStrategyShuffle {
+		subRing = d.ingestersRing.ShuffleShard(userID, d.limits.IngestionTenantShardSize(userID))
 	}
 
 	keys := append(seriesKeys, metadataKeys...)
```
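The new `Validate(limits validation.Limits)` signature lets the distributor reject inconsistent configuration up front: an unknown strategy, or shuffle sharding enabled while the default tenant shard size is non-positive. A self-contained sketch of that rule (simplified names, not the Cortex code itself):

```go
package main

import (
	"errors"
	"fmt"
)

var supportedStrategies = []string{"default", "shuffle-sharding"}

// validateSharding mirrors the check added to distributor.Config.Validate:
// the strategy must be a known one, and shuffle sharding only makes sense
// with a positive default tenant shard size.
func validateSharding(strategy string, tenantShardSize int) error {
	supported := false
	for _, s := range supportedStrategies {
		if s == strategy {
			supported = true
			break
		}
	}
	if !supported {
		return errors.New("invalid sharding strategy")
	}
	if strategy == "shuffle-sharding" && tenantShardSize <= 0 {
		return errors.New("invalid tenant shard size, the value must be greater than 0")
	}
	return nil
}

func main() {
	fmt.Println(validateSharding("shuffle-sharding", 0)) // rejected: shard size must be > 0
	fmt.Println(validateSharding("shuffle-sharding", 3)) // <nil>
}
```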

pkg/querier/blocks_store_replicated_set_test.go

Lines changed: 3 additions & 6 deletions
```diff
@@ -37,10 +37,7 @@ func TestBlocksStoreReplicationSet_GetClientsFor(t *testing.T) {
 	block3Hash := cortex_tsdb.HashBlockID(block3)
 	block4Hash := cortex_tsdb.HashBlockID(block4)
 
-	// Ensure the user ID we use belongs to the instances holding the token for the block 1
-	// (it's expected by the assertions below).
 	userID := "user-A"
-	require.LessOrEqual(t, cortex_tsdb.HashTenantID(userID), block1Hash)
 
 	tests := map[string]struct {
 		shardingStrategy string
@@ -250,7 +247,7 @@
 			queryBlocks: []ulid.ULID{block1, block2, block4},
 			expectedClients: map[string][]ulid.ULID{
 				"127.0.0.1": {block1, block4},
-				"127.0.0.2": {block2},
+				"127.0.0.3": {block2},
 			},
 		},
 		"shuffle sharding, multiple instances in the ring with RF = 1, SS = 4": {
@@ -286,7 +283,7 @@
 				block2: {"127.0.0.1"},
 			},
 			expectedClients: map[string][]ulid.ULID{
-				"127.0.0.2": {block1, block2},
+				"127.0.0.3": {block1, block2},
 			},
 		},
 		"shuffle sharding, multiple instances in the ring with RF = 2, SS = 2 with excluded blocks and no replacement available": {
@@ -301,7 +298,7 @@
 			},
 			queryBlocks: []ulid.ULID{block1, block2},
 			exclude: map[ulid.ULID][]string{
-				block1: {"127.0.0.1", "127.0.0.2"},
+				block1: {"127.0.0.1", "127.0.0.3"},
 				block2: {"127.0.0.1"},
 			},
 			expectedErr: fmt.Errorf("no store-gateway instance left after checking exclude for block %s", block1.String()),
```

pkg/ring/ring.go

Lines changed: 4 additions & 1 deletion
```diff
@@ -49,7 +49,10 @@
 	GetAll(op Operation) (ReplicationSet, error)
 	ReplicationFactor() int
 	IngesterCount() int
-	Subring(key uint32, n int) ReadRing
+
+	// ShuffleShard returns a subring for the provided identifier (eg. a tenant ID)
+	// and size (number of instances).
+	ShuffleShard(identifier string, size int) ReadRing
 
 	// HasInstance returns whether the ring contains an instance matching the provided instanceID.
 	HasInstance(instanceID string) bool
```
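The diff doesn't show `ShuffleShard`'s implementation (Cortex builds the shard from ring tokens), but the contract is visible in the comment: the same identifier and size must always select the same subset of instances. A toy illustration of that determinism property, seeding a PRNG from the tenant ID — an assumption made for illustration, not the real token-based algorithm:

```go
package main

import (
	"fmt"
	"hash/fnv"
	"math/rand"
)

// shuffleShard is a toy stand-in: hash the identifier to seed a PRNG, then
// pick `size` distinct instances. Per-tenant determinism is the property
// that matters; the real ring walks hashed tokens instead.
func shuffleShard(instances []string, identifier string, size int) []string {
	if size <= 0 || size >= len(instances) {
		return instances // shuffle sharding disabled, or shard covers the whole ring
	}
	h := fnv.New64a()
	h.Write([]byte(identifier))
	r := rand.New(rand.NewSource(int64(h.Sum64())))

	shuffled := append([]string(nil), instances...)
	r.Shuffle(len(shuffled), func(i, j int) {
		shuffled[i], shuffled[j] = shuffled[j], shuffled[i]
	})
	return shuffled[:size]
}

func main() {
	ring := []string{"127.0.0.1", "127.0.0.2", "127.0.0.3", "127.0.0.4"}
	fmt.Println(shuffleShard(ring, "user-A", 2)) // same tenant -> same shard, every run
	fmt.Println(shuffleShard(ring, "user-B", 2)) // different tenant -> likely a different shard
}
```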

pkg/storage/tsdb/util.go

Lines changed: 0 additions & 6 deletions
```diff
@@ -15,9 +15,3 @@ func HashBlockID(id ulid.ULID) uint32 {
 	}
 	return h
 }
-
-// HashTenantID returns a 32-bit hash of the tenant ID useful for
-// ring-based sharding.
-func HashTenantID(id string) uint32 {
-	return client.HashAdd32a(client.HashNew32a(), id)
-}
```

pkg/storegateway/sharding_strategy.go

Lines changed: 1 addition & 1 deletion
```diff
@@ -154,7 +154,7 @@ func GetShuffleShardingSubring(ring *ring.Ring, userID string, limits ShardingLi
 		return ring
 	}
 
-	return ring.Subring(cortex_tsdb.HashTenantID(userID), shardSize)
+	return ring.ShuffleShard(userID, shardSize)
 }
 
 type shardingMetadataFilterAdapter struct {
```

pkg/storegateway/sharding_strategy_test.go

Lines changed: 22 additions & 25 deletions
```diff
@@ -291,10 +291,7 @@ func TestShuffleShardingStrategy(t *testing.T) {
 	block3Hash := cortex_tsdb.HashBlockID(block3)
 	block4Hash := cortex_tsdb.HashBlockID(block4)
 
-	// Ensure the user ID we use belongs to the instances holding the token for the block 1
-	// (it's expected by the assertions below).
 	userID := "user-A"
-	require.LessOrEqual(t, cortex_tsdb.HashTenantID(userID), block1Hash)
 
 	type usersExpectation struct {
 		instanceID string
@@ -499,62 +496,62 @@
 			limits:            &shardingLimitsMock{storeGatewayTenantShardSize: 2},
 			setupRing: func(r *ring.Desc) {
 				r.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1, block4Hash + 1}, ring.ACTIVE)
-				r.AddIngester("instance-3", "127.0.0.3", "", []uint32{block3Hash + 1}, ring.ACTIVE)
+				r.AddIngester("instance-2", "127.0.0.2", "", []uint32{block2Hash + 1}, ring.ACTIVE)
 
-				r.Ingesters["instance-2"] = ring.IngesterDesc{
-					Addr:      "127.0.0.2",
+				r.Ingesters["instance-3"] = ring.IngesterDesc{
+					Addr:      "127.0.0.3",
 					Timestamp: time.Now().Add(-time.Hour).Unix(),
 					State:     ring.ACTIVE,
-					Tokens:    []uint32{block2Hash + 1},
+					Tokens:    []uint32{block3Hash + 1},
 				}
 			},
 			expectedUsers: []usersExpectation{
 				{instanceID: "instance-1", instanceAddr: "127.0.0.1", users: []string{userID}},
-				{instanceID: "instance-2", instanceAddr: "127.0.0.2", users: []string{userID}},
-				{instanceID: "instance-3", instanceAddr: "127.0.0.3", users: nil},
+				{instanceID: "instance-2", instanceAddr: "127.0.0.2", users: nil},
+				{instanceID: "instance-3", instanceAddr: "127.0.0.3", users: []string{userID}},
 			},
 			expectedBlocks: []blocksExpectation{
 				{instanceID: "instance-1", instanceAddr: "127.0.0.1", blocks: []ulid.ULID{block1, block2, block3, block4}},
-				{instanceID: "instance-2", instanceAddr: "127.0.0.2", blocks: []ulid.ULID{ /* no blocks because unhealthy */ }},
-				{instanceID: "instance-3", instanceAddr: "127.0.0.3", blocks: []ulid.ULID{ /* no blocks because not belonging to the shard */ }},
+				{instanceID: "instance-2", instanceAddr: "127.0.0.2", blocks: []ulid.ULID{ /* no blocks because not belonging to the shard */ }},
+				{instanceID: "instance-3", instanceAddr: "127.0.0.3", blocks: []ulid.ULID{ /* no blocks because unhealthy */ }},
 			},
 		},
 		"LEAVING instance in the ring should continue to keep its shard blocks but they should also be replicated to another instance": {
 			replicationFactor: 1,
 			limits:            &shardingLimitsMock{storeGatewayTenantShardSize: 2},
 			setupRing: func(r *ring.Desc) {
 				r.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1, block3Hash + 1}, ring.ACTIVE)
-				r.AddIngester("instance-2", "127.0.0.2", "", []uint32{block2Hash + 1}, ring.LEAVING)
-				r.AddIngester("instance-3", "127.0.0.3", "", []uint32{block4Hash + 1}, ring.ACTIVE)
+				r.AddIngester("instance-2", "127.0.0.2", "", []uint32{block2Hash + 1}, ring.ACTIVE)
+				r.AddIngester("instance-3", "127.0.0.3", "", []uint32{block4Hash + 1}, ring.LEAVING)
 			},
 			expectedUsers: []usersExpectation{
 				{instanceID: "instance-1", instanceAddr: "127.0.0.1", users: []string{userID}},
-				{instanceID: "instance-2", instanceAddr: "127.0.0.2", users: []string{userID}},
-				{instanceID: "instance-3", instanceAddr: "127.0.0.3", users: nil},
+				{instanceID: "instance-2", instanceAddr: "127.0.0.2", users: nil},
+				{instanceID: "instance-3", instanceAddr: "127.0.0.3", users: []string{userID}},
 			},
 			expectedBlocks: []blocksExpectation{
-				{instanceID: "instance-1", instanceAddr: "127.0.0.1", blocks: []ulid.ULID{block1, block3, block4 /* replicated: */, block2}},
-				{instanceID: "instance-2", instanceAddr: "127.0.0.2", blocks: []ulid.ULID{block2}},
-				{instanceID: "instance-3", instanceAddr: "127.0.0.3", blocks: []ulid.ULID{ /* no blocks because not belonging to the shard */ }},
+				{instanceID: "instance-1", instanceAddr: "127.0.0.1", blocks: []ulid.ULID{block1, block2, block3 /* replicated: */, block4}},
+				{instanceID: "instance-2", instanceAddr: "127.0.0.2", blocks: []ulid.ULID{ /* no blocks because not belonging to the shard */ }},
+				{instanceID: "instance-3", instanceAddr: "127.0.0.3", blocks: []ulid.ULID{block4}},
 			},
 		},
 		"JOINING instance in the ring should get its shard blocks but they should also be replicated to another instance": {
 			replicationFactor: 1,
 			limits:            &shardingLimitsMock{storeGatewayTenantShardSize: 2},
 			setupRing: func(r *ring.Desc) {
 				r.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1, block3Hash + 1}, ring.ACTIVE)
-				r.AddIngester("instance-2", "127.0.0.2", "", []uint32{block2Hash + 1}, ring.JOINING)
-				r.AddIngester("instance-3", "127.0.0.3", "", []uint32{block4Hash + 1}, ring.ACTIVE)
+				r.AddIngester("instance-2", "127.0.0.2", "", []uint32{block2Hash + 1}, ring.ACTIVE)
+				r.AddIngester("instance-3", "127.0.0.3", "", []uint32{block4Hash + 1}, ring.JOINING)
 			},
 			expectedUsers: []usersExpectation{
 				{instanceID: "instance-1", instanceAddr: "127.0.0.1", users: []string{userID}},
-				{instanceID: "instance-2", instanceAddr: "127.0.0.2", users: []string{userID}},
-				{instanceID: "instance-3", instanceAddr: "127.0.0.3", users: nil},
+				{instanceID: "instance-2", instanceAddr: "127.0.0.2", users: nil},
+				{instanceID: "instance-3", instanceAddr: "127.0.0.3", users: []string{userID}},
 			},
 			expectedBlocks: []blocksExpectation{
-				{instanceID: "instance-1", instanceAddr: "127.0.0.1", blocks: []ulid.ULID{block1, block3, block4 /* replicated: */, block2}},
-				{instanceID: "instance-2", instanceAddr: "127.0.0.2", blocks: []ulid.ULID{block2}},
-				{instanceID: "instance-3", instanceAddr: "127.0.0.3", blocks: []ulid.ULID{ /* no blocks because not belonging to the shard */ }},
+				{instanceID: "instance-1", instanceAddr: "127.0.0.1", blocks: []ulid.ULID{block1, block2, block3 /* replicated: */, block4}},
+				{instanceID: "instance-2", instanceAddr: "127.0.0.2", blocks: []ulid.ULID{ /* no blocks because not belonging to the shard */ }},
+				{instanceID: "instance-3", instanceAddr: "127.0.0.3", blocks: []ulid.ULID{block4}},
 			},
 		},
 		"SS = 0 disables shuffle sharding": {
```

pkg/util/validation/limits.go

Lines changed: 6 additions & 6 deletions
```diff
@@ -38,7 +38,7 @@ type Limits struct {
 	CreationGracePeriod       time.Duration `yaml:"creation_grace_period"`
 	EnforceMetadataMetricName bool          `yaml:"enforce_metadata_metric_name"`
 	EnforceMetricName         bool          `yaml:"enforce_metric_name"`
-	SubringSize               int           `yaml:"user_subring_size"`
+	IngestionTenantShardSize  int           `yaml:"ingestion_tenant_shard_size"`
 
 	// Ingester enforced limits.
 	// Series
@@ -72,7 +72,7 @@
 
 // RegisterFlags adds the flags required to config this to the given FlagSet
 func (l *Limits) RegisterFlags(f *flag.FlagSet) {
-	f.IntVar(&l.SubringSize, "experimental.distributor.user-subring-size", 0, "Per-user subring to shard metrics to ingesters. 0 is disabled.")
+	f.IntVar(&l.IngestionTenantShardSize, "distribution.ingestion-tenant-shard-size", 0, "The default tenant's shard size when the shuffle-sharding strategy is used. Must be set both on ingesters and distributors. When this setting is specified in the per-tenant overrides, a value of 0 disables shuffle sharding for the tenant.")
 	f.Float64Var(&l.IngestionRate, "distributor.ingestion-rate-limit", 25000, "Per-user ingestion rate limit in samples per second.")
 	f.StringVar(&l.IngestionRateStrategy, "distributor.ingestion-rate-limit-strategy", "local", "Whether the ingestion rate limit should be applied individually to each distributor instance (local), or evenly shared across the cluster (global).")
 	f.IntVar(&l.IngestionBurstSize, "distributor.ingestion-burst-size", 50000, "Per-user allowed ingestion burst size (in number of samples).")
@@ -341,12 +341,12 @@ func (o *Overrides) MaxGlobalMetadataPerMetric(userID string) int {
 	return o.getOverridesForUser(userID).MaxGlobalMetadataPerMetric
 }
 
-// SubringSize returns the size of the subring for a given user.
-func (o *Overrides) SubringSize(userID string) int {
-	return o.getOverridesForUser(userID).SubringSize
+// IngestionTenantShardSize returns the ingesters shard size for a given user.
+func (o *Overrides) IngestionTenantShardSize(userID string) int {
+	return o.getOverridesForUser(userID).IngestionTenantShardSize
 }
 
-// StoreGatewayTenantShardSize returns the size of the store-gateway shard size for a given user.
+// StoreGatewayTenantShardSize returns the store-gateway shard size for a given user.
 func (o *Overrides) StoreGatewayTenantShardSize(userID string) int {
 	return o.getOverridesForUser(userID).StoreGatewayTenantShardSize
 }
```
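The `IngestionTenantShardSize` override follows the same per-tenant pattern as the other limits: the flag supplies the default, and a per-tenant override (including an explicit 0, which disables shuffle sharding for that tenant only) wins when present. A simplified model of that lookup (an assumed shape for illustration; the real `Overrides` resolves values from its runtime configuration):

```go
package main

import "fmt"

// overrides is a simplified model of per-tenant limit resolution: an
// explicit per-tenant value beats the flag default, and a per-tenant 0
// disables shuffle sharding for that tenant alone.
type overrides struct {
	defaultShardSize int
	perTenant        map[string]int
}

func (o overrides) IngestionTenantShardSize(userID string) int {
	if v, ok := o.perTenant[userID]; ok {
		return v
	}
	return o.defaultShardSize
}

func main() {
	o := overrides{
		defaultShardSize: 3,
		perTenant:        map[string]int{"user-B": 0},
	}
	fmt.Println(o.IngestionTenantShardSize("user-A")) // 3: flag default applies
	fmt.Println(o.IngestionTenantShardSize("user-B")) // 0: shuffle sharding disabled for this tenant
}
```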
