Skip to content

Commit 41d37d8

Browse files
author
Ganesh Vernekar
committed
Fix Peter's comments
Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>
1 parent 7a16142 commit 41d37d8

File tree

3 files changed

+41
-49
lines changed

3 files changed

+41
-49
lines changed

pkg/ring/lifecycler.go

Lines changed: 25 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -261,8 +261,10 @@ func (i *Lifecycler) setTokens(tokens Tokens) {
261261
defer i.stateMtx.Unlock()
262262

263263
i.tokens = tokens
264-
if err := i.tokens.StoreToFile(i.cfg.TokensFilePath); err != nil {
265-
level.Error(util.Logger).Log("msg", "error storing tokens to disk", "path", i.cfg.TokensFilePath, "err", err)
264+
if i.cfg.TokensFilePath != "" {
265+
if err := i.tokens.StoreToFile(i.cfg.TokensFilePath); err != nil {
266+
level.Error(util.Logger).Log("msg", "error storing tokens to disk", "path", i.cfg.TokensFilePath, "err", err)
267+
}
266268
}
267269
}
268270

@@ -451,9 +453,22 @@ heartbeatLoop:
451453
// - add an ingester entry to the ring
452454
// - copies out our state and tokens if they exist
453455
func (i *Lifecycler) initRing(ctx context.Context) error {
454-
var ringDesc *Desc
456+
var (
457+
ringDesc *Desc
458+
tokensFromFile Tokens
459+
err error
460+
)
461+
462+
if i.cfg.TokensFilePath != "" {
463+
tokensFromFile, err = LoadTokensFromFile(i.cfg.TokensFilePath)
464+
if err != nil {
465+
level.Error(util.Logger).Log("msg", "error in getting tokens from file", "err", err)
466+
}
467+
} else {
468+
level.Warn(util.Logger).Log("msg", "not loading tokens from file, tokens file path is empty")
469+
}
455470

456-
err := i.KVStore.CAS(ctx, ConsulKey, func(in interface{}) (out interface{}, retry bool, err error) {
471+
err = i.KVStore.CAS(ctx, ConsulKey, func(in interface{}) (out interface{}, retry bool, err error) {
457472
if in == nil {
458473
ringDesc = NewDesc()
459474
} else {
@@ -462,18 +477,14 @@ func (i *Lifecycler) initRing(ctx context.Context) error {
462477

463478
ingesterDesc, ok := ringDesc.Ingesters[i.ID]
464479
if !ok {
465-
var tokens Tokens
466-
// We load the tokens from the file only if it does not exist in the ring yet.
467-
err := tokens.LoadFromFile(i.cfg.TokensFilePath)
468-
if err != nil {
469-
level.Error(util.Logger).Log("msg", "error in getting tokens from file", "err", err)
470-
} else if len(tokens) > 0 {
471-
level.Info(util.Logger).Log("msg", "adding tokens from file", "num_tokens", len(tokens))
472-
if len(tokens) == i.cfg.NumTokens {
480+
// We use the tokens from the file only if it does not exist in the ring yet.
481+
if len(tokensFromFile) > 0 {
482+
level.Info(util.Logger).Log("msg", "adding tokens from file", "num_tokens", len(tokensFromFile))
483+
if len(tokensFromFile) == i.cfg.NumTokens {
473484
i.setState(ACTIVE)
474485
}
475-
ringDesc.AddIngester(i.ID, i.Addr, tokens, i.GetState(), i.cfg.NormaliseTokens)
476-
i.setTokens(tokens)
486+
ringDesc.AddIngester(i.ID, i.Addr, tokensFromFile, i.GetState(), i.cfg.NormaliseTokens)
487+
i.setTokens(tokensFromFile)
477488
return ringDesc, true, nil
478489
}
479490

pkg/ring/lifecycler_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -289,7 +289,7 @@ func TestTokensOnDisk(t *testing.T) {
289289

290290
lifecyclerConfig := testLifecyclerConfig(ringConfig, "ing1")
291291
lifecyclerConfig.NumTokens = 512
292-
lifecyclerConfig.TokensFilePath = tokenDir
292+
lifecyclerConfig.TokensFilePath = tokenDir + "/tokens"
293293
lifecyclerConfig.NormaliseTokens = true
294294

295295
// Start first ingester.

pkg/ring/tokens.go

Lines changed: 15 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,6 @@ import (
55
"errors"
66
"io/ioutil"
77
"os"
8-
9-
"github.com/cortexproject/cortex/pkg/util"
10-
"github.com/go-kit/kit/log/level"
11-
"github.com/prometheus/prometheus/tsdb/fileutil"
12-
)
13-
14-
const (
15-
// TokensVersion1 is the version is a simple list of tokens.
16-
TokensVersion1 = 1
178
)
189

1910
// Tokens is a simple list of tokens.
@@ -25,6 +16,8 @@ func (t Tokens) StoreToFile(tokenFilePath string) error {
2516
return errors.New("path is empty")
2617
}
2718

19+
// If any operations failed further in the function, we keep the temporary
20+
// file hanging around for debugging.
2821
f, err := ioutil.TempFile(os.TempDir(), "tokens")
2922
if err != nil {
3023
return err
@@ -35,10 +28,6 @@ func (t Tokens) StoreToFile(tokenFilePath string) error {
3528
// the error (if any) from f.Close(). If the file was already closed, then
3629
// we would ignore the error in that case too.
3730
f.Close()
38-
// RemoveAll returns no error when tmp doesn't exist so it is safe to always run it.
39-
if err := os.RemoveAll(f.Name()); err != nil {
40-
level.Warn(util.Logger).Log("msg", "error deleting temporary file", "err", err)
41-
}
4231
}()
4332

4433
b, err := t.Marshal()
@@ -53,29 +42,27 @@ func (t Tokens) StoreToFile(tokenFilePath string) error {
5342
return err
5443
}
5544

56-
// Block successfully written, make visible and remove old ones.
57-
return fileutil.Replace(f.Name(), tokenFilePath)
45+
// Tokens successfully written, replace the temporary file with the actual file path.
46+
return os.Rename(f.Name(), tokenFilePath)
5847
}
5948

60-
// LoadFromFile loads tokens from given directory.
61-
func (t *Tokens) LoadFromFile(tokenFilePath string) error {
49+
// LoadTokensFromFile loads tokens from given file path.
50+
func LoadTokensFromFile(tokenFilePath string) (Tokens, error) {
6251
b, err := ioutil.ReadFile(tokenFilePath)
6352
if err != nil {
6453
if os.IsNotExist(err) {
65-
return nil
54+
return nil, nil
6655
}
67-
return err
56+
return nil, err
6857
}
69-
return t.Unmarshal(b)
58+
var t Tokens
59+
err = t.Unmarshal(b)
60+
return t, err
7061
}
7162

7263
// Marshal encodes the tokens into JSON.
7364
func (t Tokens) Marshal() ([]byte, error) {
74-
data := tokensJSON{
75-
Version: TokensVersion1,
76-
Tokens: t,
77-
}
78-
return json.Marshal(data)
65+
return json.Marshal(tokensJSON{Tokens: t})
7966
}
8067

8168
// Unmarshal reads the tokens from JSON byte stream.
@@ -84,16 +71,10 @@ func (t *Tokens) Unmarshal(b []byte) error {
8471
if err := json.Unmarshal(b, &tj); err != nil {
8572
return err
8673
}
87-
switch tj.Version {
88-
case TokensVersion1:
89-
*t = Tokens(tj.Tokens)
90-
return nil
91-
default:
92-
return errors.New("invalid token version")
93-
}
74+
*t = Tokens(tj.Tokens)
75+
return nil
9476
}
9577

9678
type tokensJSON struct {
97-
Version int `json:"version"`
98-
Tokens []uint32 `json:"tokens"`
79+
Tokens []uint32 `json:"tokens"`
9980
}

0 commit comments

Comments
 (0)