Skip to content

Commit b3b0d86

Browse files
rfratto authored and tomwilkie committed
Support consistent hashing in cache.NewMemcachedClient (#1554)
* chunk/cache: support consistent hashing in cache.NewMemcachedClient cache.MemcachedClientConfig has been updated with a new boolean variable ConsistentHash, available as consistent_hash in yaml and memcached.consistent-hash as a flag. When ConsistentHash is true, the MemcachedClient will use the newly created cache.MemcachedJumpHashSelector for server distribution. Jump hash is a consistent hashing algorithm that, given a key and a number of buckets, returns a bucket number in the range [0, numBuckets). Adding or removing a bucket only results in 1/N keys being moved. A downside to using jump hash is that buckets cannot be arbitrarily removed from the list; it effectively acts as a stack and only supports adding or removing buckets from the end. Therefore, jump hash is most effective when the servers are ordered and where the order is predictable. A good example of this is a Kubernetes StatefulSet with a headless service. DNS names will be in the form memcached-[pod number], where the pod number will grow and shrink in the way that numBuckets does. There will never be a gap in the servers when scaling up or down. Signed-off-by: Robert Fratto <robert.fratto@grafana.com> * chunk/cache: Add natural sort in jump hash server selector Signed-off-by: Robert Fratto <robert.fratto@grafana.com> * chunk/cache: add test to demonstrate natsort works TestNatSort has been added to validate that the natsort package works as expected when sorting a list of servers that are returned from SRV lookups. The example used corresponds to SRV records that would be returned for a k8s headless service backed by a StatefulSet. Signed-off-by: Robert Fratto <robert.fratto@grafana.com> * Add changelog entry. Signed-off-by: Tom Wilkie <tom.wilkie@gmail.com>
1 parent f578dc8 commit b3b0d86

File tree

10 files changed

+439
-4
lines changed

10 files changed

+439
-4
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
## master / unreleased
22

3+
* [FEATURE] Add option to use jump hashing to load balance requests to memcached #1554
4+
35
## 0.1.0 / 2019-08-07
46

57
* [CHANGE] HA Tracker flags were renamed to provide more clarity #1465

go.mod

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,11 @@ require (
1515
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 // indirect
1616
github.com/bradfitz/gomemcache v0.0.0-20170208213004-1952afaa557d
1717
github.com/cenkalti/backoff v1.0.0 // indirect
18+
github.com/cespare/xxhash v1.1.0
1819
github.com/coreos/go-semver v0.3.0 // indirect
1920
github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f // indirect
2021
github.com/cznic/ql v1.2.0 // indirect
22+
github.com/facette/natsort v0.0.0-20181210072756-2cd4dd1e2dcb
2123
github.com/fluent/fluent-logger-golang v1.2.1 // indirect
2224
github.com/fsouza/fake-gcs-server v1.3.0
2325
github.com/go-kit/kit v0.8.0

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,8 @@ github.com/elazarl/go-bindata-assetfs v1.0.0/go.mod h1:v+YaWX3bdea5J/mo8dSETolEo
129129
github.com/evanphx/json-patch v4.1.0+incompatible h1:K1MDoo4AZ4wU0GIU/fPmtZg7VpzLjCxu+UwBD1FvwOc=
130130
github.com/evanphx/json-patch v4.1.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
131131
github.com/facebookgo/clock v0.0.0-20150410010913-600d898af40a/go.mod h1:7Ga40egUymuWXxAe151lTNnCv97MddSOVsjpPPkityA=
132+
github.com/facette/natsort v0.0.0-20181210072756-2cd4dd1e2dcb h1:IT4JYU7k4ikYg1SCxNI1/Tieq/NFvh6dzLdgi7eu0tM=
133+
github.com/facette/natsort v0.0.0-20181210072756-2cd4dd1e2dcb/go.mod h1:bH6Xx7IW64qjjJq8M2u4dxNaBiDfKK+z/3eGDpXEQhc=
132134
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
133135
github.com/fluent/fluent-logger-golang v1.2.1 h1:CMA+mw2zMiOGEOarZtaqM3GBWT1IVLNncNi0nKELtmU=
134136
github.com/fluent/fluent-logger-golang v1.2.1/go.mod h1:2/HCT/jTy78yGyeNGQLGQsjF3zzzAuy6Xlk6FCMV5eU=

pkg/chunk/cache/memcached_client.go

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,16 @@ type MemcachedClient interface {
1919
Set(item *memcache.Item) error
2020
}
2121

22+
type serverSelector interface {
23+
memcache.ServerSelector
24+
SetServers(servers ...string) error
25+
}
26+
2227
// memcachedClient is a memcache client that gets its server list from SRV
2328
// records, and periodically updates that ServerList.
2429
type memcachedClient struct {
2530
*memcache.Client
26-
serverList *memcache.ServerList
31+
serverList serverSelector
2732
hostname string
2833
service string
2934

@@ -38,6 +43,7 @@ type MemcachedClientConfig struct {
3843
Timeout time.Duration `yaml:"timeout,omitempty"`
3944
MaxIdleConns int `yaml:"max_idle_conns,omitempty"`
4045
UpdateInterval time.Duration `yaml:"update_interval,omitempty"`
46+
ConsistentHash bool `yaml:"consistent_hash,omitempty"`
4147
}
4248

4349
// RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet
@@ -47,19 +53,26 @@ func (cfg *MemcachedClientConfig) RegisterFlagsWithPrefix(prefix, description st
4753
f.IntVar(&cfg.MaxIdleConns, prefix+"memcached.max-idle-conns", 16, description+"Maximum number of idle connections in pool.")
4854
f.DurationVar(&cfg.Timeout, prefix+"memcached.timeout", 100*time.Millisecond, description+"Maximum time to wait before giving up on memcached requests.")
4955
f.DurationVar(&cfg.UpdateInterval, prefix+"memcached.update-interval", 1*time.Minute, description+"Period with which to poll DNS for memcache servers.")
56+
f.BoolVar(&cfg.ConsistentHash, prefix+"memcached.consistent-hash", false, description+"Use consistent hashing to distribute to memcache servers.")
5057
}
5158

5259
// NewMemcachedClient creates a new MemcacheClient that gets its server list
5360
// from SRV and updates the server list on a regular basis.
5461
func NewMemcachedClient(cfg MemcachedClientConfig) MemcachedClient {
55-
var servers memcache.ServerList
56-
client := memcache.NewFromSelector(&servers)
62+
var selector serverSelector
63+
if cfg.ConsistentHash {
64+
selector = &MemcachedJumpHashSelector{}
65+
} else {
66+
selector = &memcache.ServerList{}
67+
}
68+
69+
client := memcache.NewFromSelector(selector)
5770
client.Timeout = cfg.Timeout
5871
client.MaxIdleConns = cfg.MaxIdleConns
5972

6073
newClient := &memcachedClient{
6174
Client: client,
62-
serverList: &servers,
75+
serverList: selector,
6376
hostname: cfg.Host,
6477
service: cfg.Service,
6578
quit: make(chan struct{}),
Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
package cache
2+
3+
import (
4+
"net"
5+
"strings"
6+
"sync"
7+
8+
"github.com/bradfitz/gomemcache/memcache"
9+
"github.com/cespare/xxhash"
10+
"github.com/facette/natsort"
11+
)
12+
13+
// MemcachedJumpHashSelector is a memcache.ServerSelector that uses a
// jump hash to decide which server a key belongs to.
//
// Growing or shrinking the server list moves only about 1/N of the
// keys, but jump hash treats the list as a stack: servers can only be
// pushed onto or popped off the end. The selector is therefore best
// suited to servers whose DNS names are consistent and whose natural
// sort order is predictable.
type MemcachedJumpHashSelector struct {
	// mu guards addrs.
	mu sync.RWMutex
	// addrs is the current server list; a key's bucket number is an
	// index into this slice.
	addrs []net.Addr
}
26+
27+
// staticAddr is a net.Addr that holds precomputed Network() and
// String() values taken from another net.Addr.
//
// Copied from github.com/bradfitz/gomemcache/selector.go.
type staticAddr struct {
	network string
	str     string
}

// newStaticAddr snapshots the network name and string form of a and
// returns them wrapped in a staticAddr.
func newStaticAddr(a net.Addr) net.Addr {
	cached := new(staticAddr)
	cached.network = a.Network()
	cached.str = a.String()
	return cached
}

// Network returns the cached network name.
func (a *staticAddr) Network() string {
	return a.network
}

// String returns the cached string form of the address.
func (a *staticAddr) String() string {
	return a.str
}
44+
45+
// SetServers changes a MemcachedJumpHashSelector's set of servers at
46+
// runtime and is safe for concurrent use by multiple goroutines.
47+
//
48+
// Each server is given equal weight. A server is given more weight
49+
// if it's listed multiple times.
50+
//
51+
// SetServers returns an error if any of the server names fail to
52+
// resolve. No attempt is made to connect to the server. If any
53+
// error occurs, no changes are made to the internal server list.
54+
//
55+
// To minimize the number of rehashes for keys when scaling the
56+
// number of servers in subsequent calls to SetServers, servers
57+
// are stored in natural sort order.
58+
func (s *MemcachedJumpHashSelector) SetServers(servers ...string) error {
59+
sortedServers := make([]string, len(servers))
60+
copy(sortedServers, servers)
61+
natsort.Sort(sortedServers)
62+
63+
naddrs := make([]net.Addr, len(sortedServers))
64+
for i, server := range sortedServers {
65+
if strings.Contains(server, "/") {
66+
addr, err := net.ResolveUnixAddr("unix", server)
67+
if err != nil {
68+
return err
69+
}
70+
naddrs[i] = newStaticAddr(addr)
71+
} else {
72+
tcpAddr, err := net.ResolveTCPAddr("tcp", server)
73+
if err != nil {
74+
return err
75+
}
76+
naddrs[i] = newStaticAddr(tcpAddr)
77+
}
78+
}
79+
80+
s.mu.Lock()
81+
defer s.mu.Unlock()
82+
s.addrs = naddrs
83+
return nil
84+
}
85+
86+
// jumpHash maps key to a bucket number in the range [0, numBuckets).
// The same key and bucket count always yield the same bucket.
// numBuckets must be >= 1; for smaller values the loop never runs and
// -1 is returned.
//
// Copied from github.com/dgryski/go-jump/blob/master/jump.go
func jumpHash(key uint64, numBuckets int) int32 {
	const scale = float64(int64(1) << 31)

	limit := int64(numBuckets)
	bucket, next := int64(-1), int64(0)

	// Follow the key's jump sequence until the next jump would land at
	// or beyond the bucket count; the last jump taken is the answer.
	for next < limit {
		bucket = next
		key = key*2862933555777941757 + 1
		divisor := float64((key >> 33) + 1)
		next = int64(float64(bucket+1) * (scale / divisor))
	}

	return int32(bucket)
}
103+
104+
// PickServer returns the server address that a given item
105+
// should be shared onto.
106+
func (s *MemcachedJumpHashSelector) PickServer(key string) (net.Addr, error) {
107+
s.mu.RLock()
108+
defer s.mu.RUnlock()
109+
if len(s.addrs) == 0 {
110+
return nil, memcache.ErrNoServers
111+
} else if len(s.addrs) == 1 {
112+
return s.addrs[0], nil
113+
}
114+
cs := xxhash.Sum64String(key)
115+
idx := jumpHash(cs, len(s.addrs))
116+
return s.addrs[idx], nil
117+
}
118+
119+
// Each iterates over each server and calls the given function.
120+
// If f returns a non-nil error, iteration will stop and that
121+
// error will be returned.
122+
func (s *MemcachedJumpHashSelector) Each(f func(net.Addr) error) error {
123+
s.mu.RLock()
124+
defer s.mu.RUnlock()
125+
for _, def := range s.addrs {
126+
if err := f(def); err != nil {
127+
return err
128+
}
129+
}
130+
return nil
131+
}
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
package cache_test
2+
3+
import (
4+
"fmt"
5+
"testing"
6+
7+
"github.com/bradfitz/gomemcache/memcache"
8+
"github.com/cortexproject/cortex/pkg/chunk/cache"
9+
"github.com/facette/natsort"
10+
"github.com/stretchr/testify/require"
11+
)
12+
13+
func TestNatSort(t *testing.T) {
14+
// Validate that the order of SRV records returned by a DNS
15+
// lookup for a k8s StatefulSet are ordered as expected when
16+
// a natsort is done.
17+
input := []string{
18+
"memcached-10.memcached.cortex.svc.cluster.local.",
19+
"memcached-1.memcached.cortex.svc.cluster.local.",
20+
"memcached-6.memcached.cortex.svc.cluster.local.",
21+
"memcached-3.memcached.cortex.svc.cluster.local.",
22+
"memcached-25.memcached.cortex.svc.cluster.local.",
23+
}
24+
25+
expected := []string{
26+
"memcached-1.memcached.cortex.svc.cluster.local.",
27+
"memcached-3.memcached.cortex.svc.cluster.local.",
28+
"memcached-6.memcached.cortex.svc.cluster.local.",
29+
"memcached-10.memcached.cortex.svc.cluster.local.",
30+
"memcached-25.memcached.cortex.svc.cluster.local.",
31+
}
32+
33+
natsort.Sort(input)
34+
require.Equal(t, expected, input)
35+
}
36+
37+
func TestMemcachedJumpHashSelector_PickSever(t *testing.T) {
38+
s := cache.MemcachedJumpHashSelector{}
39+
err := s.SetServers("google.com:80", "microsoft.com:80", "duckduckgo.com:80")
40+
require.NoError(t, err)
41+
42+
// We store the string representation instead of the net.Addr
43+
// to make sure different IPs were discovered during SetServers
44+
distribution := make(map[string]int)
45+
46+
for i := 0; i < 100; i++ {
47+
key := fmt.Sprintf("key-%d", i)
48+
addr, err := s.PickServer(key)
49+
require.NoError(t, err)
50+
distribution[addr.String()]++
51+
}
52+
53+
// All of the servers should have been returned at least
54+
// once
55+
require.Len(t, distribution, 3)
56+
for _, v := range distribution {
57+
require.NotZero(t, v)
58+
}
59+
}
60+
61+
func TestMemcachedJumpHashSelector_PickSever_ErrNoServers(t *testing.T) {
62+
s := cache.MemcachedJumpHashSelector{}
63+
_, err := s.PickServer("foo")
64+
require.Error(t, memcache.ErrNoServers, err)
65+
}

vendor/github.com/facette/natsort/LICENSE

Lines changed: 29 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)