Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Preserve ingester state on restart #6301

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/compactor/compactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1476,6 +1476,7 @@ func removeIgnoredLogs(input []string) []string {

ignoredLogStringsRegexList := []*regexp.Regexp{
regexp.MustCompile(`^level=(info|debug|warn) component=cleaner .+$`),
regexp.MustCompile(`^level=info component=compactor msg="set state" .+$`),
}

out := make([]string, 0, len(input))
Expand Down
6 changes: 4 additions & 2 deletions pkg/ring/basic_lifecycler_delegates.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,15 @@ func (d *TokensPersistencyDelegate) OnRingInstanceRegister(lifecycler *BasicLife
return d.next.OnRingInstanceRegister(lifecycler, ringDesc, instanceExists, instanceID, instanceDesc)
}

tokensFromFile, err := LoadTokensFromFile(d.tokensPath)
tokenFile, err := LoadTokenFile(d.tokensPath)
if err != nil {
if !os.IsNotExist(err) {
level.Error(d.logger).Log("msg", "error loading tokens from file", "err", err)
}

return d.next.OnRingInstanceRegister(lifecycler, ringDesc, instanceExists, instanceID, instanceDesc)
}
tokensFromFile := tokenFile.Tokens

// Signal the next delegate that the tokens have been loaded, miming the
// case the instance exist in the ring (which is OK because the lifecycler
Expand All @@ -94,7 +95,8 @@ func (d *TokensPersistencyDelegate) OnRingInstanceRegister(lifecycler *BasicLife

func (d *TokensPersistencyDelegate) OnRingInstanceTokens(lifecycler *BasicLifecycler, tokens Tokens) {
if d.tokensPath != "" {
if err := tokens.StoreToFile(d.tokensPath); err != nil {
tokenFile := TokenFile{Tokens: tokens}
if err := tokenFile.StoreToFile(d.tokensPath); err != nil {
level.Error(d.logger).Log("msg", "error storing tokens to disk", "path", d.tokensPath, "err", err)
}
}
Expand Down
20 changes: 11 additions & 9 deletions pkg/ring/basic_lifecycler_delegates_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,22 +69,23 @@ func TestTokensPersistencyDelegate_ShouldSkipTokensLoadingIfFileDoesNotExist(t *
require.NoError(t, services.StopAndAwaitTerminated(ctx, lifecycler))

// Ensure tokens have been stored.
actualTokens, err := LoadTokensFromFile(tokensFile.Name())
tokenFile, err := LoadTokenFile(tokensFile.Name())
require.NoError(t, err)
assert.Equal(t, Tokens{1, 2, 3, 4, 5}, actualTokens)
assert.Equal(t, Tokens{1, 2, 3, 4, 5}, tokenFile.Tokens)

// Ensure no error has been logged.
assert.Empty(t, logs.String())
}

func TestTokensPersistencyDelegate_ShouldLoadTokensFromFileIfFileExist(t *testing.T) {
func TestTokensPersistencyDelegate_ShouldLoadTokenFileIfFileExist(t *testing.T) {
tokensFile, err := os.CreateTemp("", "tokens-*")
require.NoError(t, err)
defer os.Remove(tokensFile.Name()) //nolint:errcheck

// Store some tokens to the file.
storedTokens := Tokens{6, 7, 8, 9, 10}
require.NoError(t, storedTokens.StoreToFile(tokensFile.Name()))
tokenFile1 := TokenFile{Tokens: storedTokens}
require.NoError(t, tokenFile1.StoreToFile(tokensFile.Name()))

testDelegate := &mockDelegate{
onRegister: func(lifecycler *BasicLifecycler, ringDesc Desc, instanceExists bool, instanceID string, instanceDesc InstanceDesc) (InstanceState, Tokens) {
Expand Down Expand Up @@ -113,9 +114,9 @@ func TestTokensPersistencyDelegate_ShouldLoadTokensFromFileIfFileExist(t *testin
require.NoError(t, services.StopAndAwaitTerminated(ctx, lifecycler))

// Ensure we can still read back the tokens file.
actualTokens, err := LoadTokensFromFile(tokensFile.Name())
tokenFile, err := LoadTokenFile(tokensFile.Name())
require.NoError(t, err)
assert.Equal(t, storedTokens, actualTokens)
assert.Equal(t, storedTokens, tokenFile.Tokens)
}

func TestTokensPersistencyDelegate_ShouldHandleTheCaseTheInstanceIsAlreadyInTheRing(t *testing.T) {
Expand Down Expand Up @@ -150,7 +151,8 @@ func TestTokensPersistencyDelegate_ShouldHandleTheCaseTheInstanceIsAlreadyInTheR
defer os.Remove(tokensFile.Name()) //nolint:errcheck

// Store some tokens to the file.
require.NoError(t, storedTokens.StoreToFile(tokensFile.Name()))
tokenFile1 := TokenFile{Tokens: storedTokens}
require.NoError(t, tokenFile1.StoreToFile(tokensFile.Name()))

// We assume is already registered to the ring.
registeredAt := time.Now().Add(-time.Hour)
Expand Down Expand Up @@ -226,9 +228,9 @@ func TestDelegatesChain(t *testing.T) {
assert.True(t, onStoppingCalled)

// Ensure tokens have been stored.
actualTokens, err := LoadTokensFromFile(tokensFile.Name())
tokenFile, err := LoadTokenFile(tokensFile.Name())
require.NoError(t, err)
assert.Equal(t, Tokens{1, 2, 3, 4, 5}, actualTokens)
assert.Equal(t, Tokens{1, 2, 3, 4, 5}, tokenFile.Tokens)
}

func TestAutoForgetDelegate(t *testing.T) {
Expand Down
76 changes: 64 additions & 12 deletions pkg/ring/lifecycler.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ type Lifecycler struct {
// goes away and comes back empty. The state changes during lifecycle of instance.
stateMtx sync.RWMutex
state InstanceState
tokens Tokens
tokenFile *TokenFile
registeredAt time.Time

// Controls the ready-reporting
Expand Down Expand Up @@ -205,6 +205,7 @@ func NewLifecycler(
actorChan: make(chan func()),
autojoinChan: make(chan struct{}, 1),
state: PENDING,
tokenFile: &TokenFile{PreviousState: ACTIVE},
lifecyclerMetrics: NewLifecyclerMetrics(ringName, reg),
logger: logger,
tg: tg,
Expand Down Expand Up @@ -301,6 +302,7 @@ func (i *Lifecycler) GetState() InstanceState {
func (i *Lifecycler) setState(state InstanceState) {
i.stateMtx.Lock()
defer i.stateMtx.Unlock()
level.Info(i.logger).Log("msg", "set state", "old_state", i.state, "new_state", state)
i.state = state
}

Expand Down Expand Up @@ -334,7 +336,7 @@ func (i *Lifecycler) ChangeState(ctx context.Context, state InstanceState) error
func (i *Lifecycler) getTokens() Tokens {
i.stateMtx.RLock()
defer i.stateMtx.RUnlock()
return i.tokens
return i.tokenFile.Tokens
}

func (i *Lifecycler) setTokens(tokens Tokens) {
Expand All @@ -343,14 +345,52 @@ func (i *Lifecycler) setTokens(tokens Tokens) {
i.stateMtx.Lock()
defer i.stateMtx.Unlock()

i.tokens = tokens
i.tokenFile.Tokens = tokens
if i.cfg.TokensFilePath != "" {
if err := i.tokens.StoreToFile(i.cfg.TokensFilePath); err != nil {
if err := i.tokenFile.StoreToFile(i.cfg.TokensFilePath); err != nil {
level.Error(i.logger).Log("msg", "error storing tokens to disk", "path", i.cfg.TokensFilePath, "err", err)
}
}
}

func (i *Lifecycler) getPreviousState() InstanceState {
i.stateMtx.RLock()
defer i.stateMtx.RUnlock()
return i.tokenFile.PreviousState
}

func (i *Lifecycler) setPreviousState(state InstanceState) {
i.stateMtx.Lock()
defer i.stateMtx.Unlock()

if !(state == ACTIVE || state == READONLY) {
level.Error(i.logger).Log("msg", "cannot store unsupported state to disk", "new_state", state, "old_state", i.tokenFile.PreviousState)
return
}

i.tokenFile.PreviousState = state
if i.cfg.TokensFilePath != "" {
if err := i.tokenFile.StoreToFile(i.cfg.TokensFilePath); err != nil {
level.Error(i.logger).Log("msg", "error storing state to disk", "path", i.cfg.TokensFilePath, "err", err)
} else {
level.Info(i.logger).Log("msg", "saved state to disk", "state", state, "path", i.cfg.TokensFilePath)
}
}
}

func (i *Lifecycler) loadTokenFile() (*TokenFile, error) {
i.stateMtx.Lock()
defer i.stateMtx.Unlock()

t, err := LoadTokenFile(i.cfg.TokensFilePath)
if err != nil {
return nil, err
}
i.tokenFile = t
level.Info(i.logger).Log("msg", "loaded token file", "state", i.tokenFile.PreviousState, "num_tokens", len(i.tokenFile.Tokens), "path", i.cfg.TokensFilePath)
return i.tokenFile, nil
}

func (i *Lifecycler) getRegisteredAt() time.Time {
i.stateMtx.RLock()
defer i.stateMtx.RUnlock()
Expand Down Expand Up @@ -501,8 +541,8 @@ func (i *Lifecycler) loop(ctx context.Context) error {
level.Info(i.logger).Log("msg", "observing tokens before going ACTIVE", "ring", i.RingName)
observeChan = time.After(i.cfg.ObservePeriod)
} else {
if err := i.autoJoin(context.Background(), ACTIVE); err != nil {
return errors.Wrapf(err, "failed to pick tokens in the KV store, ring: %s", i.RingName)
if err := i.autoJoin(context.Background(), i.getPreviousState()); err != nil {
return errors.Wrapf(err, "failed to pick tokens in the KV store, ring: %s, state: %s", i.RingName, i.getPreviousState())
}
}
}
Expand All @@ -519,9 +559,9 @@ func (i *Lifecycler) loop(ctx context.Context) error {
if i.verifyTokens(context.Background()) {
level.Info(i.logger).Log("msg", "token verification successful", "ring", i.RingName)

err := i.changeState(context.Background(), ACTIVE)
err := i.changeState(context.Background(), i.getPreviousState())
if err != nil {
level.Error(i.logger).Log("msg", "failed to set state to ACTIVE", "ring", i.RingName, "err", err)
level.Error(i.logger).Log("msg", "failed to set state", "ring", i.RingName, "state", i.getPreviousState(), "err", err)
}
} else {
level.Info(i.logger).Log("msg", "token verification failed, observing", "ring", i.RingName)
Expand Down Expand Up @@ -564,6 +604,12 @@ func (i *Lifecycler) stopping(runningError error) error {
heartbeatTickerStop, heartbeatTickerChan := newDisableableTicker(i.cfg.HeartbeatPeriod)
defer heartbeatTickerStop()

// save current state into file
if i.cfg.TokensFilePath != "" {
currentState := i.GetState()
i.setPreviousState(currentState)
}

// Mark ourselved as Leaving so no more samples are send to us.
err := i.changeState(context.Background(), LEAVING)
if err != nil {
Expand Down Expand Up @@ -613,9 +659,13 @@ func (i *Lifecycler) initRing(ctx context.Context) error {
)

if i.cfg.TokensFilePath != "" {
tokensFromFile, err = LoadTokensFromFile(i.cfg.TokensFilePath)
tokenFile, err := i.loadTokenFile()
if err != nil && !os.IsNotExist(err) {
level.Error(i.logger).Log("msg", "error loading tokens from file", "err", err)
level.Error(i.logger).Log("msg", "error loading tokens and previous state from file", "err", err)
}

if tokenFile != nil {
tokensFromFile = tokenFile.Tokens
}
} else {
level.Info(i.logger).Log("msg", "not loading tokens from file, tokens file path is empty")
Expand All @@ -639,7 +689,7 @@ func (i *Lifecycler) initRing(ctx context.Context) error {
if len(tokensFromFile) > 0 {
level.Info(i.logger).Log("msg", "adding tokens from file", "num_tokens", len(tokensFromFile))
if len(tokensFromFile) >= i.cfg.NumTokens && i.autoJoinOnStartup {
i.setState(ACTIVE)
i.setState(i.getPreviousState())
}
ringDesc.AddIngester(i.ID, i.Addr, i.Zone, tokensFromFile, i.GetState(), registeredAt)
i.setTokens(tokensFromFile)
Expand Down Expand Up @@ -673,7 +723,7 @@ func (i *Lifecycler) initRing(ctx context.Context) error {
// else set to PENDING
if instanceDesc.State == LEAVING && len(instanceDesc.Tokens) != 0 {
if i.autoJoinOnStartup {
instanceDesc.State = ACTIVE
instanceDesc.State = i.getPreviousState()
} else {
instanceDesc.State = PENDING
}
Expand Down Expand Up @@ -911,7 +961,9 @@ func (i *Lifecycler) changeState(ctx context.Context, state InstanceState) error
if !((currState == PENDING && state == JOINING) || // triggered by TransferChunks at the beginning
(currState == JOINING && state == PENDING) || // triggered by TransferChunks on failure
(currState == JOINING && state == ACTIVE) || // triggered by TransferChunks on success
(currState == JOINING && state == READONLY) || // triggered by TransferChunks on success
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we update those comments? TransferChunks is from the old chunks store.

(currState == PENDING && state == ACTIVE) || // triggered by autoJoin
(currState == PENDING && state == READONLY) || // triggered by autoJoin
(currState == ACTIVE && state == LEAVING) || // triggered by shutdown
(currState == ACTIVE && state == READONLY) || // triggered by ingester mode
(currState == READONLY && state == ACTIVE) || // triggered by ingester mode
Expand Down
16 changes: 14 additions & 2 deletions pkg/ring/lifecycler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -716,7 +716,7 @@ func TestRestartIngester_DisabledHeartbeat_unregister_on_shutdown_false(t *testi
require.NoError(t, services.StopAndAwaitTerminated(context.Background(), l2))
}

func TestTokensOnDisk(t *testing.T) {
func TestTokenFileOnDisk(t *testing.T) {
ringStore, closer := consul.NewInMemoryClient(GetCodec(), log.NewNopLogger(), nil)
t.Cleanup(func() { assert.NoError(t, closer.Close()) })

Expand Down Expand Up @@ -756,6 +756,18 @@ func TestTokensOnDisk(t *testing.T) {
len(desc.Ingesters["ing1"].Tokens) == 512
})

// Change state from ACTIVE to READONLY
err = l1.ChangeState(context.Background(), READONLY)
require.NoError(t, err)
test.Poll(t, 1000*time.Millisecond, true, func() interface{} {
d, err := r.KVClient.Get(context.Background(), ringKey)
require.NoError(t, err)

desc, ok := d.(*Desc)
return ok &&
desc.Ingesters["ing1"].State == READONLY
})

require.NoError(t, services.StopAndAwaitTerminated(context.Background(), l1))

// Start new ingester at same token directory.
Expand All @@ -776,7 +788,7 @@ func TestTokensOnDisk(t *testing.T) {
}
return ok &&
len(desc.Ingesters) == 1 &&
desc.Ingesters["ing2"].State == ACTIVE &&
desc.Ingesters["ing2"].State == READONLY &&
len(desc.Ingesters["ing2"].Tokens) == 512
})

Expand Down
79 changes: 79 additions & 0 deletions pkg/ring/token_file.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package ring

import (
"encoding/json"
"errors"
"os"
"sort"
)

type TokenFile struct {
PreviousState InstanceState `json:"previousState"`
Tokens Tokens `json:"tokens"`
}

// StoreToFile stores the tokens in the given directory.
func (l TokenFile) StoreToFile(tokenFilePath string) error {
if tokenFilePath == "" {
return errors.New("path is empty")
}

// If any operations failed further in the function, we keep the temporary
// file hanging around for debugging.
f, err := os.Create(tokenFilePath + ".tmp")
if err != nil {
return err
}

defer func() {
// If the file was not closed, then there must already be an error, hence ignore
// the error (if any) from f.Close(). If the file was already closed, then
// we would ignore the error in that case too.
_ = f.Close()
}()

b, err := json.Marshal(l)
if err != nil {
return err
}
if _, err = f.Write(b); err != nil {
return err
}

if err := f.Close(); err != nil {
return err
}

// Tokens successfully written, replace the temporary file with the actual file path.
return os.Rename(f.Name(), tokenFilePath)
}

func LoadTokenFile(tokenFilePath string) (*TokenFile, error) {
b, err := os.ReadFile(tokenFilePath)
if err != nil {
return nil, err
}
t := TokenFile{}
err = json.Unmarshal(b, &t)

// Tokens may have been written to file by an older version which
// doesn't guarantee sorted tokens, so we enforce sorting here.
if !sort.IsSorted(t.Tokens) {
sort.Sort(t.Tokens)
}

return &t, err
}

func (p InstanceState) MarshalJSON() ([]byte, error) {
ss := InstanceState_name[int32(p)]
return json.Marshal(ss)
}
func (p *InstanceState) UnmarshalJSON(data []byte) error {
res := ""
if err := json.Unmarshal(data, &res); err != nil {
return err
}
*p = InstanceState(InstanceState_value[res])
return nil
}
Loading
Loading