
limits: distributor user subrings #1947

Merged 2 commits on Jan 31, 2020.

2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -6,6 +6,8 @@
* [CHANGE] Moved `--store.min-chunk-age` to the Querier config as `--querier.query-store-after`, allowing the store to be skipped during query time if the metrics wouldn't be found. The YAML config option `ingestermaxquerylookback` has been renamed to `query_ingesters_within` to match its CLI flag. #1893
* `--store.min-chunk-age` has been removed
* `--querier.query-store-after` has been added in its place.
* [FEATURE] Added user sub rings to distribute users to a subset of ingesters. #1947
* `--experimental.distributor.user-subring-size`
* [ENHANCEMENT] Experimental TSDB: Export TSDB Syncer metrics from Compactor component, they are prefixed with `cortex_compactor_`. #2023
* [ENHANCEMENT] Experimental TSDB: Added dedicated flag `-experimental.tsdb.bucket-store.tenant-sync-concurrency` to configure the maximum number of concurrent tenants for which blocks are synched. #2026
* [ENHANCEMENT] Experimental TSDB: Expose metrics for objstore operations (prefixed with `cortex_<component>_thanos_objstore_`, component being one of `ingester`, `querier` and `compactor`). #2027
4 changes: 4 additions & 0 deletions docs/configuration/config-file-reference.md
@@ -1812,6 +1812,10 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s
# CLI flag: -validation.enforce-metric-name
[enforce_metric_name: <boolean> | default = true]

# Per-user subring to shard metrics to ingesters. 0 is disabled.
# CLI flag: -experimental.distributor.user-subring-size
[user_subring_size: <int> | default = 0]

# The maximum number of series that a query can return.
# CLI flag: -ingester.max-series-per-query
[max_series_per_query: <int> | default = 100000]
14 changes: 13 additions & 1 deletion pkg/distributor/distributor.go
@@ -431,7 +431,19 @@ func (d *Distributor) Push(ctx context.Context, req *client.WriteRequest) (*clie
return nil, httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (%v) exceeded while adding %d samples", d.ingestionRateLimiter.Limit(now, userID), numSamples)
}

err = ring.DoBatch(ctx, d.ingestersRing, keys, func(ingester ring.IngesterDesc, indexes []int) error {
var subRing ring.ReadRing
subRing = d.ingestersRing

// Obtain a subring if required
if size := d.limits.SubringSize(userID); size > 0 {
h := client.HashAdd32(client.HashNew32(), userID)
subRing, err = d.ingestersRing.Subring(h, size)
if err != nil {
return nil, httpgrpc.Errorf(http.StatusInternalServerError, "unable to create subring: %v", err)
}
}

err = ring.DoBatch(ctx, subRing, keys, func(ingester ring.IngesterDesc, indexes []int) error {
timeseries := make([]client.PreallocTimeseries, 0, len(indexes))
for _, i := range indexes {
timeseries = append(timeseries, validatedTimeseries[i])
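The change to Push above boils down to: when a tenant has a non-zero subring size, hash the tenant ID and route the batch through a deterministic subring instead of the full ring. Below is a minimal sketch of that selection logic pulled out into a standalone helper; the helper name `pickSubring` is made up, and the import paths are assumed from the package layout shown in this diff.

```go
package distributor

import (
	"github.com/cortexproject/cortex/pkg/ingester/client"
	"github.com/cortexproject/cortex/pkg/ring"
	"github.com/cortexproject/cortex/pkg/util/validation"
)

// pickSubring (hypothetical, not part of this PR) isolates the selection logic
// added to Push: tenants with a non-zero subring size are sharded to a
// deterministic subset of the ring keyed by their user ID; everyone else keeps
// the full ring.
func pickSubring(full ring.ReadRing, limits *validation.Overrides, userID string) (ring.ReadRing, error) {
	if size := limits.SubringSize(userID); size > 0 {
		h := client.HashAdd32(client.HashNew32(), userID)
		return full.Subring(h, size)
	}
	return full, nil
}
```

Because the hash depends only on the tenant ID, the same tenant always starts its subring walk at the same token, which is what TestStableSubring in pkg/ring/ring_test.go below verifies.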
4 changes: 4 additions & 0 deletions pkg/distributor/distributor_test.go
@@ -850,6 +850,10 @@ type mockRing struct {
replicationFactor uint32
}

func (r mockRing) Subring(key uint32, n int) (ring.ReadRing, error) {
return nil, fmt.Errorf("unimplemented")
}

func (r mockRing) Get(key uint32, op ring.Operation, buf []ring.IngesterDesc) (ring.ReplicationSet, error) {
result := ring.ReplicationSet{
MaxErrors: 1,
66 changes: 66 additions & 0 deletions pkg/ring/ring.go
@@ -44,6 +44,7 @@ type ReadRing interface {
GetAll() (ReplicationSet, error)
ReplicationFactor() int
IngesterCount() int
Subring(key uint32, n int) (ReadRing, error)
}

// Operation can be Read or Write
@@ -377,3 +378,68 @@ func (r *Ring) Collect(ch chan<- prometheus.Metric) {
r.name,
)
}

// Subring returns a ring of n ingesters from the given ring.
// Subrings are meant only for ingester lookup and should have their data externalized.
func (r *Ring) Subring(key uint32, n int) (ReadRing, error) {
Contributor:

Would it make sense to use some caching here? We're going to call this on every push request, which looks like a lot of ring traversal. On the other hand, the number of different keys isn't that high (= number of tenants), so perhaps we can use some of the distributor's memory to cache these subrings for some time.

Contributor Author:

I considered that, but was concerned about cache invalidation when the ring size changes, so I decided to at least punt on that for now.

(A hedged sketch of one possible caching approach appears after the Subring function below.)

r.mtx.RLock()
defer r.mtx.RUnlock()
if r.ringDesc == nil || len(r.ringTokens) == 0 || n <= 0 {
return nil, ErrEmptyRing
}

var (
ingesters = make(map[string]IngesterDesc, n)
distinctHosts = map[string]struct{}{}
start = r.search(key)
iterations = 0
)

// Subring exceeds number of ingesters, set to total ring size
if n > len(r.ringDesc.Ingesters) {
n = len(r.ringDesc.Ingesters)
}

for i := start; len(distinctHosts) < n && iterations < len(r.ringTokens); i++ {
iterations++
// Wrap i around in the ring.
i %= len(r.ringTokens)

// We want n *distinct* ingesters.
token := r.ringTokens[i]
if _, ok := distinctHosts[token.Ingester]; ok {
continue
}
distinctHosts[token.Ingester] = struct{}{}
ingester := r.ringDesc.Ingesters[token.Ingester]

ingesters[token.Ingester] = ingester
}

if n > len(ingesters) {
return nil, fmt.Errorf("too few ingesters found")
}

numTokens := 0
for _, ing := range ingesters {
numTokens += len(ing.Tokens)
}

sub := &Ring{
name: "subring",
cfg: r.cfg,
ringDesc: &Desc{
Ingesters: ingesters,
},
ringTokens: make([]TokenDesc, 0, numTokens),
}

// add tokens for the ingesters in the subring, they should already be sorted, so no need to re-sort
for _, t := range r.ringTokens {
if _, ok := ingesters[t.Ingester]; ok {
sub.ringTokens = append(sub.ringTokens, t)
}
}

return sub, nil
}
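On the caching question raised in the review thread above, the following is a sketch only, not part of this PR, of what a per-tenant subring cache inside the ring package could look like, using a deliberately crude invalidation rule (drop everything when the ingester count changes). The type and method names are made up. The sketch also illustrates the author's concern: token or topology changes that keep the ingester count constant would not be caught.

```go
package ring

import "sync"

// subringCache is a hypothetical per-tenant cache of subrings (not part of
// this PR). It trades distributor memory for avoiding a ring traversal on
// every push request.
type subringCache struct {
	mtx       sync.Mutex
	ingesters int                 // ingester count the cached subrings were built against
	entries   map[string]ReadRing // keyed by tenant ID
}

// get returns a cached subring for userID, rebuilding the cache whenever the
// ingester count changes. Token churn with a constant ingester count is NOT
// detected, which is the invalidation problem mentioned in the review.
func (c *subringCache) get(r *Ring, userID string, key uint32, n int) (ReadRing, error) {
	c.mtx.Lock()
	defer c.mtx.Unlock()

	if count := r.IngesterCount(); count != c.ingesters {
		c.entries = map[string]ReadRing{} // ring changed size, drop all cached subrings
		c.ingesters = count
	}
	if sub, ok := c.entries[userID]; ok {
		return sub, nil
	}
	sub, err := r.Subring(key, n)
	if err != nil {
		return nil, err
	}
	c.entries[userID] = sub
	return sub, nil
}
```

Whether the extra memory and invalidation complexity is worth it depends on tenant count and push rate, which is why the PR punts on caching for now.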
105 changes: 105 additions & 0 deletions pkg/ring/ring_test.go
@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"math/rand"
"sort"
"testing"
"time"

@@ -118,3 +119,107 @@ func TestAddIngesterReplacesExistingTokens(t *testing.T) {

require.Equal(t, newTokens, r.Ingesters[ing1Name].Tokens)
}

func TestSubring(t *testing.T) {
r := NewDesc()

n := 16 // number of ingesters in ring
var prevTokens []uint32
for i := 0; i < n; i++ {
name := fmt.Sprintf("ing%v", i)
ingTokens := GenerateTokens(128, prevTokens)

r.AddIngester(name, fmt.Sprintf("addr%v", i), ingTokens, ACTIVE)

prevTokens = append(prevTokens, ingTokens...)
}

// Create a ring with the ingesters
ring := Ring{
name: "main ring",
cfg: Config{
HeartbeatTimeout: time.Hour,
},
ringDesc: r,
ringTokens: r.getTokens(),
}

// Subring of 0 invalid
_, err := ring.Subring(0, 0)
require.Error(t, err)

// Generate a sub ring for all possible valid ranges
for i := 1; i < n+2; i++ {
subr, err := ring.Subring(rand.Uint32(), i)
require.NoError(t, err)
subringSize := i
if i > n {
subringSize = n
}
require.Equal(t, subringSize, len(subr.(*Ring).ringDesc.Ingesters))
require.Equal(t, subringSize*128, len(subr.(*Ring).ringTokens))
require.True(t, sort.SliceIsSorted(subr.(*Ring).ringTokens, func(i, j int) bool {
return subr.(*Ring).ringTokens[i].Token < subr.(*Ring).ringTokens[j].Token
}))

// Obtain a replication slice
size := i - 1
if size <= 0 {
size = 1
}
subr.(*Ring).cfg.ReplicationFactor = size
set, err := subr.Get(rand.Uint32(), Write, nil)
require.NoError(t, err)
require.Equal(t, size, len(set.Ingesters))
}
}

func TestStableSubring(t *testing.T) {
r := NewDesc()

n := 16 // number of ingesters in ring
var prevTokens []uint32
for i := 0; i < n; i++ {
name := fmt.Sprintf("ing%v", i)
ingTokens := GenerateTokens(128, prevTokens)

r.AddIngester(name, fmt.Sprintf("addr%v", i), ingTokens, ACTIVE)

prevTokens = append(prevTokens, ingTokens...)
}

// Create a ring with the ingesters
ring := Ring{
name: "main ring",
cfg: Config{
HeartbeatTimeout: time.Hour,
},
ringDesc: r,
ringTokens: r.getTokens(),
}

// Generate the same subring multiple times
var subrings [][]TokenDesc
key := rand.Uint32()
subringsize := 4
for i := 1; i < 4; i++ {
subr, err := ring.Subring(key, subringsize)
require.NoError(t, err)
require.Equal(t, subringsize, len(subr.(*Ring).ringDesc.Ingesters))
require.Equal(t, subringsize*128, len(subr.(*Ring).ringTokens))
require.True(t, sort.SliceIsSorted(subr.(*Ring).ringTokens, func(i, j int) bool {
return subr.(*Ring).ringTokens[i].Token < subr.(*Ring).ringTokens[j].Token
}))

subrings = append(subrings, subr.(*Ring).ringTokens)
}

// Validate that the same subring is produced each time from the same ring
for i := 0; i < len(subrings); i++ {
next := i + 1
if next >= len(subrings) {
next = 0
}
require.Equal(t, subrings[i], subrings[next])
}
}
7 changes: 7 additions & 0 deletions pkg/util/validation/limits.go
@@ -36,6 +36,7 @@ type Limits struct {
RejectOldSamplesMaxAge time.Duration `yaml:"reject_old_samples_max_age"`
CreationGracePeriod time.Duration `yaml:"creation_grace_period"`
EnforceMetricName bool `yaml:"enforce_metric_name"`
SubringSize int `yaml:"user_subring_size"`

// Ingester enforced limits.
MaxSeriesPerQuery int `yaml:"max_series_per_query"`
@@ -59,6 +60,7 @@ type Limits struct {

// RegisterFlags adds the flags required to config this to the given FlagSet
func (l *Limits) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&l.SubringSize, "experimental.distributor.user-subring-size", 0, "Per-user subring to shard metrics to ingesters. 0 is disabled.")
f.Float64Var(&l.IngestionRate, "distributor.ingestion-rate-limit", 25000, "Per-user ingestion rate limit in samples per second.")
f.StringVar(&l.IngestionRateStrategy, "distributor.ingestion-rate-limit-strategy", "local", "Whether the ingestion rate limit should be applied individually to each distributor instance (local), or evenly shared across the cluster (global).")
f.IntVar(&l.IngestionBurstSize, "distributor.ingestion-burst-size", 50000, "Per-user allowed ingestion burst size (in number of samples).")
@@ -280,6 +282,11 @@ func (o *Overrides) MinChunkLength(userID string) int {
return o.getOverridesForUser(userID).MinChunkLength
}

// SubringSize returns the size of the subring for a given user.
func (o *Overrides) SubringSize(userID string) int {
return o.getOverridesForUser(userID).SubringSize
}

func (o *Overrides) getOverridesForUser(userID string) *Limits {
if o.tenantLimits != nil {
l := o.tenantLimits(userID)