Skip to content

Commit f51d290

Browse files
committed
feat: ring.SetAddrs to add and remove shards by the ring client and reuse old connections if possible
test: ring scale-in and scale-out rewrite as suggested by @AlexanderYastrebov Signed-off-by: Sandor Szücs <sandor.szuecs@zalando.de>
1 parent 4ddd7d1 commit f51d290

File tree

3 files changed

+130
-13
lines changed

3 files changed

+130
-13
lines changed

export_test.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,3 +93,15 @@ func GetSlavesAddrByName(ctx context.Context, c *SentinelClient, name string) []
9393
}
9494
return parseReplicaAddrs(addrs, false)
9595
}
96+
97+
func (c *Ring) GetAddr(addr string) *ringShard {
98+
return c.shards.GetAddr(addr)
99+
}
100+
101+
func (c *Ring) SetAddrs(addrs map[string]string) {
102+
c.shards.SetAddrs(addrs)
103+
}
104+
105+
func (c *ringShards) GetAddr(addr string) *ringShard {
106+
return c.shards[addr]
107+
}

ring.go

Lines changed: 52 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,7 @@ func (opt *RingOptions) clientOptions() *Options {
160160
type ringShard struct {
161161
Client *Client
162162
down int32
163+
addr string
163164
}
164165

165166
func newRingShard(opt *RingOptions, name, addr string) *ringShard {
@@ -168,6 +169,7 @@ func newRingShard(opt *RingOptions, name, addr string) *ringShard {
168169

169170
return &ringShard{
170171
Client: opt.NewClient(name, clopt),
172+
addr: addr,
171173
}
172174
}
173175

@@ -212,33 +214,68 @@ type ringShards struct {
212214
opt *RingOptions
213215

214216
mu sync.RWMutex
217+
muClose sync.Mutex
215218
hash ConsistentHash
216-
shards map[string]*ringShard // read only
217-
list []*ringShard // read only
219+
shards map[string]*ringShard // read only, updated by SetAddrs
220+
list []*ringShard // read only, updated by SetAddrs
218221
numShard int
219222
closed bool
220223
}
221224

222225
func newRingShards(opt *RingOptions) *ringShards {
223-
shards := make(map[string]*ringShard, len(opt.Addrs))
224-
list := make([]*ringShard, 0, len(shards))
226+
c := &ringShards{
227+
opt: opt,
228+
}
229+
c.SetAddrs(opt.Addrs)
230+
231+
return c
232+
}
225233

226-
for name, addr := range opt.Addrs {
227-
shard := newRingShard(opt, name, addr)
228-
shards[name] = shard
234+
// SetAddrs replaces the shards in use, such that you can increase and
235+
// decrease number of shards, that you use. It will reuse shards that
236+
// existed before and close the ones that will not be used anymore.
237+
func (c *ringShards) SetAddrs(addrs map[string]string) {
238+
c.muClose.Lock()
239+
defer c.muClose.Unlock()
240+
if c.closed {
241+
return
242+
}
229243

244+
shards := make(map[string]*ringShard)
245+
unusedShards := make(map[string]*ringShard)
246+
247+
for k, shard := range c.shards {
248+
if addr, ok := addrs[k]; ok && shard.addr == addr {
249+
shards[k] = shard
250+
} else {
251+
unusedShards[k] = shard
252+
}
253+
}
254+
255+
for k, addr := range addrs {
256+
if shard, ok := c.shards[k]; !ok || shard.addr != addr {
257+
shards[k] = newRingShard(c.opt, k, addr)
258+
}
259+
}
260+
261+
list := make([]*ringShard, 0, len(shards))
262+
for _, shard := range shards {
230263
list = append(list, shard)
231264
}
232265

233-
c := &ringShards{
234-
opt: opt,
266+
c.mu.Lock()
267+
c.shards = shards
268+
c.list = list
269+
c.mu.Unlock()
235270

236-
shards: shards,
237-
list: list,
271+
for k, shard := range unusedShards {
272+
err := shard.Client.Close()
273+
if err != nil {
274+
internal.Logger.Printf(context.Background(), "Failed to close ring shard client %s %s: %v", k, shard.addr, err)
275+
}
238276
}
239-
c.rebalance()
240277

241-
return c
278+
c.rebalance()
242279
}
243280

244281
func (c *ringShards) List() []*ringShard {
@@ -363,6 +400,8 @@ func (c *ringShards) Len() int {
363400
}
364401

365402
func (c *ringShards) Close() error {
403+
c.muClose.Lock()
404+
defer c.muClose.Unlock()
366405
c.mu.Lock()
367406
defer c.mu.Unlock()
368407

ring_test.go

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,72 @@ var _ = Describe("Redis Ring", func() {
113113
Expect(ringShard2.Info(ctx, "keyspace").Val()).To(ContainSubstring("keys=100"))
114114
})
115115

116+
Describe("[new] dynamic setting ring shards", func() {
117+
It("downscale shard and check reuse shard, upscale shard and check reuse", func() {
118+
Expect(ring.Len(), 2)
119+
120+
wantShard := ring.GetAddr("ringShardOne")
121+
ring.SetAddrs(map[string]string{
122+
"ringShardOne": ":" + ringShard1Port,
123+
})
124+
Expect(ring.Len(), 1)
125+
gotShard := ring.GetAddr("ringShardOne")
126+
Expect(gotShard).To(Equal(wantShard))
127+
128+
ring.SetAddrs(map[string]string{
129+
"ringShardOne": ":" + ringShard1Port,
130+
"ringShardTwo": ":" + ringShard2Port,
131+
})
132+
Expect(ring.Len(), 2)
133+
gotShard = ring.GetAddr("ringShardOne")
134+
Expect(gotShard).To(Equal(wantShard))
135+
136+
})
137+
138+
It("uses 3 shards after setting it to 3 shards", func() {
139+
Expect(ring.Len(), 2)
140+
141+
// Start ringShard3.
142+
var err error
143+
ringShard3, err = startRedis(ringShard3Port)
144+
Expect(err).NotTo(HaveOccurred())
145+
146+
shardName1 := "ringShardOne"
147+
shardAddr1 := ":" + ringShard1Port
148+
wantShard1 := ring.GetAddr(shardName1)
149+
shardName2 := "ringShardTwo"
150+
shardAddr2 := ":" + ringShard2Port
151+
wantShard2 := ring.GetAddr(shardName2)
152+
shardName3 := "ringShardThree"
153+
shardAddr3 := ":" + ringShard3Port
154+
155+
ring.SetAddrs(map[string]string{
156+
shardName1: shardAddr1,
157+
shardName2: shardAddr2,
158+
shardName3: shardAddr3,
159+
})
160+
Expect(ring.Len(), 3)
161+
gotShard1 := ring.GetAddr(shardName1)
162+
gotShard2 := ring.GetAddr(shardName2)
163+
gotShard3 := ring.GetAddr(shardName3)
164+
Expect(gotShard1).To(Equal(wantShard1))
165+
Expect(gotShard2).To(Equal(wantShard2))
166+
Expect(gotShard3).ToNot(BeNil())
167+
168+
ring.SetAddrs(map[string]string{
169+
shardName1: shardAddr1,
170+
shardName2: shardAddr2,
171+
})
172+
Expect(ring.Len(), 2)
173+
gotShard1 = ring.GetAddr(shardName1)
174+
gotShard2 = ring.GetAddr(shardName2)
175+
gotShard3 = ring.GetAddr(shardName3)
176+
Expect(gotShard1).To(Equal(wantShard1))
177+
Expect(gotShard2).To(Equal(wantShard2))
178+
Expect(gotShard3).To(BeNil())
179+
})
180+
181+
})
116182
Describe("pipeline", func() {
117183
It("distributes keys", func() {
118184
pipe := ring.Pipeline()

0 commit comments

Comments
 (0)