Skip to content

Implementing Spread Minimizing Token Generator #5855

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Apr 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
* [ENHANCEMENT] Querier: Add context error check when converting Metrics to SeriesSet for GetSeries on distributorQuerier. #5827
* [ENHANCEMENT] Ruler: Improve GetRules response time by refactoring mutexes and introducing a temporary rules cache in `ruler/manager.go`. #5805
* [ENHANCEMENT] Querier: Add context error check when merging slices from ingesters for GetLabel operations. #5837
* [ENHANCEMENT] Ring: Add experimental `-ingester.tokens-generator-strategy=minimize-spread` flag to enable the new minimize spread token generator strategy. #5855
* [BUGFIX] Distributor: Do not use label with empty values for sharding #5717
* [BUGFIX] Query Frontend: queries with negative offset should check whether it is cacheable or not. #5719
* [BUGFIX] Redis Cache: pass `cache_size` config correctly. #5734
Expand Down
5 changes: 2 additions & 3 deletions pkg/alertmanager/lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"github.com/cortexproject/cortex/pkg/ring"
)

func (r *MultitenantAlertmanager) OnRingInstanceRegister(_ *ring.BasicLifecycler, ringDesc ring.Desc, instanceExists bool, instanceID string, instanceDesc ring.InstanceDesc) (ring.InstanceState, ring.Tokens) {
func (r *MultitenantAlertmanager) OnRingInstanceRegister(lc *ring.BasicLifecycler, ringDesc ring.Desc, instanceExists bool, instanceID string, instanceDesc ring.InstanceDesc) (ring.InstanceState, ring.Tokens) {
// When we initialize the alertmanager instance in the ring we want to start from
// a clean situation, so whatever the current state is we set it to JOINING, while we
// keep existing tokens (if any).
Expand All @@ -13,8 +13,7 @@ func (r *MultitenantAlertmanager) OnRingInstanceRegister(_ *ring.BasicLifecycler
tokens = instanceDesc.GetTokens()
}

_, takenTokens := ringDesc.TokensFor(instanceID)
newTokens := ring.GenerateTokens(RingNumTokens-len(tokens), takenTokens)
newTokens := lc.GenerateTokens(&ringDesc, instanceID, instanceDesc.Zone, RingNumTokens-len(tokens), true)

// Tokens sorting will be enforced by the parent caller.
tokens = append(tokens, newTokens...)
Expand Down
6 changes: 4 additions & 2 deletions pkg/alertmanager/multitenant_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1015,6 +1015,7 @@ receivers:
}

func TestMultitenantAlertmanager_InitialSyncWithSharding(t *testing.T) {
tg := ring.NewRandomTokenGenerator()
tc := []struct {
name string
existing bool
Expand All @@ -1041,7 +1042,7 @@ func TestMultitenantAlertmanager_InitialSyncWithSharding(t *testing.T) {
name: "with an instance already in the ring with ACTIVE state and all tokens",
existing: true,
initialState: ring.ACTIVE,
initialTokens: ring.GenerateTokens(128, nil),
initialTokens: tg.GenerateTokens(ring.NewDesc(), "id1", "", 128, true),
},
{
name: "with an instance already in the ring with LEAVING state and all tokens",
Expand Down Expand Up @@ -1520,9 +1521,10 @@ func TestMultitenantAlertmanager_RingLifecyclerShouldAutoForgetUnhealthyInstance
require.NoError(t, services.StartAndAwaitRunning(ctx, am))
defer services.StopAndAwaitTerminated(ctx, am) //nolint:errcheck

tg := ring.NewRandomTokenGenerator()
require.NoError(t, ringStore.CAS(ctx, RingKey, func(in interface{}) (interface{}, bool, error) {
ringDesc := ring.GetOrCreateRingDesc(in)
instance := ringDesc.AddIngester(unhealthyInstanceID, "127.0.0.1", "", ring.GenerateTokens(RingNumTokens, nil), ring.ACTIVE, time.Now())
instance := ringDesc.AddIngester(unhealthyInstanceID, "127.0.0.1", "", tg.GenerateTokens(ringDesc, unhealthyInstanceID, "", RingNumTokens, true), ring.ACTIVE, time.Now())
instance.Timestamp = time.Now().Add(-(ringAutoForgetUnhealthyPeriods + 1) * heartbeatTimeout).Unix()
ringDesc.Ingesters[unhealthyInstanceID] = instance

Expand Down
4 changes: 4 additions & 0 deletions pkg/cortex/cortex.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,10 @@ func (c *Config) Validate(log log.Logger) error {
return errors.Wrap(err, "invalid alertmanager config")
}

if err := c.Ingester.Validate(); err != nil {
return errors.Wrap(err, "invalid ingester config")
}

if err := c.Tracing.Validate(); err != nil {
return errors.Wrap(err, "invalid tracing config")
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1932,6 +1932,8 @@ func BenchmarkDistributor_Push(b *testing.B) {
},
}

tg := ring.NewRandomTokenGenerator()

for testName, testData := range tests {
b.Run(testName, func(b *testing.B) {

Expand All @@ -1942,7 +1944,7 @@ func BenchmarkDistributor_Push(b *testing.B) {
err := kvStore.CAS(context.Background(), ingester.RingKey,
func(_ interface{}) (interface{}, bool, error) {
d := &ring.Desc{}
d.AddIngester("ingester-1", "127.0.0.1", "", ring.GenerateTokens(128, nil), ring.ACTIVE, time.Now())
d.AddIngester("ingester-1", "127.0.0.1", "", tg.GenerateTokens(d, "ingester-1", "", 128, true), ring.ACTIVE, time.Now())
return d, true, nil
},
)
Expand Down
8 changes: 8 additions & 0 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,14 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {

}

// Validate checks the ingester configuration. It currently only delegates to
// the lifecycler configuration's own Validate and returns its result unchanged
// (nil when that validation passes).
func (cfg *Config) Validate() error {
	return cfg.LifecyclerConfig.Validate()
}

func (cfg *Config) getIgnoreSeriesLimitForMetricNamesMap() map[string]struct{} {
if cfg.IgnoreSeriesLimitForMetricNames == "" {
return nil
Expand Down
35 changes: 22 additions & 13 deletions pkg/ring/basic_lifecycler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
mathrand "math/rand"
"sort"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -48,9 +49,10 @@ type BasicLifecyclerConfig struct {
// if zone awareness is unused.
Zone string

HeartbeatPeriod time.Duration
TokensObservePeriod time.Duration
NumTokens int
HeartbeatPeriod time.Duration
TokensObservePeriod time.Duration
NumTokens int
TokensGeneratorStrategy string

// If true lifecycler doesn't unregister instance from the ring when it's stopping. Default value is false,
// which means unregistering.
Expand All @@ -67,6 +69,7 @@ type BasicLifecyclerConfig struct {
// responsibility to ChangeState().
type BasicLifecycler struct {
*services.BasicService
TokenGenerator

cfg BasicLifecyclerConfig
logger log.Logger
Expand All @@ -88,15 +91,21 @@ type BasicLifecycler struct {

// NewBasicLifecycler makes a new BasicLifecycler.
func NewBasicLifecycler(cfg BasicLifecyclerConfig, ringName, ringKey string, store kv.Client, delegate BasicLifecyclerDelegate, logger log.Logger, reg prometheus.Registerer) (*BasicLifecycler, error) {
tg := NewRandomTokenGenerator()
if strings.EqualFold(cfg.TokensGeneratorStrategy, minimizeSpreadTokenStrategy) {
tg = NewMinimizeSpreadTokenGenerator()
}

l := &BasicLifecycler{
cfg: cfg,
ringName: ringName,
ringKey: ringKey,
logger: logger,
store: store,
delegate: delegate,
metrics: NewBasicLifecyclerMetrics(ringName, reg),
actorChan: make(chan func()),
cfg: cfg,
ringName: ringName,
ringKey: ringKey,
logger: logger,
store: store,
delegate: delegate,
metrics: NewBasicLifecyclerMetrics(ringName, reg),
actorChan: make(chan func()),
TokenGenerator: tg,
}

l.metrics.tokensToOwn.Set(float64(cfg.NumTokens))
Expand Down Expand Up @@ -350,7 +359,7 @@ func (l *BasicLifecycler) verifyTokens(ctx context.Context) bool {

err := l.updateInstance(ctx, func(r *Desc, i *InstanceDesc) bool {
// At this point, we should have the same tokens as we have registered before.
actualTokens, takenTokens := r.TokensFor(l.cfg.ID)
actualTokens, _ := r.TokensFor(l.cfg.ID)

if actualTokens.Equals(l.GetTokens()) {
// Tokens have been verified. No need to change them.
Expand All @@ -362,7 +371,7 @@ func (l *BasicLifecycler) verifyTokens(ctx context.Context) bool {
needTokens := l.cfg.NumTokens - len(actualTokens)

level.Info(l.logger).Log("msg", "generating new tokens", "count", needTokens, "ring", l.ringName)
newTokens := GenerateTokens(needTokens, takenTokens)
newTokens := l.GenerateTokens(r, l.cfg.ID, l.cfg.Zone, needTokens, true)

actualTokens = append(actualTokens, newTokens...)
sort.Sort(actualTokens)
Expand Down
37 changes: 33 additions & 4 deletions pkg/ring/lifecycler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ import (
"fmt"
mathrand "math/rand"
"os"
"slices"
"sort"
"strings"
"sync"
"time"

Expand All @@ -21,12 +23,17 @@ import (
"github.com/cortexproject/cortex/pkg/util/services"
)

// errInvalidTokensGeneratorStrategy is returned by LifecyclerConfig.Validate
// when the configured tokens generator strategy is not one of the supported values.
var errInvalidTokensGeneratorStrategy = errors.New("invalid token generator strategy")

// LifecyclerConfig is the config to build a Lifecycler.
type LifecyclerConfig struct {
RingConfig Config `yaml:"ring"`

// Config for the ingester lifecycle control
NumTokens int `yaml:"num_tokens"`
TokensGeneratorStrategy string `yaml:"tokens_generator_strategy" doc:"hidden"`
HeartbeatPeriod time.Duration `yaml:"heartbeat_period"`
ObservePeriod time.Duration `yaml:"observe_period"`
JoinAfter time.Duration `yaml:"join_after"`
Expand Down Expand Up @@ -63,6 +70,7 @@ func (cfg *LifecyclerConfig) RegisterFlagsWithPrefix(prefix string, f *flag.Flag
}

f.IntVar(&cfg.NumTokens, prefix+"num-tokens", 128, "Number of tokens for each ingester.")
f.StringVar(&cfg.TokensGeneratorStrategy, prefix+"tokens-generator-strategy", randomTokenStrategy, fmt.Sprintf("Algorithm used to generate new tokens. Supported Values: %s", strings.Join(supportedTokenStrategy, ",")))
f.DurationVar(&cfg.HeartbeatPeriod, prefix+"heartbeat-period", 5*time.Second, "Period at which to heartbeat to consul. 0 = disabled.")
f.DurationVar(&cfg.JoinAfter, prefix+"join-after", 0*time.Second, "Period to wait for a claim from another member; will join automatically after this.")
f.DurationVar(&cfg.ObservePeriod, prefix+"observe-period", 0*time.Second, "Observe tokens after generating to resolve collisions. Useful when using gossiping ring.")
Expand All @@ -85,6 +93,14 @@ func (cfg *LifecyclerConfig) RegisterFlagsWithPrefix(prefix string, f *flag.Flag
f.BoolVar(&cfg.ReadinessCheckRingHealth, prefix+"readiness-check-ring-health", true, "When enabled the readiness probe succeeds only after all instances are ACTIVE and healthy in the ring, otherwise only the instance itself is checked. This option should be disabled if in your cluster multiple instances can be rolled out simultaneously, otherwise rolling updates may be slowed down.")
}

// Validate checks the lifecycler configuration.
//
// An empty TokensGeneratorStrategy is accepted. Any other value must match,
// after lower-casing, one of the entries in supportedTokenStrategy; otherwise
// errInvalidTokensGeneratorStrategy is returned.
func (cfg *LifecyclerConfig) Validate() error {
	strategy := cfg.TokensGeneratorStrategy

	// Nothing configured: nothing to validate.
	if strategy == "" {
		return nil
	}

	// The comparison is done on the lower-cased value, so the flag is
	// effectively case-insensitive.
	if slices.Contains(supportedTokenStrategy, strings.ToLower(strategy)) {
		return nil
	}

	return errInvalidTokensGeneratorStrategy
}

// Lifecycler is responsible for managing the lifecycle of entries in the ring.
type Lifecycler struct {
*services.BasicService
Expand Down Expand Up @@ -129,6 +145,8 @@ type Lifecycler struct {

lifecyclerMetrics *LifecyclerMetrics
logger log.Logger

tg TokenGenerator
}

// NewLifecycler creates new Lifecycler. It must be started via StartAsync.
Expand Down Expand Up @@ -168,6 +186,12 @@ func NewLifecycler(
flushTransferer = NewNoopFlushTransferer()
}

tg := NewRandomTokenGenerator()

if strings.EqualFold(cfg.TokensGeneratorStrategy, minimizeSpreadTokenStrategy) {
tg = NewMinimizeSpreadTokenGenerator()
}

l := &Lifecycler{
cfg: cfg,
flushTransferer: flushTransferer,
Expand All @@ -185,6 +209,7 @@ func NewLifecycler(
state: PENDING,
lifecyclerMetrics: NewLifecyclerMetrics(ringName, reg),
logger: logger,
tg: tg,
}

l.lifecyclerMetrics.tokensToOwn.Set(float64(cfg.NumTokens))
Expand Down Expand Up @@ -689,14 +714,14 @@ func (i *Lifecycler) verifyTokens(ctx context.Context) bool {
}

// At this point, we should have the same tokens as we have registered before
ringTokens, takenTokens := ringDesc.TokensFor(i.ID)
ringTokens, _ := ringDesc.TokensFor(i.ID)

if !i.compareTokens(ringTokens) {
// uh, oh... our tokens are not ours anymore. Let's try new ones.
needTokens := i.cfg.NumTokens - len(ringTokens)

level.Info(i.logger).Log("msg", "generating new tokens", "count", needTokens, "ring", i.RingName)
newTokens := GenerateTokens(needTokens, takenTokens)
newTokens := i.tg.GenerateTokens(ringDesc, i.ID, i.Zone, needTokens, true)

ringTokens = append(ringTokens, newTokens...)
sort.Sort(ringTokens)
Expand Down Expand Up @@ -754,7 +779,7 @@ func (i *Lifecycler) autoJoin(ctx context.Context, targetState InstanceState) er

// At this point, we should not have any tokens, and we should be in PENDING state.
// Need to make sure we didn't change the num of tokens configured
myTokens, takenTokens := ringDesc.TokensFor(i.ID)
myTokens, _ := ringDesc.TokensFor(i.ID)
needTokens := i.cfg.NumTokens - len(myTokens)

if needTokens == 0 && myTokens.Equals(i.getTokens()) {
Expand All @@ -763,7 +788,11 @@ func (i *Lifecycler) autoJoin(ctx context.Context, targetState InstanceState) er
return ringDesc, true, nil
}

newTokens := GenerateTokens(needTokens, takenTokens)
newTokens := i.tg.GenerateTokens(ringDesc, i.ID, i.Zone, needTokens, false)
if len(newTokens) != needTokens {
level.Warn(i.logger).Log("msg", "retrying generate tokens")
return ringDesc, true, errors.New("could not generate tokens")
}

myTokens = append(myTokens, newTokens...)
sort.Sort(myTokens)
Expand Down
Loading