Added shuffle sharding support to generate a subring #3090

Merged (10 commits, Sep 17, 2020)
5 changes: 5 additions & 0 deletions CHANGELOG.md
@@ -2,7 +2,12 @@

## master / unreleased

* [CHANGE] Improved shuffle sharding support in the write path. This work introduced the following config changes (a config sketch follows this list): #3090
  * Introduced the `-distributor.sharding-strategy` CLI flag (and its respective `sharding_strategy` YAML config option) to explicitly specify which sharding strategy should be used in the write path
  * `-experimental.distributor.user-subring-size` flag renamed to `-distributor.ingestion-tenant-shard-size`
  * `user_subring_size` limit YAML config option renamed to `ingestion_tenant_shard_size`
* [FEATURE] Added support for shuffle-sharding queriers in the query-frontend. When configured (`-frontend.max-queriers-per-user` globally, or using the per-user limit `max_queriers_per_user`), each user's requests will be handled by a different set of queriers. #3113
* [ENHANCEMENT] Shuffle sharding: improved shuffle sharding in the write path. Shuffle sharding should now be explicitly enabled via the `-distributor.sharding-strategy` CLI flag (or its respective YAML config option), and it guarantees stability, consistency, shuffling and balanced zone-awareness properties. #3090
* [ENHANCEMENT] Ingester: added new metric `cortex_ingester_active_series` to track active series more accurately. Also added options to control whether active series tracking is enabled (`-ingester.active-series-enabled`, defaults to false), how often the metric is updated (`-ingester.active-series-update-period`), and the max idle time after which a series is considered inactive (`-ingester.active-series-idle-timeout`). #3153
* [BUGFIX] No-longer-needed ingester operations for queries triggered by queriers and rulers are now canceled. #3178
* [BUGFIX] Ruler: directories in the configured `rules-path` will be removed on startup and shutdown in order to ensure they don't persist between runs. #3195
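
To make the renamed options concrete, here is a minimal write-path config sketch based on the entries above. The `distributor`/`limits` block placement and the shard-size value are illustrative assumptions; the config file reference below remains the authoritative layout.

```yaml
# Sketch only: enabling shuffle sharding in the write path.
distributor:
  sharding_strategy: shuffle-sharding  # supported values: default, shuffle-sharding

limits:
  # Renamed from user_subring_size; must be > 0 when the
  # shuffle-sharding strategy is enabled.
  ingestion_tenant_shard_size: 3

# Query-frontend counterpart: -frontend.max-queriers-per-user,
# or the per-user limit max_queriers_per_user (#3113).
```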
13 changes: 10 additions & 3 deletions docs/configuration/config-file-reference.md
@@ -392,6 +392,10 @@ ha_tracker:
# CLI flag: -distributor.extra-query-delay
[extra_queue_delay: <duration> | default = 0s]

# The sharding strategy to use. Supported values are: default, shuffle-sharding.
# CLI flag: -distributor.sharding-strategy
[sharding_strategy: <string> | default = "default"]

# Distribute samples based on all labels, as opposed to solely by user and
# metric name.
# CLI flag: -distributor.shard-by-all-labels
@@ -2748,9 +2752,12 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s
# CLI flag: -validation.enforce-metric-name
[enforce_metric_name: <boolean> | default = true]

# Per-user subring to shard metrics to ingesters. 0 is disabled.
# CLI flag: -experimental.distributor.user-subring-size
[user_subring_size: <int> | default = 0]
# The default tenant's shard size when the shuffle-sharding strategy is used.
# Must be set both on ingesters and distributors. When this setting is specified
# in the per-tenant overrides, a value of 0 disables shuffle sharding for the
# tenant.
# CLI flag: -distributor.ingestion-tenant-shard-size
[ingestion_tenant_shard_size: <int> | default = 0]

Contributor:

I take it, based on the similar "store_gateway_tenant_shard_size" flag from https://github.com/cortexproject/cortex/pull/3069/files, that you are planning to name all the similar flags "_tenant_shard_size"?

Contributor:

Also, are you planning to take the -experimental prefix off this (or keep it) when you are ready to submit?

Contributor Author:

> I take it, based on the similar "store_gateway_tenant_shard_size" flag from https://github.com/cortexproject/cortex/pull/3069/files, that you are planning to name all the similar flags "_tenant_shard_size"?

Yes, I am. The reason is that overrides are specified within the same YAML node, so we need a way to clearly differentiate them. What do you think?
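
To illustrate that point, a per-tenant overrides sketch might look like the following. The tenant names are hypothetical, `store_gateway_tenant_shard_size` is taken from the linked #3069, and the exact overrides file layout shown here is an assumption; per the docs above, a value of 0 disables shuffle sharding for that tenant.

```yaml
# Hypothetical per-tenant overrides: both shard sizes live in the same YAML node,
# so the distinct prefixes keep them distinguishable.
overrides:
  tenant-a:
    ingestion_tenant_shard_size: 5       # write path (this PR)
    store_gateway_tenant_shard_size: 3   # store-gateways (#3069)
  tenant-b:
    ingestion_tenant_shard_size: 0       # shuffle sharding disabled for this tenant
```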

Contributor Author:

> Also, are you planning to take the -experimental prefix off this (or keep it) when you are ready to submit?

I plan to remove the experimental prefix once this is ready to submit, unless you have any concerns. Generally speaking, I think originally adding the experimental prefix to CLI flags was a mistake and shouldn't be done anymore. What do you think?

Contributor:

I think you mean documenting it as experimental but not naming the flag experimental? That sounds fine to me.

Contributor Author:

Yes, Richard, you're correct (sorry for being unclear). This way, once we're all comfortable marking it stable, it will just be a doc change instead of a config change.


# The maximum number of series for which a query can fetch samples from each
# ingester. This limit is enforced only in the ingesters (when querying samples
2 changes: 1 addition & 1 deletion docs/configuration/v1-guarantees.md
@@ -37,7 +37,7 @@ Currently experimental features are:

- Azure blob storage.
- Zone awareness based replication.
- User subrings.
- Shuffle sharding (both read and write path).
- Ruler API (to PUT rules).
- Alertmanager API
- Memcached client DNS-based service discovery.
2 changes: 1 addition & 1 deletion integration/e2e/metrics.go
@@ -73,7 +73,7 @@ func filterMetrics(metrics []*io_prometheus_client.Metric, opts MetricsOptions)
	return filtered
}

func sumValues(values []float64) float64 {
func SumValues(values []float64) float64 {
	sum := 0.0
	for _, v := range values {
		sum += v
2 changes: 1 addition & 1 deletion integration/e2e/service.go
@@ -586,7 +586,7 @@ func (s *HTTPService) SumMetrics(metricNames []string, opts ...MetricsOption) ([
			return nil, errors.Wrapf(errMissingMetric, "metric=%s service=%s", m, s.name)
		}

		sums[i] = sumValues(getValues(metrics, options))
		sums[i] = SumValues(getValues(metrics, options))
	}

	return sums, nil
110 changes: 110 additions & 0 deletions integration/ingester_sharding_test.go
@@ -0,0 +1,110 @@
// +build requires_docker

package integration

import (
	"fmt"
	"strconv"
	"testing"
	"time"

	"github.com/prometheus/prometheus/pkg/labels"
	"github.com/stretchr/testify/require"

	"github.com/cortexproject/cortex/integration/e2e"
	e2edb "github.com/cortexproject/cortex/integration/e2e/db"
	"github.com/cortexproject/cortex/integration/e2ecortex"
)

func TestIngesterSharding(t *testing.T) {
	const numSeriesToPush = 1000

	tests := map[string]struct {
		shardingStrategy            string
		tenantShardSize             int
		expectedIngestersWithSeries int
	}{
		"default sharding strategy should spread series across all ingesters": {
			shardingStrategy:            "default",
			tenantShardSize:             2, // Ignored by default strategy.
			expectedIngestersWithSeries: 3,
		},
		"shuffle-sharding strategy should spread series across the configured shard size number of ingesters": {
			shardingStrategy:            "shuffle-sharding",
			tenantShardSize:             2,
			expectedIngestersWithSeries: 2,
		},
	}

	for testName, testData := range tests {
		t.Run(testName, func(t *testing.T) {
			s, err := e2e.NewScenario(networkName)
			require.NoError(t, err)
			defer s.Close()

			flags := BlocksStorageFlags
			flags["-distributor.sharding-strategy"] = testData.shardingStrategy
			flags["-distributor.ingestion-tenant-shard-size"] = strconv.Itoa(testData.tenantShardSize)

			// Start dependencies.
			consul := e2edb.NewConsul()
			minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"])
			require.NoError(t, s.StartAndWaitReady(consul, minio))

			// Start Cortex components.
			distributor := e2ecortex.NewDistributor("distributor", consul.NetworkHTTPEndpoint(), flags, "")
			ingester1 := e2ecortex.NewIngester("ingester-1", consul.NetworkHTTPEndpoint(), flags, "")
			ingester2 := e2ecortex.NewIngester("ingester-2", consul.NetworkHTTPEndpoint(), flags, "")
			ingester3 := e2ecortex.NewIngester("ingester-3", consul.NetworkHTTPEndpoint(), flags, "")
			querier := e2ecortex.NewQuerier("querier", consul.NetworkHTTPEndpoint(), flags, "")
			require.NoError(t, s.StartAndWaitReady(distributor, ingester1, ingester2, ingester3, querier))

			// Wait until distributor and queriers have updated the ring.
			require.NoError(t, distributor.WaitSumMetricsWithOptions(e2e.Equals(3), []string{"cortex_ring_members"}, e2e.WithLabelMatchers(
				labels.MustNewMatcher(labels.MatchEqual, "name", "ingester"),
				labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE"))))

			require.NoError(t, querier.WaitSumMetricsWithOptions(e2e.Equals(3), []string{"cortex_ring_members"}, e2e.WithLabelMatchers(
				labels.MustNewMatcher(labels.MatchEqual, "name", "ingester"),
				labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE"))))

			// Push series.
			now := time.Now()

			client, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), "", "", "", userID)
			require.NoError(t, err)

			for i := 1; i <= numSeriesToPush; i++ {
				series, _ := generateSeries(fmt.Sprintf("series_%d", i), now)
				res, err := client.Push(series)
				require.NoError(t, err)
				require.Equal(t, 200, res.StatusCode)
			}

			// Extract metrics from ingesters.
			numIngestersWithSeries := 0
			totalIngestedSeries := 0

			for _, ing := range []*e2ecortex.CortexService{ingester1, ingester2, ingester3} {
				values, err := ing.SumMetrics([]string{"cortex_ingester_memory_series"})
				require.NoError(t, err)

				numMemorySeries := e2e.SumValues(values)
				totalIngestedSeries += int(numMemorySeries)
				if numMemorySeries > 0 {
					numIngestersWithSeries++
				}
			}

			require.Equal(t, testData.expectedIngestersWithSeries, numIngestersWithSeries)
			require.Equal(t, numSeriesToPush, totalIngestedSeries)

			// Ensure no service-specific metrics prefix is used by the wrong service.
			assertServiceMetricsPrefixes(t, Distributor, distributor)
			assertServiceMetricsPrefixes(t, Ingester, ingester1)
			assertServiceMetricsPrefixes(t, Ingester, ingester2)
			assertServiceMetricsPrefixes(t, Ingester, ingester3)
			assertServiceMetricsPrefixes(t, Querier, querier)
		})
	}
}
2 changes: 1 addition & 1 deletion pkg/cortex/cortex.go
@@ -176,7 +176,7 @@ func (c *Config) Validate(log log.Logger) error {
	if err := c.LimitsConfig.Validate(c.Distributor.ShardByAllLabels); err != nil {
		return errors.Wrap(err, "invalid limits config")
	}
	if err := c.Distributor.Validate(); err != nil {
	if err := c.Distributor.Validate(c.LimitsConfig); err != nil {
		return errors.Wrap(err, "invalid distributor config")
	}
	if err := c.Querier.Validate(); err != nil {
35 changes: 27 additions & 8 deletions pkg/distributor/distributor.go
@@ -3,6 +3,7 @@ package distributor
import (
	"context"
	"flag"
	"fmt"
	"net/http"
	"sort"
	"strings"
@@ -104,11 +105,21 @@
Help: "Unix timestamp of latest received sample per user.",
}, []string{"user"})
emptyPreallocSeries = ingester_client.PreallocTimeseries{}

supportedShardingStrategies = []string{ShardingStrategyDefault, ShardingStrategyShuffle}

// Validation errors.
errInvalidShardingStrategy = errors.New("invalid sharding strategy")
errInvalidTenantShardSize = errors.New("invalid tenant shard size, the value must be greater than 0")
)

const (
typeSamples = "samples"
typeMetadata = "metadata"

// Supported sharding strategies.
ShardingStrategyDefault = "default"
ShardingStrategyShuffle = "shuffle-sharding"
)

// Distributor is a storage.SampleAppender and a client.Querier which
@@ -147,7 +158,8 @@ type Config struct {
	RemoteTimeout time.Duration `yaml:"remote_timeout"`
	ExtraQueryDelay time.Duration `yaml:"extra_queue_delay"`

	ShardByAllLabels bool `yaml:"shard_by_all_labels"`
	ShardingStrategy string `yaml:"sharding_strategy"`
	ShardByAllLabels bool `yaml:"shard_by_all_labels"`

	// Distributors ring
	DistributorRing RingConfig `yaml:"ring"`
@@ -170,10 +182,19 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
	f.DurationVar(&cfg.RemoteTimeout, "distributor.remote-timeout", 2*time.Second, "Timeout for downstream ingesters.")
	f.DurationVar(&cfg.ExtraQueryDelay, "distributor.extra-query-delay", 0, "Time to wait before sending more than the minimum successful query requests.")
	f.BoolVar(&cfg.ShardByAllLabels, "distributor.shard-by-all-labels", false, "Distribute samples based on all labels, as opposed to solely by user and metric name.")
	f.StringVar(&cfg.ShardingStrategy, "distributor.sharding-strategy", ShardingStrategyDefault, fmt.Sprintf("The sharding strategy to use. Supported values are: %s.", strings.Join(supportedShardingStrategies, ", ")))
}

// Validate config and returns error on failure
func (cfg *Config) Validate() error {
func (cfg *Config) Validate(limits validation.Limits) error {
	if !util.StringsContain(supportedShardingStrategies, cfg.ShardingStrategy) {
		return errInvalidShardingStrategy
	}

	if cfg.ShardingStrategy == ShardingStrategyShuffle && limits.IngestionTenantShardSize <= 0 {
		return errInvalidTenantShardSize
	}

	return cfg.HATrackerConfig.Validate()
}

@@ -508,13 +529,11 @@ func (d *Distributor) Push(ctx context.Context, req *client.WriteRequest) (*clie
		return nil, httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (%v) exceeded while adding %d samples and %d metadata", d.ingestionRateLimiter.Limit(now, userID), validatedSamples, len(validatedMetadata))
	}

	var subRing ring.ReadRing
	subRing = d.ingestersRing
	subRing := d.ingestersRing.(ring.ReadRing)

	// Obtain a subring if required
	if size := d.limits.SubringSize(userID); size > 0 {
		h := client.HashAdd32a(client.HashNew32a(), userID)
		subRing = d.ingestersRing.Subring(h, size)
	// Obtain a subring if required.
	if d.cfg.ShardingStrategy == ShardingStrategyShuffle {
		subRing = d.ingestersRing.ShuffleShard(userID, d.limits.IngestionTenantShardSize(userID))
	}

	keys := append(seriesKeys, metadataKeys...)
52 changes: 52 additions & 0 deletions pkg/distributor/distributor_test.go
@@ -46,6 +46,58 @@ var (
	ctx = user.InjectOrgID(context.Background(), "user")
)

func TestConfig_Validate(t *testing.T) {
	tests := map[string]struct {
		initConfig func(*Config)
		initLimits func(*validation.Limits)
		expected error
	}{
		"default config should pass": {
			initConfig: func(_ *Config) {},
			initLimits: func(_ *validation.Limits) {},
			expected: nil,
		},
		"should fail on invalid sharding strategy": {
			initConfig: func(cfg *Config) {
				cfg.ShardingStrategy = "xxx"
			},
			initLimits: func(_ *validation.Limits) {},
			expected: errInvalidShardingStrategy,
		},
		"should fail if the default shard size is 0 when sharding strategy = shuffle-sharding": {
			initConfig: func(cfg *Config) {
				cfg.ShardingStrategy = "shuffle-sharding"
			},
			initLimits: func(limits *validation.Limits) {
				limits.IngestionTenantShardSize = 0
			},
			expected: errInvalidTenantShardSize,
		},
		"should pass if the default shard size > 0 when sharding strategy = shuffle-sharding": {
			initConfig: func(cfg *Config) {
				cfg.ShardingStrategy = "shuffle-sharding"
			},
			initLimits: func(limits *validation.Limits) {
				limits.IngestionTenantShardSize = 3
			},
			expected: nil,
		},
	}

	for testName, testData := range tests {
		t.Run(testName, func(t *testing.T) {
			cfg := Config{}
			limits := validation.Limits{}
			flagext.DefaultValues(&cfg, &limits)

			testData.initConfig(&cfg)
			testData.initLimits(&limits)

			assert.Equal(t, testData.expected, cfg.Validate(limits))
		})
	}
}

func TestDistributor_Push(t *testing.T) {
	// Metrics to assert on.
	lastSeenTimestamp := "cortex_distributor_latest_seen_sample_timestamp_seconds"
9 changes: 3 additions & 6 deletions pkg/querier/blocks_store_replicated_set_test.go
@@ -37,10 +37,7 @@ func TestBlocksStoreReplicationSet_GetClientsFor(t *testing.T) {
	block3Hash := cortex_tsdb.HashBlockID(block3)
	block4Hash := cortex_tsdb.HashBlockID(block4)

	// Ensure the user ID we use belongs to the instances holding the token for the block 1
	// (it's expected by the assertions below).
	userID := "user-A"
	require.LessOrEqual(t, cortex_tsdb.HashTenantID(userID), block1Hash)

	tests := map[string]struct {
		shardingStrategy string
@@ -250,7 +247,7 @@
			queryBlocks: []ulid.ULID{block1, block2, block4},
			expectedClients: map[string][]ulid.ULID{
				"127.0.0.1": {block1, block4},
				"127.0.0.2": {block2},
				"127.0.0.3": {block2},
			},
		},
		"shuffle sharding, multiple instances in the ring with RF = 1, SS = 4": {
@@ -286,7 +283,7 @@
block2: {"127.0.0.1"},
},
expectedClients: map[string][]ulid.ULID{
"127.0.0.2": {block1, block2},
"127.0.0.3": {block1, block2},
},
},
"shuffle sharding, multiple instances in the ring with RF = 2, SS = 2 with excluded blocks and no replacement available": {
@@ -301,7 +298,7 @@
			},
			queryBlocks: []ulid.ULID{block1, block2},
			exclude: map[ulid.ULID][]string{
				block1: {"127.0.0.1", "127.0.0.2"},
				block1: {"127.0.0.1", "127.0.0.3"},
				block2: {"127.0.0.1"},
			},
			expectedErr: fmt.Errorf("no store-gateway instance left after checking exclude for block %s", block1.String()),
19 changes: 19 additions & 0 deletions pkg/ring/model.go
@@ -412,6 +412,25 @@ func (d *Desc) getTokens() []TokenDesc {
	return tokens
}

// getTokensByZone returns the instances' tokens grouped by zone. Tokens within each zone
// are guaranteed to be sorted.
func (d *Desc) getTokensByZone() map[string][]TokenDesc {
	zones := map[string][]TokenDesc{}

	for key, ing := range d.Ingesters {
		for _, token := range ing.Tokens {
			zones[ing.Zone] = append(zones[ing.Zone], TokenDesc{Token: token, Ingester: key, Zone: ing.GetZone()})
		}
	}

	// Ensure tokens are sorted within each zone.
	for zone := range zones {
		sort.Sort(ByToken(zones[zone]))
	}

	return zones
}

func GetOrCreateRingDesc(d interface{}) *Desc {
	if d == nil {
		return NewDesc()