Skip to content

Avoid sort tokens on autoJoin #5394

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jun 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
* [ENHANCEMENT] Improving Performance on the API Gzip Handler. #5347
* [ENHANCEMENT] Dynamodb: Add `puller-sync-time` to allow different pull time for ring. #5357
* [ENHANCEMENT] Emit querier `max_concurrent` as a metric. #5362
* [ENHANCEMENT] Avoid sort tokens on lifecycler autoJoin. #5394
* [ENHANCEMENT] Do not resync blocks in running store gateways during rollout deployment and container restart. #5363
* [ENHANCEMENT] Store Gateway: Add new metrics `cortex_bucket_store_sent_chunk_size_bytes`, `cortex_bucket_store_postings_size_bytes` and `cortex_bucket_store_empty_postings_total`. #5397
* [BUGFIX] Ruler: Validate if rule group can be safely converted back to rule group yaml from protobuf message #5265
Expand Down
13 changes: 11 additions & 2 deletions pkg/ring/lifecycler.go
Original file line number Diff line number Diff line change
Expand Up @@ -735,11 +735,20 @@ func (i *Lifecycler) autoJoin(ctx context.Context, targetState InstanceState) er
ringDesc = in.(*Desc)
}

i.setState(targetState)

// At this point, we should not have any tokens, and we should be in PENDING state.
// Need to make sure we didnt change the num of tokens configured
myTokens, takenTokens := ringDesc.TokensFor(i.ID)
needTokens := i.cfg.NumTokens - len(myTokens)

newTokens := GenerateTokens(i.cfg.NumTokens-len(myTokens), takenTokens)
i.setState(targetState)
if needTokens == 0 && myTokens.Equals(i.getTokens()) {
// Tokens have been verified. No need to change them.
ringDesc.AddIngester(i.ID, i.Addr, i.Zone, i.getTokens(), i.GetState(), i.getRegisteredAt())
return ringDesc, true, nil
}

newTokens := GenerateTokens(needTokens, takenTokens)

myTokens = append(myTokens, newTokens...)
sort.Sort(myTokens)
Expand Down
14 changes: 1 addition & 13 deletions pkg/ruler/lifecycle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package ruler

import (
"context"
"sort"
"testing"
"time"

Expand Down Expand Up @@ -76,7 +75,7 @@ func TestRuler_RingLifecyclerShouldAutoForgetUnhealthyInstances(t *testing.T) {
require.NoError(t, ringStore.CAS(ctx, ringKey, func(in interface{}) (interface{}, bool, error) {
ringDesc := ring.GetOrCreateRingDesc(in)

instance := ringDesc.AddIngester(unhealthyInstanceID, "1.1.1.1", "", generateSortedTokens(config.Ring.NumTokens), ring.ACTIVE, time.Now())
instance := ringDesc.AddIngester(unhealthyInstanceID, "1.1.1.1", "", ring.GenerateTokens(config.Ring.NumTokens, nil), ring.ACTIVE, time.Now())
instance.Timestamp = time.Now().Add(-(ringAutoForgetUnhealthyPeriods + 1) * heartbeatTimeout).Unix()
ringDesc.Ingesters[unhealthyInstanceID] = instance

Expand All @@ -95,17 +94,6 @@ func TestRuler_RingLifecyclerShouldAutoForgetUnhealthyInstances(t *testing.T) {
})
}

func generateSortedTokens(numTokens int) ring.Tokens {
tokens := ring.GenerateTokens(numTokens, nil)

// Ensure generated tokens are sorted.
sort.Slice(tokens, func(i, j int) bool {
return tokens[i] < tokens[j]
})

return ring.Tokens(tokens)
}

// numTokens determines the number of tokens owned by the specified
// address
func numTokens(c kv.Client, name, ringKey string) int {
Expand Down
24 changes: 6 additions & 18 deletions pkg/storegateway/gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"os"
"path"
"path/filepath"
"sort"
"strconv"
"strings"
"testing"
Expand Down Expand Up @@ -114,12 +113,12 @@ func TestStoreGateway_InitialSyncWithDefaultShardingEnabled(t *testing.T) {
"instance already in the ring with ACTIVE state and has all tokens": {
initialExists: true,
initialState: ring.ACTIVE,
initialTokens: generateSortedTokens(RingNumTokens),
initialTokens: ring.GenerateTokens(RingNumTokens, nil),
},
"instance already in the ring with LEAVING state and has all tokens": {
initialExists: true,
initialState: ring.LEAVING,
initialTokens: generateSortedTokens(RingNumTokens),
initialTokens: ring.GenerateTokens(RingNumTokens, nil),
},
}

Expand Down Expand Up @@ -548,15 +547,15 @@ func TestStoreGateway_ShouldSupportLoadRingTokensFromFile(t *testing.T) {
expectedNumTokens int
}{
"stored tokens are less than the configured ones": {
storedTokens: generateSortedTokens(RingNumTokens - 10),
storedTokens: ring.GenerateTokens(RingNumTokens-10, nil),
expectedNumTokens: RingNumTokens,
},
"stored tokens are equal to the configured ones": {
storedTokens: generateSortedTokens(RingNumTokens),
storedTokens: ring.GenerateTokens(RingNumTokens, nil),
expectedNumTokens: RingNumTokens,
},
"stored tokens are more then the configured ones": {
storedTokens: generateSortedTokens(RingNumTokens + 10),
storedTokens: ring.GenerateTokens(RingNumTokens+10, nil),
expectedNumTokens: RingNumTokens + 10,
},
}
Expand Down Expand Up @@ -791,7 +790,7 @@ func TestStoreGateway_RingLifecyclerShouldAutoForgetUnhealthyInstances(t *testin
require.NoError(t, ringStore.CAS(ctx, RingKey, func(in interface{}) (interface{}, bool, error) {
ringDesc := ring.GetOrCreateRingDesc(in)

instance := ringDesc.AddIngester(unhealthyInstanceID, "1.1.1.1", "", generateSortedTokens(RingNumTokens), ring.ACTIVE, time.Now())
instance := ringDesc.AddIngester(unhealthyInstanceID, "1.1.1.1", "", ring.GenerateTokens(RingNumTokens, nil), ring.ACTIVE, time.Now())
instance.Timestamp = time.Now().Add(-(ringAutoForgetUnhealthyPeriods + 1) * heartbeatTimeout).Unix()
ringDesc.Ingesters[unhealthyInstanceID] = instance

Expand Down Expand Up @@ -1144,17 +1143,6 @@ func mockTSDB(t *testing.T, dir string, numSeries, numBlocks int, minT, maxT int
require.NoError(t, db.Close())
}

func generateSortedTokens(numTokens int) ring.Tokens {
tokens := ring.GenerateTokens(numTokens, nil)

// Ensure generated tokens are sorted.
sort.Slice(tokens, func(i, j int) bool {
return tokens[i] < tokens[j]
})

return ring.Tokens(tokens)
}

func readSamplesFromChunks(rawChunks []storepb.AggrChunk) ([]sample, error) {
var samples []sample

Expand Down