Skip to content

Commit bdc357c

Browse files
authored
Make sure unregistered ingester joining the ring after WAL replay (#6277)
* Make sure ingester is active when joining the ring Signed-off-by: Alex Le <leqiyue@amazon.com> * tokens from file would be ignored if instance was not in the ring when starting Signed-off-by: Alex Le <leqiyue@amazon.com> * Skip CAS if instance was not in the ring on start and delay heartbeat start time Signed-off-by: Alex Le <leqiyue@amazon.com> * Fix compactor tests Signed-off-by: Alex Le <leqiyue@amazon.com> * Fixed existing token not loaded issue and added unit test Signed-off-by: Alex Le <leqiyue@amazon.com> * update changelog Signed-off-by: Alex Le <leqiyue@amazon.com> --------- Signed-off-by: Alex Le <leqiyue@amazon.com>
1 parent 7191ecb commit bdc357c

File tree

5 files changed

+155
-22
lines changed

5 files changed

+155
-22
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
* [ENHANCEMENT] Distributor: Expose `cortex_label_size_bytes` native histogram metric. #6372
4646
* [ENHANCEMENT] Add new option `-server.grpc_server-num-stream-workers` to configure the number of worker goroutines that should be used to process incoming streams. #6386
4747
* [ENHANCEMENT] Distributor: Return HTTP 5XX instead of HTTP 4XX when instance limits are hit. #6358
48+
* [ENHANCEMENT] Ingester: Make sure unregistered ingester joining the ring after WAL replay #6277
4849
* [BUGFIX] Runtime-config: Handle absolute file paths when working directory is not / #6224
4950
* [BUGFIX] Ruler: Allow rule evaluation to complete during shutdown. #6326
5051
* [BUGFIX] Ring: update ring with new ip address when instance is lost, rejoins, but heartbeat is disabled #6271

pkg/compactor/compactor_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1043,6 +1043,7 @@ func TestCompactor_ShouldCompactAllUsersOnShardingEnabledButOnlyOneInstanceRunni
10431043
tsdbPlanner.AssertNumberOfCalls(t, "Plan", 2)
10441044

10451045
assert.ElementsMatch(t, []string{
1046+
`level=info component=compactor msg="auto joined with new tokens" ring=compactor state=ACTIVE`,
10461047
`level=info component=compactor msg="waiting until compactor is ACTIVE in the ring"`,
10471048
`level=info component=compactor msg="compactor is ACTIVE in the ring"`,
10481049
`level=info component=compactor msg="compactor started"`,
@@ -1836,6 +1837,7 @@ func TestCompactor_ShouldFailCompactionOnTimeout(t *testing.T) {
18361837
assert.Equal(t, context.DeadlineExceeded, err)
18371838

18381839
assert.ElementsMatch(t, []string{
1840+
`level=info component=compactor msg="auto joined with new tokens" ring=compactor state=JOINING`,
18391841
`level=info component=compactor msg="compactor started"`,
18401842
`level=info component=compactor msg="waiting until compactor is ACTIVE in the ring"`,
18411843
`level=error component=compactor msg="compactor failed to become ACTIVE in the ring" err="context deadline exceeded"`,

pkg/ring/kv/dynamodb/client.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,7 @@ func (c *Client) CAS(ctx context.Context, key string, f func(in interface{}) (ou
199199

200200
if len(putRequests) == 0 && len(deleteRequests) == 0 {
201201
// no change detected, retry
202+
level.Warn(c.logger).Log("msg", "no change detected in ring, retry CAS")
202203
bo.Wait()
203204
continue
204205
}

pkg/ring/lifecycler.go

Lines changed: 49 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -491,7 +491,8 @@ func (i *Lifecycler) loop(ctx context.Context) error {
491491
joined := false
492492
// First, see if we exist in the cluster, update our state to match if we do,
493493
// and add ourselves (without tokens) if we don't.
494-
if err := i.initRing(context.Background()); err != nil {
494+
addedInRing, err := i.initRing(context.Background())
495+
if err != nil {
495496
return errors.Wrapf(err, "failed to join the ring %s", i.RingName)
496497
}
497498

@@ -504,18 +505,23 @@ func (i *Lifecycler) loop(ctx context.Context) error {
504505
}
505506

506507
var heartbeatTickerChan <-chan time.Time
507-
if uint64(i.cfg.HeartbeatPeriod) > 0 {
508-
heartbeatTicker := time.NewTicker(i.cfg.HeartbeatPeriod)
509-
heartbeatTicker.Stop()
510-
// We are jittering for at least half of the time and max the time of the heartbeat.
511-
// If we jitter too soon, we can have problems of concurrency with autoJoin leaving the instance on ACTIVE without tokens
512-
time.AfterFunc(time.Duration(uint64(i.cfg.HeartbeatPeriod/2)+uint64(mathrand.Int63())%uint64(i.cfg.HeartbeatPeriod/2)), func() {
513-
i.heartbeat(ctx)
514-
heartbeatTicker.Reset(i.cfg.HeartbeatPeriod)
515-
})
516-
defer heartbeatTicker.Stop()
517-
518-
heartbeatTickerChan = heartbeatTicker.C
508+
startHeartbeat := func() {
509+
if uint64(i.cfg.HeartbeatPeriod) > 0 {
510+
heartbeatTicker := time.NewTicker(i.cfg.HeartbeatPeriod)
511+
heartbeatTicker.Stop()
512+
// We are jittering for at least half of the time and max the time of the heartbeat.
513+
// If we jitter too soon, we can have problems of concurrency with autoJoin leaving the instance on ACTIVE without tokens
514+
time.AfterFunc(time.Duration(uint64(i.cfg.HeartbeatPeriod/2)+uint64(mathrand.Int63())%uint64(i.cfg.HeartbeatPeriod/2)), func() {
515+
i.heartbeat(ctx)
516+
heartbeatTicker.Reset(i.cfg.HeartbeatPeriod)
517+
})
518+
defer heartbeatTicker.Stop()
519+
520+
heartbeatTickerChan = heartbeatTicker.C
521+
}
522+
}
523+
if addedInRing {
524+
startHeartbeat()
519525
}
520526

521527
for {
@@ -536,17 +542,21 @@ func (i *Lifecycler) loop(ctx context.Context) error {
536542
if i.cfg.ObservePeriod > 0 {
537543
// let's observe the ring. By using JOINING state, this ingester will be ignored by LEAVING
538544
// ingesters, but we also signal that it is not fully functional yet.
539-
if err := i.autoJoin(context.Background(), JOINING); err != nil {
545+
if err := i.autoJoin(context.Background(), JOINING, addedInRing); err != nil {
540546
return errors.Wrapf(err, "failed to pick tokens in the KV store, ring: %s", i.RingName)
541547
}
542548

543549
level.Info(i.logger).Log("msg", "observing tokens before going ACTIVE", "ring", i.RingName)
544550
observeChan = time.After(i.cfg.ObservePeriod)
545551
} else {
546-
if err := i.autoJoin(context.Background(), i.getPreviousState()); err != nil {
552+
if err := i.autoJoin(context.Background(), i.getPreviousState(), addedInRing); err != nil {
547553
return errors.Wrapf(err, "failed to pick tokens in the KV store, ring: %s, state: %s", i.RingName, i.getPreviousState())
548554
}
549555
}
556+
557+
if !addedInRing {
558+
startHeartbeat()
559+
}
550560
}
551561

552562
case <-observeChan:
@@ -565,6 +575,10 @@ func (i *Lifecycler) loop(ctx context.Context) error {
565575
if err != nil {
566576
level.Error(i.logger).Log("msg", "failed to set state", "ring", i.RingName, "state", i.getPreviousState(), "err", err)
567577
}
578+
579+
if !addedInRing {
580+
startHeartbeat()
581+
}
568582
} else {
569583
level.Info(i.logger).Log("msg", "token verification failed, observing", "ring", i.RingName)
570584
// keep observing
@@ -653,12 +667,13 @@ heartbeatLoop:
653667
// initRing is the first thing we do when we start. It:
654668
// - add an ingester entry to the ring
655669
// - copies out our state and tokens if they exist
656-
func (i *Lifecycler) initRing(ctx context.Context) error {
670+
func (i *Lifecycler) initRing(ctx context.Context) (bool, error) {
657671
var (
658672
ringDesc *Desc
659673
tokensFromFile Tokens
660674
err error
661675
)
676+
addedInRing := true
662677

663678
if i.cfg.TokensFilePath != "" {
664679
tokenFile, err := i.loadTokenFile()
@@ -692,10 +707,15 @@ func (i *Lifecycler) initRing(ctx context.Context) error {
692707
level.Info(i.logger).Log("msg", "adding tokens from file", "num_tokens", len(tokensFromFile))
693708
if len(tokensFromFile) >= i.cfg.NumTokens && i.autoJoinOnStartup {
694709
i.setState(i.getPreviousState())
710+
state := i.GetState()
711+
ringDesc.AddIngester(i.ID, i.Addr, i.Zone, tokensFromFile, state, registeredAt)
712+
level.Info(i.logger).Log("msg", "auto join on startup, adding with token and state", "ring", i.RingName, "state", state)
713+
return ringDesc, true, nil
695714
}
696-
ringDesc.AddIngester(i.ID, i.Addr, i.Zone, tokensFromFile, i.GetState(), registeredAt)
697715
i.setTokens(tokensFromFile)
698-
return ringDesc, true, nil
716+
// Do not return ring to CAS call since instance has not been added to ring yet.
717+
addedInRing = false
718+
return nil, true, nil
699719
}
700720

701721
// Either we are a new ingester, or consul must have restarted
@@ -760,7 +780,7 @@ func (i *Lifecycler) initRing(ctx context.Context) error {
760780
i.updateCounters(ringDesc)
761781
}
762782

763-
return err
783+
return addedInRing, err
764784
}
765785

766786
func (i *Lifecycler) RenewTokens(ratio float64, ctx context.Context) {
@@ -875,7 +895,7 @@ func (i *Lifecycler) compareTokens(fromRing Tokens) bool {
875895
}
876896

877897
// autoJoin selects random tokens & moves state to targetState
878-
func (i *Lifecycler) autoJoin(ctx context.Context, targetState InstanceState) error {
898+
func (i *Lifecycler) autoJoin(ctx context.Context, targetState InstanceState, alreadyInRing bool) error {
879899
var ringDesc *Desc
880900

881901
err := i.KVStore.CAS(ctx, i.RingKey, func(in interface{}) (out interface{}, retry bool, err error) {
@@ -890,11 +910,16 @@ func (i *Lifecycler) autoJoin(ctx context.Context, targetState InstanceState) er
890910
// At this point, we should not have any tokens, and we should be in PENDING state.
891911
// Need to make sure we didn't change the num of tokens configured
892912
myTokens, _ := ringDesc.TokensFor(i.ID)
913+
if !alreadyInRing {
914+
myTokens = i.getTokens()
915+
}
893916
needTokens := i.cfg.NumTokens - len(myTokens)
894917

895918
if needTokens == 0 && myTokens.Equals(i.getTokens()) {
896919
// Tokens have been verified. No need to change them.
897-
ringDesc.AddIngester(i.ID, i.Addr, i.Zone, i.getTokens(), i.GetState(), i.getRegisteredAt())
920+
state := i.GetState()
921+
ringDesc.AddIngester(i.ID, i.Addr, i.Zone, i.getTokens(), state, i.getRegisteredAt())
922+
level.Info(i.logger).Log("msg", "auto joined with existing tokens", "ring", i.RingName, "state", state)
898923
return ringDesc, true, nil
899924
}
900925

@@ -908,7 +933,9 @@ func (i *Lifecycler) autoJoin(ctx context.Context, targetState InstanceState) er
908933
sort.Sort(myTokens)
909934
i.setTokens(myTokens)
910935

911-
ringDesc.AddIngester(i.ID, i.Addr, i.Zone, i.getTokens(), i.GetState(), i.getRegisteredAt())
936+
state := i.GetState()
937+
ringDesc.AddIngester(i.ID, i.Addr, i.Zone, i.getTokens(), state, i.getRegisteredAt())
938+
level.Info(i.logger).Log("msg", "auto joined with new tokens", "ring", i.RingName, "state", state)
912939

913940
return ringDesc, true, nil
914941
})

pkg/ring/lifecycler_test.go

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -827,6 +827,108 @@ func TestTokenFileOnDisk(t *testing.T) {
827827
}
828828
}
829829

830+
func TestTokenFileOnDisk_WithoutAutoJoinOnStartup(t *testing.T) {
831+
ringStore, closer := consul.NewInMemoryClient(GetCodec(), log.NewNopLogger(), nil)
832+
t.Cleanup(func() { assert.NoError(t, closer.Close()) })
833+
834+
var ringConfig Config
835+
flagext.DefaultValues(&ringConfig)
836+
ringConfig.KVStore.Mock = ringStore
837+
838+
r, err := New(ringConfig, "ingester", ringKey, log.NewNopLogger(), nil)
839+
require.NoError(t, err)
840+
require.NoError(t, services.StartAndAwaitRunning(context.Background(), r))
841+
defer services.StopAndAwaitTerminated(context.Background(), r) //nolint:errcheck
842+
843+
tokenDir := t.TempDir()
844+
845+
lifecyclerConfig := testLifecyclerConfig(ringConfig, "ing1")
846+
lifecyclerConfig.NumTokens = 512
847+
lifecyclerConfig.TokensFilePath = tokenDir + "/tokens"
848+
849+
// Start first ingester.
850+
l1, err := NewLifecycler(lifecyclerConfig, &noopFlushTransferer{}, "ingester", ringKey, false, true, log.NewNopLogger(), nil)
851+
require.NoError(t, err)
852+
require.NoError(t, services.StartAndAwaitRunning(context.Background(), l1))
853+
854+
// First ingester joins the ring
855+
l1.Join()
856+
857+
// Check this ingester joined, is active, and has 512 token.
858+
var expTokens []uint32
859+
test.Poll(t, 1000*time.Millisecond, true, func() interface{} {
860+
d, err := r.KVClient.Get(context.Background(), ringKey)
861+
require.NoError(t, err)
862+
863+
desc, ok := d.(*Desc)
864+
if ok {
865+
expTokens = desc.Ingesters["ing1"].Tokens
866+
}
867+
return ok &&
868+
len(desc.Ingesters) == 1 &&
869+
desc.Ingesters["ing1"].State == ACTIVE &&
870+
len(desc.Ingesters["ing1"].Tokens) == 512
871+
})
872+
873+
// Change state from ACTIVE to READONLY
874+
err = l1.ChangeState(context.Background(), READONLY)
875+
require.NoError(t, err)
876+
test.Poll(t, 1000*time.Millisecond, true, func() interface{} {
877+
d, err := r.KVClient.Get(context.Background(), ringKey)
878+
require.NoError(t, err)
879+
880+
desc, ok := d.(*Desc)
881+
return ok &&
882+
desc.Ingesters["ing1"].State == READONLY
883+
})
884+
885+
require.NoError(t, services.StopAndAwaitTerminated(context.Background(), l1))
886+
887+
// Start new ingester at same token directory.
888+
lifecyclerConfig.ID = "ing2"
889+
l2, err := NewLifecycler(lifecyclerConfig, &noopFlushTransferer{}, "ingester", ringKey, false, true, log.NewNopLogger(), nil)
890+
require.NoError(t, err)
891+
require.NoError(t, services.StartAndAwaitRunning(context.Background(), l2))
892+
defer services.StopAndAwaitTerminated(context.Background(), l2) //nolint:errcheck
893+
894+
// Check this ingester should not in the ring before calling Join
895+
test.Poll(t, 1000*time.Millisecond, true, func() interface{} {
896+
d, err := r.KVClient.Get(context.Background(), ringKey)
897+
require.NoError(t, err)
898+
desc, ok := d.(*Desc)
899+
if ok {
900+
_, ingesterInRing := desc.Ingesters["ing2"]
901+
return !ingesterInRing
902+
}
903+
return ok
904+
})
905+
906+
// New ingester joins the ring
907+
l2.Join()
908+
909+
// Check this ingester joined, is in readonly state, and has 512 token.
910+
var actTokens []uint32
911+
test.Poll(t, 1000*time.Millisecond, true, func() interface{} {
912+
d, err := r.KVClient.Get(context.Background(), ringKey)
913+
require.NoError(t, err)
914+
desc, ok := d.(*Desc)
915+
if ok {
916+
actTokens = desc.Ingesters["ing2"].Tokens
917+
}
918+
return ok &&
919+
len(desc.Ingesters) == 1 &&
920+
desc.Ingesters["ing2"].State == READONLY &&
921+
len(desc.Ingesters["ing2"].Tokens) == 512
922+
})
923+
924+
// Check for same tokens.
925+
sort.Slice(expTokens, func(i, j int) bool { return expTokens[i] < expTokens[j] })
926+
sort.Slice(actTokens, func(i, j int) bool { return actTokens[i] < actTokens[j] })
927+
for i := 0; i < 512; i++ {
928+
require.Equal(t, expTokens, actTokens)
929+
}
930+
}
931+
830932
// JoinInLeavingState ensures that if the lifecycler starts up and the ring already has it in a LEAVING state that it still is able to auto join
831933
func TestJoinInLeavingState(t *testing.T) {
832934
ringStore, closer := consul.NewInMemoryClient(GetCodec(), log.NewNopLogger(), nil)

0 commit comments

Comments
 (0)