Skip to content

Commit c15765f

Browse files
committed
Reload slots in background goroutine.
1 parent 593f01f commit c15765f

File tree

4 files changed

+30
-43
lines changed

4 files changed

+30
-43
lines changed

cluster.go

Lines changed: 16 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package redis
22

33
import (
4+
"log"
45
"math/rand"
56
"strings"
67
"sync"
@@ -20,7 +21,8 @@ type ClusterClient struct {
2021

2122
opt *ClusterOptions
2223

23-
_reload uint32
24+
// Reports where slots reloading is in progress.
25+
_reloading uint32
2426
}
2527

2628
// NewClusterClient initializes a new cluster-aware client using given options.
@@ -31,10 +33,9 @@ func NewClusterClient(opt *ClusterOptions) *ClusterClient {
3133
slots: make([][]string, hashSlots),
3234
clients: make(map[string]*Client),
3335
opt: opt,
34-
_reload: 1,
3536
}
3637
client.commandable.process = client.process
37-
client.reloadIfDue()
38+
client.reloadSlots()
3839
go client.reaper(time.NewTicker(5 * time.Minute))
3940
return client
4041
}
@@ -102,8 +103,6 @@ func (c *ClusterClient) randomClient() (client *Client, err error) {
102103
func (c *ClusterClient) process(cmd Cmder) {
103104
var ask bool
104105

105-
c.reloadIfDue()
106-
107106
slot := hashSlot(cmd.clusterKey())
108107

109108
var addr string
@@ -149,7 +148,7 @@ func (c *ClusterClient) process(cmd Cmder) {
149148
moved, ask, addr = isMovedError(err)
150149
if moved || ask {
151150
if moved {
152-
c.scheduleReload()
151+
c.lazyReloadSlots()
153152
}
154153
client, err = c.getClient(addr)
155154
if err != nil {
@@ -203,29 +202,28 @@ func (c *ClusterClient) setSlots(slots []ClusterSlotInfo) {
203202
c.slotsMx.Unlock()
204203
}
205204

206-
// Closes all connections and reloads slot cache, if due.
207-
func (c *ClusterClient) reloadIfDue() (err error) {
208-
if !atomic.CompareAndSwapUint32(&c._reload, 1, 0) {
209-
return
210-
}
205+
func (c *ClusterClient) reloadSlots() {
206+
defer atomic.StoreUint32(&c._reloading, 0)
211207

212208
client, err := c.randomClient()
213209
if err != nil {
214-
return err
210+
log.Printf("redis: randomClient failed: %s", err)
211+
return
215212
}
216213

217214
slots, err := client.ClusterSlots().Result()
218215
if err != nil {
219-
return err
216+
log.Printf("redis: ClusterSlots failed: %s", err)
217+
return
220218
}
221219
c.setSlots(slots)
222-
223-
return nil
224220
}
225221

226-
// Schedules slots reload on next request.
227-
func (c *ClusterClient) scheduleReload() {
228-
atomic.StoreUint32(&c._reload, 1)
222+
func (c *ClusterClient) lazyReloadSlots() {
223+
if !atomic.CompareAndSwapUint32(&c._reloading, 0, 1) {
224+
return
225+
}
226+
go c.reloadSlots()
229227
}
230228

231229
// reaper closes idle connections to the cluster.

cluster_client_test.go

Lines changed: 5 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,22 @@
11
package redis
22

33
import (
4+
"fmt"
5+
46
. "github.com/onsi/ginkgo"
57
. "github.com/onsi/gomega"
68
)
79

8-
// GetSlot returns the cached slot addresses
9-
func (c *ClusterClient) GetSlot(pos int) []string {
10-
c.slotsMx.RLock()
11-
defer c.slotsMx.RUnlock()
12-
13-
return c.slots[pos]
10+
func (c *ClusterClient) SlotAddrs(slot int) []string {
11+
return c.slotAddrs(slot)
1412
}
1513

1614
// SwapSlot swaps a slot's master/slave address
1715
// for testing MOVED redirects
1816
func (c *ClusterClient) SwapSlot(pos int) []string {
1917
c.slotsMx.Lock()
2018
defer c.slotsMx.Unlock()
19+
fmt.Println(pos, c.slots[pos])
2120
c.slots[pos][0], c.slots[pos][1] = c.slots[pos][1], c.slots[pos][0]
2221
return c.slots[pos]
2322
}
@@ -49,7 +48,6 @@ var _ = Describe("ClusterClient", func() {
4948
It("should initialize", func() {
5049
Expect(subject.addrs).To(HaveLen(3))
5150
Expect(subject.slots).To(HaveLen(16384))
52-
Expect(subject._reload).To(Equal(uint32(0)))
5351
})
5452

5553
It("should update slots cache", func() {
@@ -84,11 +82,4 @@ var _ = Describe("ClusterClient", func() {
8482
Expect(subject.slots[8192]).To(BeEmpty())
8583
Expect(subject.slots[16383]).To(BeEmpty())
8684
})
87-
88-
It("should check if reload is due", func() {
89-
subject._reload = 0
90-
Expect(subject._reload).To(Equal(uint32(0)))
91-
subject.scheduleReload()
92-
Expect(subject._reload).To(Equal(uint32(1)))
93-
})
9485
})

cluster_pipeline.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ func (c *ClusterPipeline) execClusterCmds(
113113
failedCmds[""] = append(failedCmds[""], cmds[i:]...)
114114
break
115115
} else if moved, ask, addr := isMovedError(err); moved {
116-
c.cluster.scheduleReload()
116+
c.cluster.lazyReloadSlots()
117117
cmd.reset()
118118
failedCmds[addr] = append(failedCmds[addr], cmd)
119119
} else if ask {

cluster_test.go

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -258,25 +258,23 @@ var _ = Describe("Cluster", func() {
258258

259259
It("should follow redirects", func() {
260260
Expect(client.Set("A", "VALUE", 0).Err()).NotTo(HaveOccurred())
261-
Expect(redis.HashSlot("A")).To(Equal(6373))
262-
Expect(client.SwapSlot(6373)).To(Equal([]string{"127.0.0.1:8224", "127.0.0.1:8221"}))
261+
262+
slot := redis.HashSlot("A")
263+
Expect(client.SwapSlot(slot)).To(Equal([]string{"127.0.0.1:8224", "127.0.0.1:8221"}))
263264

264265
val, err := client.Get("A").Result()
265266
Expect(err).NotTo(HaveOccurred())
266267
Expect(val).To(Equal("VALUE"))
267-
Expect(client.GetSlot(6373)).To(Equal([]string{"127.0.0.1:8224", "127.0.0.1:8221"}))
268+
Expect(client.SlotAddrs(slot)).To(Equal([]string{"127.0.0.1:8224", "127.0.0.1:8221"}))
268269

269-
val, err = client.Get("A").Result()
270-
Expect(err).NotTo(HaveOccurred())
271-
Expect(val).To(Equal("VALUE"))
272-
Expect(client.GetSlot(6373)).To(Equal([]string{"127.0.0.1:8221", "127.0.0.1:8224"}))
270+
Eventually(func() []string {
271+
return client.SlotAddrs(slot)
272+
}).Should(Equal([]string{"127.0.0.1:8221", "127.0.0.1:8224"}))
273273
})
274274

275275
It("should perform multi-pipelines", func() {
276-
// Dummy command to load slots info.
277-
Expect(client.Ping().Err()).NotTo(HaveOccurred())
278-
279276
slot := redis.HashSlot("A")
277+
Expect(client.SlotAddrs(slot)).To(Equal([]string{"127.0.0.1:8221", "127.0.0.1:8224"}))
280278
Expect(client.SwapSlot(slot)).To(Equal([]string{"127.0.0.1:8224", "127.0.0.1:8221"}))
281279

282280
pipe := client.Pipeline()

0 commit comments

Comments
 (0)