Skip to content

Commit 7440436

Browse files
committed
WIP
Signed-off-by: Alan Protasio <approtas@amazon.com>
1 parent b855a25 commit 7440436

File tree

6 files changed

+124
-38
lines changed

6 files changed

+124
-38
lines changed

pkg/compactor/compactor.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -476,7 +476,7 @@ func (c *Compactor) starting(ctx context.Context) error {
476476
// Initialize the compactors ring if sharding is enabled.
477477
if c.compactorCfg.ShardingEnabled {
478478
lifecyclerCfg := c.compactorCfg.ShardingRing.ToLifecyclerConfig()
479-
c.ringLifecycler, err = ring.NewLifecycler(lifecyclerCfg, ring.NewNoopFlushTransferer(), "compactor", ringKey, false, c.logger, prometheus.WrapRegistererWithPrefix("cortex_", c.registerer))
479+
c.ringLifecycler, err = ring.NewLifecycler(lifecyclerCfg, ring.NewNoopFlushTransferer(), "compactor", ringKey, true, false, c.logger, prometheus.WrapRegistererWithPrefix("cortex_", c.registerer))
480480
if err != nil {
481481
return errors.Wrap(err, "unable to initialize compactor ring lifecycler")
482482
}

pkg/distributor/distributor.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,7 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
216216
if !canJoinDistributorsRing {
217217
ingestionRateStrategy = newInfiniteIngestionRateStrategy()
218218
} else if limits.IngestionRateStrategy() == validation.GlobalIngestionRateStrategy {
219-
distributorsLifeCycler, err = ring.NewLifecycler(cfg.DistributorRing.ToLifecyclerConfig(), nil, "distributor", ringKey, true, log, prometheus.WrapRegistererWithPrefix("cortex_", reg))
219+
distributorsLifeCycler, err = ring.NewLifecycler(cfg.DistributorRing.ToLifecyclerConfig(), nil, "distributor", ringKey, true, true, log, prometheus.WrapRegistererWithPrefix("cortex_", reg))
220220
if err != nil {
221221
return nil, err
222222
}

pkg/ingester/ingester.go

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -638,7 +638,7 @@ func New(cfg Config, limits *validation.Overrides, registerer prometheus.Registe
638638
}, i.getOldestUnshippedBlockMetric)
639639
}
640640

641-
i.lifecycler, err = ring.NewLifecycler(cfg.LifecyclerConfig, i, "ingester", RingKey, cfg.BlocksStorageConfig.TSDB.FlushBlocksOnShutdown, logger, prometheus.WrapRegistererWithPrefix("cortex_", registerer))
641+
i.lifecycler, err = ring.NewLifecycler(cfg.LifecyclerConfig, i, "ingester", RingKey, false, cfg.BlocksStorageConfig.TSDB.FlushBlocksOnShutdown, logger, prometheus.WrapRegistererWithPrefix("cortex_", registerer))
642642
if err != nil {
643643
return nil, err
644644
}
@@ -706,13 +706,6 @@ func (i *Ingester) startingV2ForFlusher(ctx context.Context) error {
706706
}
707707

708708
func (i *Ingester) starting(ctx context.Context) error {
709-
if err := i.openExistingTSDB(ctx); err != nil {
710-
// Try to rollback and close opened TSDBs before halting the ingester.
711-
i.closeAllTSDB()
712-
713-
return errors.Wrap(err, "opening existing TSDBs")
714-
}
715-
716709
// Important: we want to keep lifecycler running until we ask it to stop, so we need to give it independent context
717710
if err := i.lifecycler.StartAsync(context.Background()); err != nil {
718711
return errors.Wrap(err, "failed to start lifecycler")
@@ -721,6 +714,15 @@ func (i *Ingester) starting(ctx context.Context) error {
721714
return errors.Wrap(err, "failed to start lifecycler")
722715
}
723716

717+
if err := i.openExistingTSDB(ctx); err != nil {
718+
// Try to rollback and close opened TSDBs before halting the ingester.
719+
i.closeAllTSDB()
720+
721+
return errors.Wrap(err, "opening existing TSDBs")
722+
}
723+
724+
i.lifecycler.Join()
725+
724726
// let's start the rest of subservices via manager
725727
servs := []services.Service(nil)
726728

pkg/ring/lifecycler.go

Lines changed: 39 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,8 @@ type Lifecycler struct {
9393
flushTransferer FlushTransferer
9494
KVStore kv.Client
9595

96-
actorChan chan func()
96+
actorChan chan func()
97+
autojoinChan chan struct{}
9798

9899
// These values are initialised at startup, and never change
99100
ID string
@@ -106,6 +107,9 @@ type Lifecycler struct {
106107
flushOnShutdown *atomic.Bool
107108
unregisterOnShutdown *atomic.Bool
108109

110+
// Whether to auto join on ring on startup.
111+
autoJoinOnStartup bool
112+
109113
// We need to remember the ingester state, tokens and registered timestamp just in case the KV store
110114
// goes away and comes back empty. The state changes during lifecycle of instance.
111115
stateMtx sync.RWMutex
@@ -128,7 +132,14 @@ type Lifecycler struct {
128132
}
129133

130134
// NewLifecycler creates new Lifecycler. It must be started via StartAsync.
131-
func NewLifecycler(cfg LifecyclerConfig, flushTransferer FlushTransferer, ringName, ringKey string, flushOnShutdown bool, logger log.Logger, reg prometheus.Registerer) (*Lifecycler, error) {
135+
func NewLifecycler(
136+
cfg LifecyclerConfig,
137+
flushTransferer FlushTransferer,
138+
ringName, ringKey string,
139+
autoJoinOnStartup, flushOnShutdown bool,
140+
logger log.Logger,
141+
reg prometheus.Registerer,
142+
) (*Lifecycler, error) {
132143
addr, err := GetInstanceAddr(cfg.Addr, cfg.InfNames, logger)
133144
if err != nil {
134145
return nil, err
@@ -165,10 +176,12 @@ func NewLifecycler(cfg LifecyclerConfig, flushTransferer FlushTransferer, ringNa
165176
ID: cfg.ID,
166177
RingName: ringName,
167178
RingKey: ringKey,
179+
autoJoinOnStartup: autoJoinOnStartup,
168180
flushOnShutdown: atomic.NewBool(flushOnShutdown),
169181
unregisterOnShutdown: atomic.NewBool(cfg.UnregisterOnShutdown),
170182
Zone: zone,
171183
actorChan: make(chan func()),
184+
autojoinChan: make(chan struct{}),
172185
state: PENDING,
173186
lifecyclerMetrics: NewLifecyclerMetrics(ringName, reg),
174187
logger: logger,
@@ -391,23 +404,38 @@ func (i *Lifecycler) ZonesCount() int {
391404
return i.zonesCount
392405
}
393406

407+
func (i *Lifecycler) Join() {
408+
i.autojoinChan <- struct{}{}
409+
}
410+
394411
func (i *Lifecycler) loop(ctx context.Context) error {
412+
joined := false
395413
// First, see if we exist in the cluster, update our state to match if we do,
396414
// and add ourselves (without tokens) if we don't.
397415
if err := i.initRing(context.Background()); err != nil {
398416
return perrors.Wrapf(err, "failed to join the ring %s", i.RingName)
399417
}
400418

401419
// We do various period tasks
402-
autoJoinAfter := time.After(i.cfg.JoinAfter)
420+
var autoJoinAfter <-chan time.Time
403421
var observeChan <-chan time.Time
404422

423+
if i.autoJoinOnStartup {
424+
i.Join()
425+
}
426+
405427
heartbeatTickerStop, heartbeatTickerChan := newDisableableTicker(i.cfg.HeartbeatPeriod)
406428
defer heartbeatTickerStop()
407429

408430
for {
409431
select {
432+
case <-i.autojoinChan:
433+
autoJoinAfter = time.After(i.cfg.JoinAfter)
410434
case <-autoJoinAfter:
435+
if joined {
436+
continue
437+
}
438+
joined = true
411439
level.Debug(i.logger).Log("msg", "JoinAfter expired", "ring", i.RingName)
412440
// Will only fire once, after auto join timeout. If we haven't entered "JOINING" state,
413441
// then pick some tokens and enter ACTIVE state.
@@ -556,7 +584,7 @@ func (i *Lifecycler) initRing(ctx context.Context) error {
556584
// We use the tokens from the file only if it does not exist in the ring yet.
557585
if len(tokensFromFile) > 0 {
558586
level.Info(i.logger).Log("msg", "adding tokens from file", "num_tokens", len(tokensFromFile))
559-
if len(tokensFromFile) >= i.cfg.NumTokens {
587+
if len(tokensFromFile) >= i.cfg.NumTokens && i.autoJoinOnStartup {
560588
i.setState(ACTIVE)
561589
}
562590
ringDesc.AddIngester(i.ID, i.Addr, i.Zone, tokensFromFile, i.GetState(), registeredAt)
@@ -587,9 +615,14 @@ func (i *Lifecycler) initRing(ctx context.Context) error {
587615

588616
// If the ingester failed to clean its ring entry up in can leave its state in LEAVING
589617
// OR unregister_on_shutdown=false
590-
// Move it into ACTIVE to ensure the ingester joins the ring.
618+
// if autoJoinOnStartup, move it into ACTIVE to ensure the ingester joins the ring.
619+
// else set to PENDING
591620
if instanceDesc.State == LEAVING && len(instanceDesc.Tokens) == i.cfg.NumTokens {
592-
instanceDesc.State = ACTIVE
621+
if i.autoJoinOnStartup {
622+
instanceDesc.State = ACTIVE
623+
} else {
624+
instanceDesc.State = PENDING
625+
}
593626
}
594627

595628
// We exist in the ring, so assume the ring is right and copy out tokens & state out of there.
@@ -699,9 +732,6 @@ func (i *Lifecycler) autoJoin(ctx context.Context, targetState InstanceState) er
699732

700733
// At this point, we should not have any tokens, and we should be in PENDING state.
701734
myTokens, takenTokens := ringDesc.TokensFor(i.ID)
702-
if len(myTokens) > 0 {
703-
level.Error(i.logger).Log("msg", "tokens already exist for this instance - wasn't expecting any!", "num_tokens", len(myTokens), "ring", i.RingName)
704-
}
705735

706736
newTokens := GenerateTokens(i.cfg.NumTokens-len(myTokens), takenTokens)
707737
i.setState(targetState)

0 commit comments

Comments
 (0)