
Optimised Ring.ShuffleShard() and disabled subring cache in store-gateway, ruler and compactor #3601

Merged
Changes from all commits
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -25,6 +25,8 @@
* [ENHANCEMENT] Compactor: tenants marked for deletion will now be fully cleaned up after some delay since deletion of last block. Cleanup includes removal of remaining marker files (including tenant deletion mark file) and files under `debug/metas`. #3613
* [ENHANCEMENT] Compactor: retry compaction of a single tenant on failure instead of re-running compaction for all tenants. #3627
* [ENHANCEMENT] Querier: Implement result caching for tenant query federation. #3640
* [ENHANCEMENT] Disabled in-memory shuffle-sharding subring cache in the store-gateway, ruler and compactor. This should reduce the memory utilisation in these services when shuffle-sharding is enabled, without introducing a significant increase in CPU utilisation. #3601
* [ENHANCEMENT] Shuffle sharding: optimised subring generation used by shuffle sharding. #3601
* [BUGFIX] Allow `-querier.max-query-lookback` use `y|w|d` suffix like deprecated `-store.max-look-back-period`. #3598
* [BUGFIX] Memberlist: Entry in the ring should now not appear again after using "Forget" feature (unless it's still heartbeating). #3603
* [BUGFIX] Ingester: do not close idle TSDBs while blocks shipping is in progress. #3630
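
Below is a minimal sketch of what the first enhancement above amounts to in configuration terms, assuming the `ring.LifecyclerConfig` and `ring.Config` types from this repository. It mirrors the one-line `ToLifecyclerConfig()` change shown further down for the compactor; per the changelog entry, the ruler and store-gateway receive the equivalent change.

```go
package main

import (
	"fmt"

	"github.com/cortexproject/cortex/pkg/ring"
)

func main() {
	// Sketch only: the store-gateway, ruler and compactor now hard-code this
	// flag when building their lifecycler config, so each shuffle-shard lookup
	// recomputes the tenant subring instead of serving it from an in-memory cache.
	var lc ring.LifecyclerConfig
	lc.RingConfig.SubringCacheDisabled = true

	fmt.Println("subring cache disabled:", lc.RingConfig.SubringCacheDisabled)
}
```

The second entry, the optimised subring generation, is what keeps that recomputation cheap enough for the trade to be a net win.
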
2 changes: 1 addition & 1 deletion pkg/compactor/compactor.go
@@ -624,7 +624,7 @@ func (c *Compactor) ownUser(userID string) (bool, error) {
userHash := hasher.Sum32()

// Check whether this compactor instance owns the user.
rs, err := c.ring.Get(userHash, ring.Compactor, []ring.IngesterDesc{})
rs, err := c.ring.Get(userHash, ring.Compactor, nil, nil, nil)
if err != nil {
return false, err
}
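
The hunk above also shows the new `Get()` signature in its simplest form: three optional reusable buffers instead of a single `[]ring.IngesterDesc`, with `nil` passed for all of them in a one-off lookup. The sketch below reconstructs the surrounding ownership check for illustration only; the fnv hashing and the `Includes()` helper are assumptions inferred from the visible snippet, not quoted from it.

```go
package main

import (
	"fmt"
	"hash/fnv"

	"github.com/cortexproject/cortex/pkg/ring"
)

// ownUser sketches the compactor's tenant-ownership check around this hunk:
// hash the tenant ID, resolve the replication set for that hash with the new
// five-argument Get(), and claim the tenant if this instance is part of it.
// The fnv32a hash and the Includes() helper are assumptions for illustration.
func ownUser(r ring.ReadRing, instanceAddr, userID string) (bool, error) {
	hasher := fnv.New32a()
	_, _ = hasher.Write([]byte(userID))
	userHash := hasher.Sum32()

	// nil buffers: this is a one-off lookup, so there is nothing worth reusing.
	rs, err := r.Get(userHash, ring.Compactor, nil, nil, nil)
	if err != nil {
		return false, err
	}
	return rs.Includes(instanceAddr), nil
}

func main() {
	fmt.Println("sketch only; wire up a real ring.ReadRing to use ownUser")
}
```
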
1 change: 1 addition & 0 deletions pkg/compactor/compactor_ring.go
@@ -79,6 +79,7 @@ func (cfg *RingConfig) ToLifecyclerConfig() ring.LifecyclerConfig {

// Configure lifecycler
lc.RingConfig = rc
lc.RingConfig.SubringCacheDisabled = true
lc.ListenPort = cfg.ListenPort
lc.Addr = cfg.InstanceAddr
lc.Port = cfg.InstancePort
2 changes: 2 additions & 0 deletions pkg/compactor/compactor_ring_test.go
@@ -20,6 +20,7 @@ func TestRingConfig_DefaultConfigToLifecyclerConfig(t *testing.T) {
// intentionally overridden
expected.ListenPort = cfg.ListenPort
expected.RingConfig.ReplicationFactor = 1
expected.RingConfig.SubringCacheDisabled = true
expected.NumTokens = 512
expected.MinReadyDuration = 0
expected.FinalSleep = 0
Expand All @@ -45,6 +46,7 @@ func TestRingConfig_CustomConfigToLifecyclerConfig(t *testing.T) {
// ring config
expected.HeartbeatPeriod = cfg.HeartbeatPeriod
expected.RingConfig.HeartbeatTimeout = cfg.HeartbeatTimeout
expected.RingConfig.SubringCacheDisabled = true
expected.ID = cfg.InstanceID
expected.InfNames = cfg.InstanceInterfaceNames
expected.Port = cfg.InstancePort
2 changes: 1 addition & 1 deletion pkg/distributor/query.go
@@ -97,7 +97,7 @@ func (d *Distributor) GetIngestersForQuery(ctx context.Context, matchers ...*lab
metricNameMatcher, _, ok := extract.MetricNameMatcherFromMatchers(matchers)

if ok && metricNameMatcher.Type == labels.MatchEqual {
return d.ingestersRing.Get(shardByMetricName(userID, metricNameMatcher.Value), ring.Read, nil)
return d.ingestersRing.Get(shardByMetricName(userID, metricNameMatcher.Value), ring.Read, nil, nil, nil)
}
}

5 changes: 2 additions & 3 deletions pkg/querier/blocks_store_replicated_set.go
@@ -98,12 +98,11 @@ func (s *blocksStoreReplicationSet) GetClientsFor(userID string, blockIDs []ulid

// Find the replication set of each block we need to query.
for _, blockID := range blockIDs {
// Buffer internally used by the ring (give extra room for a JOINING + LEAVING instance).
// Do not reuse the same buffer across multiple Get() calls because we do retain the
// returned replication set.
buf := make([]ring.IngesterDesc, 0, userRing.ReplicationFactor()+2)
bufDescs, bufHosts, bufZones := ring.MakeBuffersForGet()

set, err := userRing.Get(cortex_tsdb.HashBlockID(blockID), ring.BlocksRead, buf)
set, err := userRing.Get(cortex_tsdb.HashBlockID(blockID), ring.BlocksRead, bufDescs, bufHosts, bufZones)
if err != nil {
return nil, errors.Wrapf(err, "failed to get store-gateway replication set owning the block %s", blockID.String())
}
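
The removed comment explained the constraint at this call site: the returned replication set is retained, so each `Get()` call needs its own backing buffers rather than a shared one. The new `ring.MakeBuffersForGet()` helper packages that up. A small usage sketch, assuming the `ReadRing` shape visible in this diff:

```go
package main

import (
	"fmt"

	"github.com/cortexproject/cortex/pkg/ring"
)

// lookupRetained resolves the replication set for a key when the caller keeps
// the result around: fresh buffers per call, so later calls cannot overwrite it.
func lookupRetained(r ring.ReadRing, key uint32, op ring.Operation) (ring.ReplicationSet, error) {
	bufDescs, bufHosts, bufZones := ring.MakeBuffersForGet()
	return r.Get(key, op, bufDescs, bufHosts, bufZones)
}

func main() {
	fmt.Println("sketch only; pass a concrete *ring.Ring to lookupRetained")
}
```

For call sites that do not retain the result, the pkg/ring/batch.go hunk below shows the cheaper alternative: fixed-size arrays re-sliced to zero length on every iteration.
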
9 changes: 6 additions & 3 deletions pkg/ring/batch.go
@@ -46,10 +46,13 @@ func DoBatch(ctx context.Context, r ReadRing, keys []uint32, callback func(Inges
itemTrackers := make([]itemTracker, len(keys))
ingesters := make(map[string]ingester, r.IngesterCount())

const maxExpectedReplicationSet = 5 // Typical replication factor 3, plus one for inactive plus one for luck.
var descs [maxExpectedReplicationSet]IngesterDesc
var (
bufDescs [GetBufferSize]IngesterDesc
bufHosts [GetBufferSize]string
bufZones [GetBufferSize]string
)
for i, key := range keys {
replicationSet, err := r.Get(key, Write, descs[:0])
replicationSet, err := r.Get(key, Write, bufDescs[:0], bufHosts[:0], bufZones[:0])
if err != nil {
return err
}
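
To isolate the allocation pattern `DoBatch` uses above, here is a hedged sketch under the same assumptions (the `GetBufferSize` constant and five-argument `Get()` from this PR): the three buffers are declared once as arrays and re-sliced per key, which is only safe because the replication set is consumed inside the loop rather than retained.

```go
package main

import (
	"fmt"

	"github.com/cortexproject/cortex/pkg/ring"
)

// resolveAll sketches the DoBatch allocation pattern in isolation: three
// fixed-size arrays are declared once and re-sliced to length zero on every
// Get() call, so the loop performs no per-key buffer allocations.
func resolveAll(r ring.ReadRing, keys []uint32) error {
	var (
		bufDescs [ring.GetBufferSize]ring.IngesterDesc
		bufHosts [ring.GetBufferSize]string
		bufZones [ring.GetBufferSize]string
	)
	for _, key := range keys {
		rs, err := r.Get(key, ring.Write, bufDescs[:0], bufHosts[:0], bufZones[:0])
		if err != nil {
			return err
		}
		fmt.Println(key, "->", len(rs.Ingesters), "instances")
	}
	return nil
}

func main() {
	fmt.Println("sketch only; call resolveAll with a concrete ring")
}
```
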
7 changes: 4 additions & 3 deletions pkg/ring/http.go
@@ -131,13 +131,14 @@ func (r *Ring) ServeHTTP(w http.ResponseWriter, req *http.Request) {
}
sort.Strings(ingesterIDs)

now := time.Now()
ingesters := []interface{}{}
_, owned := countTokens(r.ringDesc, r.ringTokens)
_, owned := r.countTokens()
for _, id := range ingesterIDs {
ing := r.ringDesc.Ingesters[id]
heartbeatTimestamp := time.Unix(ing.Timestamp, 0)
state := ing.State.String()
if !r.IsHealthy(&ing, Reporting) {
if !r.IsHealthy(&ing, Reporting, now) {
state = unhealthy
}

@@ -178,7 +179,7 @@ func (r *Ring) ServeHTTP(w http.ResponseWriter, req *http.Request) {
ShowTokens bool `json:"-"`
}{
Ingesters: ingesters,
Now: time.Now(),
Now: now,
ShowTokens: tokensParam == "true",
}, pageTemplate, req)
}
4 changes: 3 additions & 1 deletion pkg/ring/lifecycler.go
@@ -753,11 +753,13 @@ func (i *Lifecycler) updateCounters(ringDesc *Desc) {
zones := map[string]struct{}{}

if ringDesc != nil {
now := time.Now()

for _, ingester := range ringDesc.Ingesters {
zones[ingester.Zone] = struct{}{}

// Count the number of healthy instances for Write operation.
if ingester.IsHealthy(Write, i.cfg.RingConfig.HeartbeatTimeout) {
if ingester.IsHealthy(Write, i.cfg.RingConfig.HeartbeatTimeout, now) {
healthyInstancesCount++
}
}
161 changes: 112 additions & 49 deletions pkg/ring/model.go
@@ -1,6 +1,7 @@
package ring

import (
"container/heap"
"fmt"
"sort"
"time"
@@ -11,13 +12,6 @@ import (
"github.com/cortexproject/cortex/pkg/ring/kv/memberlist"
)

// ByToken is a sortable list of TokenDescs
type ByToken []TokenDesc

func (ts ByToken) Len() int { return len(ts) }
func (ts ByToken) Swap(i, j int) { ts[i], ts[j] = ts[j], ts[i] }
func (ts ByToken) Less(i, j int) bool { return ts[i].Token < ts[j].Token }

// ByAddr is a sortable list of IngesterDesc.
type ByAddr []IngesterDesc

@@ -121,16 +115,12 @@ func (d *Desc) Ready(now time.Time, heartbeatTimeout time.Duration) error {
return nil
}

// TokensFor partitions the tokens into those for the given ID, and those for others.
func (d *Desc) TokensFor(id string) (tokens, other Tokens) {
takenTokens, myTokens := Tokens{}, Tokens{}
for _, token := range d.getTokens() {
takenTokens = append(takenTokens, token.Token)
if token.Ingester == id {
myTokens = append(myTokens, token.Token)
}
}
return myTokens, takenTokens
// TokensFor returns all ring tokens and the tokens owned by the provided ID.
// Returned tokens are guaranteed to be sorted.
func (d *Desc) TokensFor(id string) (myTokens, allTokens Tokens) {
allTokens = d.GetTokens()
myTokens = d.Ingesters[id].Tokens
return
}

// GetRegisteredAt returns the timestamp when the instance has been registered to the ring
@@ -144,7 +134,7 @@ func (i *IngesterDesc) GetRegisteredAt() time.Time {
}

// IsHealthy checks whether the ingester appears to be alive and heartbeating
func (i *IngesterDesc) IsHealthy(op Operation, heartbeatTimeout time.Duration) bool {
func (i *IngesterDesc) IsHealthy(op Operation, heartbeatTimeout time.Duration, now time.Time) bool {
healthy := false

switch op {
@@ -170,7 +160,7 @@ func (i *IngesterDesc) IsHealthy(op Operation, heartbeatTimeout time.Duration) b
healthy = i.State == ACTIVE
}

return healthy && time.Since(time.Unix(i.Timestamp, 0)) <= heartbeatTimeout
return healthy && now.Unix()-i.Timestamp <= heartbeatTimeout.Milliseconds()/1000
}

// Merge merges other ring into this one. Returns sub-ring that represents the change,
@@ -419,46 +409,43 @@ func (d *Desc) RemoveTombstones(limit time.Time) {
}
}

type TokenDesc struct {
Token uint32
Ingester string
Zone string
}
func (d *Desc) getTokensInfo() map[uint32]instanceInfo {
out := map[uint32]instanceInfo{}

// getTokens returns sorted list of tokens with ingester IDs, owned by each ingester in the ring.
func (d *Desc) getTokens() []TokenDesc {
numTokens := 0
for _, ing := range d.Ingesters {
numTokens += len(ing.Tokens)
}
tokens := make([]TokenDesc, 0, numTokens)
for key, ing := range d.Ingesters {
for _, token := range ing.Tokens {
tokens = append(tokens, TokenDesc{Token: token, Ingester: key, Zone: ing.GetZone()})
for instanceID, instance := range d.Ingesters {
info := instanceInfo{
InstanceID: instanceID,
Zone: instance.Zone,
}

for _, token := range instance.Tokens {
out[token] = info
}
}

sort.Sort(ByToken(tokens))
return tokens
return out
}

// getTokensByZone returns instances tokens grouped by zone. Tokens within each zone
// are guaranteed to be sorted.
func (d *Desc) getTokensByZone() map[string][]TokenDesc {
zones := map[string][]TokenDesc{}

for key, ing := range d.Ingesters {
for _, token := range ing.Tokens {
zones[ing.Zone] = append(zones[ing.Zone], TokenDesc{Token: token, Ingester: key, Zone: ing.GetZone()})
}
// GetTokens returns sorted list of tokens owned by all instances within the ring.
func (d *Desc) GetTokens() []uint32 {
instances := make([][]uint32, 0, len(d.Ingesters))
for _, instance := range d.Ingesters {
instances = append(instances, instance.Tokens)
}

// Ensure tokens are sorted within each zone.
for zone := range zones {
sort.Sort(ByToken(zones[zone]))
return MergeTokens(instances)
}

// getTokensByZone returns instances tokens grouped by zone. Tokens within each zone
// are guaranteed to be sorted.
func (d *Desc) getTokensByZone() map[string][]uint32 {
zones := map[string][][]uint32{}
for _, instance := range d.Ingesters {
zones[instance.Zone] = append(zones[instance.Zone], instance.Tokens)
}

return zones
// Merge tokens per zone.
return MergeTokensByZone(zones)
}

type CompareResult int
@@ -539,3 +526,79 @@ func GetOrCreateRingDesc(d interface{}) *Desc {
}
return d.(*Desc)
}

// TokensHeap is a heap data structure used to merge multiple lists
// of sorted tokens into a single one.
type TokensHeap [][]uint32

func (h TokensHeap) Len() int {
return len(h)
}

func (h TokensHeap) Swap(i, j int) {
h[i], h[j] = h[j], h[i]
}

func (h TokensHeap) Less(i, j int) bool {
return h[i][0] < h[j][0]
}

func (h *TokensHeap) Push(x interface{}) {
*h = append(*h, x.([]uint32))
}

func (h *TokensHeap) Pop() interface{} {
old := *h
n := len(old)
x := old[n-1]
*h = old[0 : n-1]
return x
}

// MergeTokens takes multiple lists of tokens as input and returns a single list
// containing all tokens merged and sorted. Each input list is required to
// already be sorted.
func MergeTokens(instances [][]uint32) []uint32 {
numTokens := 0

// Build the heap.
h := make(TokensHeap, 0, len(instances))
for _, tokens := range instances {
if len(tokens) == 0 {
continue
}

// We can safely append the input slice because elements inside are never shuffled.
h = append(h, tokens)
numTokens += len(tokens)
}
heap.Init(&h)

out := make([]uint32, 0, numTokens)

for h.Len() > 0 {
// The minimum element in the tree is the root, at index 0.
lowest := h[0]
out = append(out, lowest[0])

if len(lowest) > 1 {
// Remove the first token from the lowest because we popped it
// and then fix the heap to keep it sorted.
h[0] = h[0][1:]
heap.Fix(&h, 0)
} else {
heap.Remove(&h, 0)
}
}

return out
}

// MergeTokensByZone is like MergeTokens but does it for each input zone.
func MergeTokensByZone(zones map[string][][]uint32) map[string][]uint32 {
out := make(map[string][]uint32, len(zones))
for zone, tokens := range zones {
out[zone] = MergeTokens(tokens)
}
return out
}
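
The heap-based merge above replaces the old approach of copying every token into one big `[]TokenDesc` and sorting it: since each instance already stores its tokens sorted, a k-way merge produces the global sorted list in O(n log k) without a full re-sort. A small usage sketch with made-up token values:

```go
package main

import (
	"fmt"

	"github.com/cortexproject/cortex/pkg/ring"
)

func main() {
	// Each inner slice plays the role of one instance's (already sorted) tokens.
	instances := [][]uint32{
		{10, 40, 70},
		{20, 50},
		{30, 60, 80},
	}
	fmt.Println(ring.MergeTokens(instances)) // [10 20 30 40 50 60 70 80]

	// Same merge, but keeping zones separate (as getTokensByZone does above).
	byZone := map[string][][]uint32{
		"zone-a": {{10, 40}, {20}},
		"zone-b": {{30}, {50, 60}},
	}
	fmt.Println(ring.MergeTokensByZone(byZone)) // map[zone-a:[10 20 40] zone-b:[30 50 60]]
}
```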