Skip to content

Commit 7adfdca

Browse files
ring: fix SetAddrs and rebalance race
The change ensures atomic update of `c.hash` and `c.shards`. `BenchmarkRingRebalanceLocked` shows rebalance latency: ``` go test . -run=NONE -bench=BenchmarkRingRebalanceLocked -v -count=10 | benchstat /dev/stdin name time/op RingRebalanceLocked-8 8.50µs ±14% ``` (Note: it essentially reverts a46b053)
1 parent e223a88 commit 7adfdca

File tree

3 files changed

+37
-19
lines changed

3 files changed

+37
-19
lines changed

export_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,3 +102,7 @@ func (c *Ring) ShardByName(name string) *ringShard {
102102
func (c *Ring) ShardByKey(key string) (*ringShard, error) {
103103
return c.sharding.GetByKey(key)
104104
}
105+
106+
func (c *Ring) RebalanceLocked() {
107+
c.sharding.rebalanceLocked()
108+
}

ring.go

Lines changed: 15 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -254,9 +254,9 @@ func (c *ringSharding) SetAddrs(addrs map[string]string) {
254254

255255
shards, cleanup := c.newRingShards(addrs, c.shards)
256256
c.shards = shards
257+
c.rebalanceLocked()
257258
c.mu.Unlock()
258259

259-
c.rebalance()
260260
cleanup()
261261
}
262262

@@ -388,40 +388,36 @@ func (c *ringSharding) Heartbeat(ctx context.Context, frequency time.Duration) {
388388
}
389389

390390
if rebalance {
391-
c.rebalance()
391+
c.mu.Lock()
392+
c.rebalanceLocked()
393+
c.mu.Unlock()
392394
}
393395
case <-ctx.Done():
394396
return
395397
}
396398
}
397399
}
398400

399-
// rebalance removes dead shards from the Ring.
400-
func (c *ringSharding) rebalance() {
401-
c.mu.RLock()
402-
shards := c.shards
403-
c.mu.RUnlock()
404-
405-
if shards == nil {
401+
// rebalanceLocked removes dead shards from the Ring.
402+
// Requires c.mu locked.
403+
func (c *ringSharding) rebalanceLocked() {
404+
if c.closed {
405+
return
406+
}
407+
if c.shards == nil {
406408
return
407409
}
408410

409-
liveShards := make([]string, 0, len(shards.m))
411+
liveShards := make([]string, 0, len(c.shards.m))
410412

411-
for name, shard := range shards.m {
413+
for name, shard := range c.shards.m {
412414
if shard.IsUp() {
413415
liveShards = append(liveShards, name)
414416
}
415417
}
416418

417-
hash := c.opt.NewConsistentHash(liveShards)
418-
419-
c.mu.Lock()
420-
if !c.closed {
421-
c.hash = hash
422-
c.numShard = len(liveShards)
423-
}
424-
c.mu.Unlock()
419+
c.hash = c.opt.NewConsistentHash(liveShards)
420+
c.numShard = len(liveShards)
425421
}
426422

427423
func (c *ringSharding) Len() int {

ring_test.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -808,3 +808,21 @@ func TestRingSetAddrsAndRebalanceRace(t *testing.T) {
808808
}
809809
}
810810
}
811+
812+
func BenchmarkRingRebalanceLocked(b *testing.B) {
813+
opts := &redis.RingOptions{
814+
Addrs: make(map[string]string),
815+
// Disable heartbeat
816+
HeartbeatFrequency: 1 * time.Hour,
817+
}
818+
for i := 0; i < 100; i++ {
819+
opts.Addrs[fmt.Sprintf("shard%d", i)] = fmt.Sprintf(":63%02d", i)
820+
}
821+
822+
ring := redis.NewRing(opts)
823+
824+
b.ResetTimer()
825+
for i := 0; i < b.N; i++ {
826+
ring.RebalanceLocked()
827+
}
828+
}

0 commit comments

Comments
 (0)