Skip to content

Commit 9941124

Browse files
authored
[ingesters] Send heartbeat during wall replay (#4847)
* WIP Signed-off-by: Alan Protasio <approtas@amazon.com> * changelog Signed-off-by: Alan Protasio <approtas@amazon.com> * joing should not block Signed-off-by: Alan Protasio <approtas@amazon.com> * commments Signed-off-by: Alan Protasio <approtas@amazon.com> Signed-off-by: Alan Protasio <approtas@amazon.com>
1 parent b855a25 commit 9941124

File tree

7 files changed

+158
-38
lines changed

7 files changed

+158
-38
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
* [ENHANCEMENT] Querier/Ruler: Retry store-gateway in case of unexpected failure, instead of failing the query. #4532 #4839
4444
* [ENHANCEMENT] Ring: DoBatch prioritize 4xx errors when failing. #4783
4545
* [ENHANCEMENT] Cortex now built with Go 1.18. #4829
46+
* [ENHANCEMENT] Ingester: Prevent ingesters to become unhealthy during wall replay. #4847
4647
* [FEATURE] Compactor: Added `-compactor.block-files-concurrency` allowing to configure number of go routines for download/upload block files during compaction. #4784
4748
* [FEATURE] Compactor: Added -compactor.blocks-fetch-concurrency` allowing to configure number of go routines for blocks during compaction. #4787
4849
* [FEATURE] Compactor: Added configurations for Azure MSI in blocks-storage, ruler-storage and alertmanager-storage. #4818

pkg/compactor/compactor.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -476,7 +476,7 @@ func (c *Compactor) starting(ctx context.Context) error {
476476
// Initialize the compactors ring if sharding is enabled.
477477
if c.compactorCfg.ShardingEnabled {
478478
lifecyclerCfg := c.compactorCfg.ShardingRing.ToLifecyclerConfig()
479-
c.ringLifecycler, err = ring.NewLifecycler(lifecyclerCfg, ring.NewNoopFlushTransferer(), "compactor", ringKey, false, c.logger, prometheus.WrapRegistererWithPrefix("cortex_", c.registerer))
479+
c.ringLifecycler, err = ring.NewLifecycler(lifecyclerCfg, ring.NewNoopFlushTransferer(), "compactor", ringKey, true, false, c.logger, prometheus.WrapRegistererWithPrefix("cortex_", c.registerer))
480480
if err != nil {
481481
return errors.Wrap(err, "unable to initialize compactor ring lifecycler")
482482
}

pkg/distributor/distributor.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,7 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
216216
if !canJoinDistributorsRing {
217217
ingestionRateStrategy = newInfiniteIngestionRateStrategy()
218218
} else if limits.IngestionRateStrategy() == validation.GlobalIngestionRateStrategy {
219-
distributorsLifeCycler, err = ring.NewLifecycler(cfg.DistributorRing.ToLifecyclerConfig(), nil, "distributor", ringKey, true, log, prometheus.WrapRegistererWithPrefix("cortex_", reg))
219+
distributorsLifeCycler, err = ring.NewLifecycler(cfg.DistributorRing.ToLifecyclerConfig(), nil, "distributor", ringKey, true, true, log, prometheus.WrapRegistererWithPrefix("cortex_", reg))
220220
if err != nil {
221221
return nil, err
222222
}

pkg/ingester/ingester.go

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -638,7 +638,7 @@ func New(cfg Config, limits *validation.Overrides, registerer prometheus.Registe
638638
}, i.getOldestUnshippedBlockMetric)
639639
}
640640

641-
i.lifecycler, err = ring.NewLifecycler(cfg.LifecyclerConfig, i, "ingester", RingKey, cfg.BlocksStorageConfig.TSDB.FlushBlocksOnShutdown, logger, prometheus.WrapRegistererWithPrefix("cortex_", registerer))
641+
i.lifecycler, err = ring.NewLifecycler(cfg.LifecyclerConfig, i, "ingester", RingKey, false, cfg.BlocksStorageConfig.TSDB.FlushBlocksOnShutdown, logger, prometheus.WrapRegistererWithPrefix("cortex_", registerer))
642642
if err != nil {
643643
return nil, err
644644
}
@@ -706,13 +706,6 @@ func (i *Ingester) startingV2ForFlusher(ctx context.Context) error {
706706
}
707707

708708
func (i *Ingester) starting(ctx context.Context) error {
709-
if err := i.openExistingTSDB(ctx); err != nil {
710-
// Try to rollback and close opened TSDBs before halting the ingester.
711-
i.closeAllTSDB()
712-
713-
return errors.Wrap(err, "opening existing TSDBs")
714-
}
715-
716709
// Important: we want to keep lifecycler running until we ask it to stop, so we need to give it independent context
717710
if err := i.lifecycler.StartAsync(context.Background()); err != nil {
718711
return errors.Wrap(err, "failed to start lifecycler")
@@ -721,6 +714,15 @@ func (i *Ingester) starting(ctx context.Context) error {
721714
return errors.Wrap(err, "failed to start lifecycler")
722715
}
723716

717+
if err := i.openExistingTSDB(ctx); err != nil {
718+
// Try to rollback and close opened TSDBs before halting the ingester.
719+
i.closeAllTSDB()
720+
721+
return errors.Wrap(err, "opening existing TSDBs")
722+
}
723+
724+
i.lifecycler.Join()
725+
724726
// let's start the rest of subservices via manager
725727
servs := []services.Service(nil)
726728

pkg/ring/lifecycler.go

Lines changed: 44 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,8 @@ type Lifecycler struct {
9393
flushTransferer FlushTransferer
9494
KVStore kv.Client
9595

96-
actorChan chan func()
96+
actorChan chan func()
97+
autojoinChan chan struct{}
9798

9899
// These values are initialised at startup, and never change
99100
ID string
@@ -106,6 +107,9 @@ type Lifecycler struct {
106107
flushOnShutdown *atomic.Bool
107108
unregisterOnShutdown *atomic.Bool
108109

110+
// Whether to auto join on ring on startup. If set to false, Join should be called.
111+
autoJoinOnStartup bool
112+
109113
// We need to remember the ingester state, tokens and registered timestamp just in case the KV store
110114
// goes away and comes back empty. The state changes during lifecycle of instance.
111115
stateMtx sync.RWMutex
@@ -128,7 +132,14 @@ type Lifecycler struct {
128132
}
129133

130134
// NewLifecycler creates new Lifecycler. It must be started via StartAsync.
131-
func NewLifecycler(cfg LifecyclerConfig, flushTransferer FlushTransferer, ringName, ringKey string, flushOnShutdown bool, logger log.Logger, reg prometheus.Registerer) (*Lifecycler, error) {
135+
func NewLifecycler(
136+
cfg LifecyclerConfig,
137+
flushTransferer FlushTransferer,
138+
ringName, ringKey string,
139+
autoJoinOnStartup, flushOnShutdown bool,
140+
logger log.Logger,
141+
reg prometheus.Registerer,
142+
) (*Lifecycler, error) {
132143
addr, err := GetInstanceAddr(cfg.Addr, cfg.InfNames, logger)
133144
if err != nil {
134145
return nil, err
@@ -165,10 +176,12 @@ func NewLifecycler(cfg LifecyclerConfig, flushTransferer FlushTransferer, ringNa
165176
ID: cfg.ID,
166177
RingName: ringName,
167178
RingKey: ringKey,
179+
autoJoinOnStartup: autoJoinOnStartup,
168180
flushOnShutdown: atomic.NewBool(flushOnShutdown),
169181
unregisterOnShutdown: atomic.NewBool(cfg.UnregisterOnShutdown),
170182
Zone: zone,
171183
actorChan: make(chan func()),
184+
autojoinChan: make(chan struct{}, 1),
172185
state: PENDING,
173186
lifecyclerMetrics: NewLifecyclerMetrics(ringName, reg),
174187
logger: logger,
@@ -391,23 +404,43 @@ func (i *Lifecycler) ZonesCount() int {
391404
return i.zonesCount
392405
}
393406

407+
// Join trigger the instance to join the ring, if autoJoinOnStartup is set to false.
408+
func (i *Lifecycler) Join() {
409+
select {
410+
case i.autojoinChan <- struct{}{}:
411+
default:
412+
level.Warn(i.logger).Log("msg", "join was called more than one time", "ring", i.RingName)
413+
}
414+
}
415+
394416
func (i *Lifecycler) loop(ctx context.Context) error {
417+
joined := false
395418
// First, see if we exist in the cluster, update our state to match if we do,
396419
// and add ourselves (without tokens) if we don't.
397420
if err := i.initRing(context.Background()); err != nil {
398421
return perrors.Wrapf(err, "failed to join the ring %s", i.RingName)
399422
}
400423

401424
// We do various period tasks
402-
autoJoinAfter := time.After(i.cfg.JoinAfter)
425+
var autoJoinAfter <-chan time.Time
403426
var observeChan <-chan time.Time
404427

428+
if i.autoJoinOnStartup {
429+
autoJoinAfter = time.After(i.cfg.JoinAfter)
430+
}
431+
405432
heartbeatTickerStop, heartbeatTickerChan := newDisableableTicker(i.cfg.HeartbeatPeriod)
406433
defer heartbeatTickerStop()
407434

408435
for {
409436
select {
437+
case <-i.autojoinChan:
438+
autoJoinAfter = time.After(i.cfg.JoinAfter)
410439
case <-autoJoinAfter:
440+
if joined {
441+
continue
442+
}
443+
joined = true
411444
level.Debug(i.logger).Log("msg", "JoinAfter expired", "ring", i.RingName)
412445
// Will only fire once, after auto join timeout. If we haven't entered "JOINING" state,
413446
// then pick some tokens and enter ACTIVE state.
@@ -556,7 +589,7 @@ func (i *Lifecycler) initRing(ctx context.Context) error {
556589
// We use the tokens from the file only if it does not exist in the ring yet.
557590
if len(tokensFromFile) > 0 {
558591
level.Info(i.logger).Log("msg", "adding tokens from file", "num_tokens", len(tokensFromFile))
559-
if len(tokensFromFile) >= i.cfg.NumTokens {
592+
if len(tokensFromFile) >= i.cfg.NumTokens && i.autoJoinOnStartup {
560593
i.setState(ACTIVE)
561594
}
562595
ringDesc.AddIngester(i.ID, i.Addr, i.Zone, tokensFromFile, i.GetState(), registeredAt)
@@ -587,9 +620,14 @@ func (i *Lifecycler) initRing(ctx context.Context) error {
587620

588621
// If the ingester failed to clean its ring entry up in can leave its state in LEAVING
589622
// OR unregister_on_shutdown=false
590-
// Move it into ACTIVE to ensure the ingester joins the ring.
623+
// if autoJoinOnStartup, move it into ACTIVE to ensure the ingester joins the ring.
624+
// else set to PENDING
591625
if instanceDesc.State == LEAVING && len(instanceDesc.Tokens) == i.cfg.NumTokens {
592-
instanceDesc.State = ACTIVE
626+
if i.autoJoinOnStartup {
627+
instanceDesc.State = ACTIVE
628+
} else {
629+
instanceDesc.State = PENDING
630+
}
593631
}
594632

595633
// We exist in the ring, so assume the ring is right and copy out tokens & state out of there.
@@ -699,9 +737,6 @@ func (i *Lifecycler) autoJoin(ctx context.Context, targetState InstanceState) er
699737

700738
// At this point, we should not have any tokens, and we should be in PENDING state.
701739
myTokens, takenTokens := ringDesc.TokensFor(i.ID)
702-
if len(myTokens) > 0 {
703-
level.Error(i.logger).Log("msg", "tokens already exist for this instance - wasn't expecting any!", "num_tokens", len(myTokens), "ring", i.RingName)
704-
}
705740

706741
newTokens := GenerateTokens(i.cfg.NumTokens-len(myTokens), takenTokens)
707742
i.setState(targetState)

0 commit comments

Comments
 (0)