Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ const hashSlots = 16384
func hashKey(key string) string {
if s := strings.IndexByte(key, '{'); s > -1 {
if e := strings.IndexByte(key[s+1:], '}'); e > 0 {
key = key[s+1 : s+e+1]
return key[s+1 : s+e+1]
}
}
return key
Expand Down
28 changes: 10 additions & 18 deletions ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,23 +150,19 @@ func (ring *Ring) getClient(key string) (*Client, error) {
return nil, errClosed
}

name := ring.hash.Get(key)
name := ring.hash.Get(hashKey(key))
if name == "" {
ring.mx.RUnlock()
return nil, errRingShardsDown
}

if shard, ok := ring.shards[name]; ok {
ring.mx.RUnlock()
return shard.Client, nil
}

cl := ring.shards[name].Client
ring.mx.RUnlock()
return nil, errRingShardsDown
return cl, nil
}

func (ring *Ring) process(cmd Cmder) {
cl, err := ring.getClient(hashKey(cmd.clusterKey()))
cl, err := ring.getClient(cmd.clusterKey())
if err != nil {
cmd.setErr(err)
return
Expand Down Expand Up @@ -299,23 +295,19 @@ func (pipe *RingPipeline) Exec() (cmds []Cmder, retErr error) {

cmdsMap := make(map[string][]Cmder)
for _, cmd := range cmds {
name := pipe.ring.hash.Get(cmd.clusterKey())
name := pipe.ring.hash.Get(hashKey(cmd.clusterKey()))
if name == "" {
cmd.setErr(errRingShardsDown)
continue
}
cmdsMap[name] = append(cmdsMap[name], cmd)
}

for i := 0; i <= pipe.ring.opt.MaxRetries; i++ {
failedCmdsMap := make(map[string][]Cmder)

for name, cmds := range cmdsMap {
client, err := pipe.ring.getClient(name)
if err != nil {
setCmdsErr(cmds, err)
if retErr == nil {
retErr = err
}
continue
}

client := pipe.ring.shards[name].Client
cn, err := client.conn()
if err != nil {
setCmdsErr(cmds, err)
Expand Down
48 changes: 42 additions & 6 deletions ring_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package redis_test

import (
"crypto/rand"
"fmt"
"time"

Expand All @@ -23,8 +24,8 @@ var _ = Describe("Redis ring", func() {
BeforeEach(func() {
ring = redis.NewRing(&redis.RingOptions{
Addrs: map[string]string{
"ringShard1": ":" + ringShard1Port,
"ringShard2": ":" + ringShard2Port,
"ringShardOne": ":" + ringShard1Port,
"ringShardTwo": ":" + ringShard2Port,
},
})

Expand Down Expand Up @@ -82,6 +83,16 @@ var _ = Describe("Redis ring", func() {
Expect(ringShard2.Info().Val()).To(ContainSubstring("keys=43"))
})

It("supports hash tags", func() {
for i := 0; i < 100; i++ {
err := ring.Set(fmt.Sprintf("key%d{tag}", i), "value", 0).Err()
Expect(err).NotTo(HaveOccurred())
}

Expect(ringShard1.Info().Val()).ToNot(ContainSubstring("keys="))
Expect(ringShard2.Info().Val()).To(ContainSubstring("keys=100"))
})

Describe("pipelining", func() {
It("uses both shards", func() {
pipe := ring.Pipeline()
Expand All @@ -104,16 +115,41 @@ var _ = Describe("Redis ring", func() {
Expect(ringShard2.Info().Val()).To(ContainSubstring("keys=43"))
})

It("is consistent", func() {
It("is consistent with ring", func() {
var keys []string
for i := 0; i < 100; i++ {
key := make([]byte, 64)
_, err := rand.Read(key)
Expect(err).NotTo(HaveOccurred())
keys = append(keys, string(key))
}

_, err := ring.Pipelined(func(pipe *redis.RingPipeline) error {
pipe.Set("mykey", "pipeline", 0)
for _, key := range keys {
pipe.Set(key, "value", 0).Err()
}
return nil
})
Expect(err).NotTo(HaveOccurred())

val, err := ring.Get("mykey").Result()
for _, key := range keys {
val, err := ring.Get(key).Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).To(Equal("value"))
}
})

It("supports hash tags", func() {
_, err := ring.Pipelined(func(pipe *redis.RingPipeline) error {
for i := 0; i < 100; i++ {
pipe.Set(fmt.Sprintf("key%d{tag}", i), "value", 0).Err()
}
return nil
})
Expect(err).NotTo(HaveOccurred())
Expect(val).To(Equal("pipeline"))

Expect(ringShard1.Info().Val()).ToNot(ContainSubstring("keys="))
Expect(ringShard2.Info().Val()).To(ContainSubstring("keys=100"))
})
})
})