-
Notifications
You must be signed in to change notification settings - Fork 2.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Consistent hashing #5408
Consistent hashing #5408
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,14 +6,31 @@ package receive | |
import ( | ||
"context" | ||
"fmt" | ||
"sort" | ||
"strconv" | ||
"sync" | ||
|
||
"github.com/cespare/xxhash" | ||
|
||
"github.com/pkg/errors" | ||
"github.com/thanos-io/thanos/pkg/store/labelpb" | ||
|
||
"github.com/thanos-io/thanos/pkg/store/storepb/prompb" | ||
) | ||
|
||
// HashringAlgorithm is the algorithm used to distribute series in the ring. | ||
type HashringAlgorithm string | ||
|
||
const ( | ||
AlgorithmHashmod HashringAlgorithm = "hashmod" | ||
AlgorithmConsistent HashringAlgorithm = "consistent" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should this be called "ketama" instead of "consistent"? So that other consistent hashing algorithms can be added in the future without breaking backwards compatibility. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I agree, we should give it a more specific name. We can change it to either There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sounds good 👍 Renamed to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I need to agree with @douglascamata I think it would quite good to specify exactly what implementation we use. If it's based on libketama from https://dgryski.medium.com/consistent-hashing-algorithmic-tradeoffs-ef6b8e2fcae8 - let's mentioned that in comment, name of this There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👍 |
||
|
||
// SectionsPerNode is the number of sections in the ring assigned to each node | ||
// when using consistent hashing. A higher number yields a better series distribution, | ||
// but also comes with a higher memory cost. | ||
SectionsPerNode = 1000 | ||
) | ||
|
||
// insufficientNodesError is returned when a hashring does not | ||
// have enough nodes to satisfy a request for a node. | ||
type insufficientNodesError struct { | ||
|
@@ -52,7 +69,7 @@ func (s SingleNodeHashring) GetN(_ string, _ *prompb.TimeSeries, n uint64) (stri | |
return string(s), nil | ||
} | ||
|
||
// simpleHashring represents a group of nodes handling write requests. | ||
// simpleHashring represents a group of nodes handling write requests by hashmoding individual series. | ||
type simpleHashring []string | ||
|
||
// Get returns a target to handle the given tenant and time series. | ||
|
@@ -69,6 +86,81 @@ func (s simpleHashring) GetN(tenant string, ts *prompb.TimeSeries, n uint64) (st | |
return s[(labelpb.HashWithPrefix(tenant, ts.Labels)+n)%uint64(len(s))], nil | ||
} | ||
|
||
type section struct { | ||
endpointIndex uint64 | ||
hash uint64 | ||
} | ||
|
||
type sections []section | ||
|
||
func (p sections) Len() int { return len(p) } | ||
func (p sections) Less(i, j int) bool { return p[i].hash < p[j].hash } | ||
func (p sections) Swap(i, j int) { p[i], p[j] = p[j], p[i] } | ||
func (p sections) Sort() { sort.Sort(p) } | ||
|
||
// consistentHashring represents a group of nodes handling write requests with consistent hashing. | ||
type consistentHashring struct { | ||
endpoints []string | ||
sections sections | ||
numEndpoints uint64 | ||
} | ||
|
||
func newConsistentHashring(endpoints []string, sectionsPerNode int) *consistentHashring { | ||
// Replication works by choosing subsequent nodes in the ring. | ||
// In order to improve consistency, we avoid relying on the ordering of the endpoints | ||
// and sort them lexicographically. | ||
sort.Strings(endpoints) | ||
|
||
numSections := len(endpoints) * sectionsPerNode | ||
ring := consistentHashring{ | ||
endpoints: endpoints, | ||
sections: make(sections, 0, numSections), | ||
numEndpoints: uint64(len(endpoints)), | ||
} | ||
|
||
hash := xxhash.New() | ||
for endpointIndex, endpoint := range endpoints { | ||
for i := 1; i <= sectionsPerNode; i++ { | ||
_, _ = hash.Write([]byte(endpoint + ":" + strconv.Itoa(i))) | ||
n := §ion{ | ||
endpointIndex: uint64(endpointIndex), | ||
hash: hash.Sum64(), | ||
} | ||
|
||
ring.sections = append(ring.sections, *n) | ||
hash.Reset() | ||
} | ||
} | ||
sort.Sort(ring.sections) | ||
return &ring | ||
} | ||
|
||
func (c consistentHashring) Get(tenant string, ts *prompb.TimeSeries) (string, error) { | ||
return c.GetN(tenant, ts, 0) | ||
} | ||
|
||
func (c consistentHashring) GetN(tenant string, ts *prompb.TimeSeries, n uint64) (string, error) { | ||
if n >= c.numEndpoints { | ||
return "", &insufficientNodesError{have: c.numEndpoints, want: n + 1} | ||
} | ||
|
||
v := labelpb.HashWithPrefix(tenant, ts.Labels) | ||
|
||
var i uint64 | ||
i = uint64(sort.Search(len(c.sections), func(i int) bool { | ||
return c.sections[i].hash >= v | ||
})) | ||
|
||
numSections := uint64(len(c.sections)) | ||
if i == numSections { | ||
i = 0 | ||
} | ||
|
||
nodeIndex := (c.sections[i].endpointIndex + n) % c.numEndpoints | ||
|
||
return c.endpoints[nodeIndex], nil | ||
} | ||
|
||
// multiHashring represents a set of hashrings. | ||
// Which hashring to use for a tenant is determined | ||
// by the tenants field of the hashring configuration. | ||
|
@@ -121,13 +213,24 @@ func (m *multiHashring) GetN(tenant string, ts *prompb.TimeSeries, n uint64) (st | |
// groups. | ||
// Which hashring to use for a tenant is determined | ||
// by the tenants field of the hashring configuration. | ||
func newMultiHashring(cfg []HashringConfig) Hashring { | ||
func newMultiHashring(algorithm HashringAlgorithm, cfg []HashringConfig) Hashring { | ||
m := &multiHashring{ | ||
cache: make(map[string]Hashring), | ||
} | ||
|
||
newHashring := func(endpoints []string) Hashring { | ||
switch algorithm { | ||
case AlgorithmHashmod: | ||
return simpleHashring(endpoints) | ||
case AlgorithmConsistent: | ||
return newConsistentHashring(endpoints, SectionsPerNode) | ||
default: | ||
return simpleHashring(endpoints) | ||
} | ||
} | ||
|
||
for _, h := range cfg { | ||
m.hashrings = append(m.hashrings, simpleHashring(h.Endpoints)) | ||
m.hashrings = append(m.hashrings, newHashring(h.Endpoints)) | ||
var t map[string]struct{} | ||
if len(h.Tenants) != 0 { | ||
t = make(map[string]struct{}) | ||
|
@@ -147,7 +250,7 @@ func newMultiHashring(cfg []HashringConfig) Hashring { | |
// Which hashring to use for a tenant is determined | ||
// by the tenants field of the hashring configuration. | ||
// The updates chan is closed before exiting. | ||
func HashringFromConfigWatcher(ctx context.Context, updates chan<- Hashring, cw *ConfigWatcher) error { | ||
func HashringFromConfigWatcher(ctx context.Context, algorithm HashringAlgorithm, updates chan<- Hashring, cw *ConfigWatcher) error { | ||
defer close(updates) | ||
go cw.Run(ctx) | ||
|
||
|
@@ -157,15 +260,15 @@ func HashringFromConfigWatcher(ctx context.Context, updates chan<- Hashring, cw | |
if !ok { | ||
return errors.New("hashring config watcher stopped unexpectedly") | ||
} | ||
updates <- newMultiHashring(cfg) | ||
updates <- newMultiHashring(algorithm, cfg) | ||
case <-ctx.Done(): | ||
return ctx.Err() | ||
} | ||
} | ||
} | ||
|
||
// HashringFromConfig loads raw configuration content and returns a Hashring if the given configuration is not valid. | ||
func HashringFromConfig(content string) (Hashring, error) { | ||
func HashringFromConfig(algorithm HashringAlgorithm, content string) (Hashring, error) { | ||
config, err := parseConfig([]byte(content)) | ||
if err != nil { | ||
return nil, errors.Wrapf(err, "failed to parse configuration") | ||
|
@@ -176,5 +279,5 @@ func HashringFromConfig(content string) (Hashring, error) { | |
return nil, errors.Wrapf(err, "failed to load configuration") | ||
} | ||
|
||
return newMultiHashring(config), err | ||
return newMultiHashring(algorithm, config), err | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Kind of sad our tooling does not put those values in help. Do you mean explicitly adding those options for people to know how to use it?