Skip to content

Commit

Permalink
Merge pull request redis#630 from go-redis/fix/pipeline-retrying
Browse files Browse the repository at this point in the history
Rework pipeline retrying
  • Loading branch information
vmihailenco authored Sep 1, 2017
2 parents 0daeac9 + dbd2c99 commit 74d5147
Show file tree
Hide file tree
Showing 13 changed files with 386 additions and 254 deletions.
325 changes: 203 additions & 122 deletions cluster.go

Large diffs are not rendered by default.

71 changes: 30 additions & 41 deletions cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ var _ = Describe("ClusterClient", func() {

Eventually(func() string {
return client.Get("A").Val()
}).Should(Equal("VALUE"))
}, 30*time.Second).Should(Equal("VALUE"))

cnt, err := client.Del("A").Result()
Expect(err).NotTo(HaveOccurred())
Expand All @@ -215,7 +215,7 @@ var _ = Describe("ClusterClient", func() {

Eventually(func() string {
return client.Get("A").Val()
}).Should(Equal("VALUE"))
}, 30*time.Second).Should(Equal("VALUE"))
})

It("distributes keys", func() {
Expand All @@ -227,7 +227,7 @@ var _ = Describe("ClusterClient", func() {
for _, master := range cluster.masters() {
Eventually(func() string {
return master.Info("keyspace").Val()
}, 5*time.Second).Should(Or(
}, 30*time.Second).Should(Or(
ContainSubstring("keys=31"),
ContainSubstring("keys=29"),
ContainSubstring("keys=40"),
Expand All @@ -251,7 +251,7 @@ var _ = Describe("ClusterClient", func() {
for _, master := range cluster.masters() {
Eventually(func() string {
return master.Info("keyspace").Val()
}, 5*time.Second).Should(Or(
}, 30*time.Second).Should(Or(
ContainSubstring("keys=31"),
ContainSubstring("keys=29"),
ContainSubstring("keys=40"),
Expand Down Expand Up @@ -320,10 +320,6 @@ var _ = Describe("ClusterClient", func() {
Expect(err).NotTo(HaveOccurred())
Expect(cmds).To(HaveLen(14))

if opt.RouteByLatency {
return
}

for _, key := range keys {
slot := hashtag.Slot(key)
client.SwapSlotNodes(slot)
Expand Down Expand Up @@ -432,6 +428,9 @@ var _ = Describe("ClusterClient", func() {
})

AfterEach(func() {
_ = client.ForEachMaster(func(master *redis.Client) error {
return master.FlushDB().Err()
})
Expect(client.Close()).NotTo(HaveOccurred())
})

Expand Down Expand Up @@ -560,6 +559,9 @@ var _ = Describe("ClusterClient", func() {
})

AfterEach(func() {
_ = client.ForEachMaster(func(master *redis.Client) error {
return master.FlushDB().Err()
})
Expect(client.Close()).NotTo(HaveOccurred())
})

Expand All @@ -575,10 +577,19 @@ var _ = Describe("ClusterClient", func() {
_ = client.ForEachMaster(func(master *redis.Client) error {
return master.FlushDB().Err()
})

_ = client.ForEachSlave(func(slave *redis.Client) error {
Eventually(func() int64 {
return client.DBSize().Val()
}, 30*time.Second).Should(Equal(int64(0)))
return nil
})
})

AfterEach(func() {
client.FlushDB()
_ = client.ForEachMaster(func(master *redis.Client) error {
return master.FlushDB().Err()
})
Expect(client.Close()).NotTo(HaveOccurred())
})

Expand All @@ -597,7 +608,7 @@ var _ = Describe("ClusterClient without nodes", func() {
Expect(client.Close()).NotTo(HaveOccurred())
})

It("returns an error", func() {
It("Ping returns an error", func() {
err := client.Ping().Err()
Expect(err).To(MatchError("redis: cluster has no nodes"))
})
Expand Down Expand Up @@ -626,15 +637,15 @@ var _ = Describe("ClusterClient without valid nodes", func() {

It("returns an error", func() {
err := client.Ping().Err()
Expect(err).To(MatchError("ERR This instance has cluster support disabled"))
Expect(err).To(MatchError("redis: cannot load cluster slots"))
})

It("pipeline returns an error", func() {
_, err := client.Pipelined(func(pipe redis.Pipeliner) error {
pipe.Ping()
return nil
})
Expect(err).To(MatchError("ERR This instance has cluster support disabled"))
Expect(err).To(MatchError("redis: cannot load cluster slots"))
})
})

Expand Down Expand Up @@ -664,7 +675,7 @@ var _ = Describe("ClusterClient timeout", func() {
It("Tx timeouts", func() {
err := client.Watch(func(tx *redis.Tx) error {
return tx.Ping().Err()
})
}, "foo")
Expect(err).To(HaveOccurred())
Expect(err.(net.Error).Timeout()).To(BeTrue())
})
Expand All @@ -676,42 +687,20 @@ var _ = Describe("ClusterClient timeout", func() {
return nil
})
return err
})
}, "foo")
Expect(err).To(HaveOccurred())
Expect(err.(net.Error).Timeout()).To(BeTrue())
})
}

Context("read timeout", func() {
BeforeEach(func() {
opt := redisClusterOptions()
opt.ReadTimeout = time.Nanosecond
opt.WriteTimeout = -1
client = cluster.clusterClient(opt)
})

testTimeout()
})
const pause = time.Second

Context("write timeout", func() {
Context("read/write timeout", func() {
BeforeEach(func() {
opt := redisClusterOptions()
opt.ReadTimeout = time.Nanosecond
opt.WriteTimeout = -1
client = cluster.clusterClient(opt)
})

testTimeout()
})

Context("ClientPause timeout", func() {
const pause = time.Second

BeforeEach(func() {
opt := redisClusterOptions()
opt.ReadTimeout = pause / 10
opt.WriteTimeout = pause / 10
opt.MaxRedirects = -1
opt.ReadTimeout = 100 * time.Millisecond
opt.WriteTimeout = 100 * time.Millisecond
opt.MaxRedirects = 1
client = cluster.clusterClient(opt)

err := client.ForEachNode(func(client *redis.Client) error {
Expand Down
14 changes: 12 additions & 2 deletions command.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,19 @@ type Cmder interface {

func setCmdsErr(cmds []Cmder, e error) {
for _, cmd := range cmds {
cmd.setErr(e)
if cmd.Err() == nil {
cmd.setErr(e)
}
}
}

func firstCmdsErr(cmds []Cmder) error {
for _, cmd := range cmds {
if err := cmd.Err(); err != nil {
return err
}
}
return nil
}

func writeCmd(cn *pool.Conn, cmds ...Cmder) error {
Expand Down Expand Up @@ -95,7 +106,6 @@ func cmdFirstKeyPos(cmd Cmder, info *CommandInfo) int {
return 1
}
if info == nil {
internal.Logf("info for cmd=%s not found", cmd.Name())
return -1
}
return int(info.FirstKeyPos)
Expand Down
53 changes: 34 additions & 19 deletions commands_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,21 @@ var _ = Describe("Commands", func() {
Describe("server", func() {

It("should Auth", func() {
_, err := client.Pipelined(func(pipe redis.Pipeliner) error {
cmds, err := client.Pipelined(func(pipe redis.Pipeliner) error {
pipe.Auth("password")
pipe.Auth("")
return nil
})
Expect(err).To(MatchError("ERR Client sent AUTH, but no password is set"))
Expect(cmds[0].Err()).To(MatchError("ERR Client sent AUTH, but no password is set"))
Expect(cmds[1].Err()).To(MatchError("ERR Client sent AUTH, but no password is set"))

stats := client.Pool().Stats()
Expect(stats.Requests).To(Equal(uint32(2)))
Expect(stats.Hits).To(Equal(uint32(1)))
Expect(stats.Timeouts).To(Equal(uint32(0)))
Expect(stats.TotalConns).To(Equal(uint32(1)))
Expect(stats.FreeConns).To(Equal(uint32(1)))
})

It("should Echo", func() {
Expand Down Expand Up @@ -187,6 +197,29 @@ var _ = Describe("Commands", func() {
Expect(tm).To(BeTemporally("~", time.Now(), 3*time.Second))
})

It("Should Command", func() {
cmds, err := client.Command().Result()
Expect(err).NotTo(HaveOccurred())
Expect(len(cmds)).To(BeNumerically("~", 180, 10))

cmd := cmds["mget"]
Expect(cmd.Name).To(Equal("mget"))
Expect(cmd.Arity).To(Equal(int8(-2)))
Expect(cmd.Flags).To(ContainElement("readonly"))
Expect(cmd.FirstKeyPos).To(Equal(int8(1)))
Expect(cmd.LastKeyPos).To(Equal(int8(-1)))
Expect(cmd.StepCount).To(Equal(int8(1)))

cmd = cmds["ping"]
Expect(cmd.Name).To(Equal("ping"))
Expect(cmd.Arity).To(Equal(int8(-1)))
Expect(cmd.Flags).To(ContainElement("stale"))
Expect(cmd.Flags).To(ContainElement("fast"))
Expect(cmd.FirstKeyPos).To(Equal(int8(0)))
Expect(cmd.LastKeyPos).To(Equal(int8(0)))
Expect(cmd.StepCount).To(Equal(int8(0)))
})

})

Describe("debugging", func() {
Expand Down Expand Up @@ -2887,24 +2920,6 @@ var _ = Describe("Commands", func() {

})

Describe("Command", func() {

It("returns map of commands", func() {
cmds, err := client.Command().Result()
Expect(err).NotTo(HaveOccurred())
Expect(len(cmds)).To(BeNumerically("~", 180, 10))

cmd := cmds["mget"]
Expect(cmd.Name).To(Equal("mget"))
Expect(cmd.Arity).To(Equal(int8(-2)))
Expect(cmd.Flags).To(ContainElement("readonly"))
Expect(cmd.FirstKeyPos).To(Equal(int8(1)))
Expect(cmd.LastKeyPos).To(Equal(int8(-1)))
Expect(cmd.StepCount).To(Equal(int8(1)))
})

})

Describe("Eval", func() {

It("returns keys and values", func() {
Expand Down
14 changes: 12 additions & 2 deletions export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,26 @@ func (c *PubSub) ReceiveMessageTimeout(timeout time.Duration) (*Message, error)
}

func (c *ClusterClient) SlotAddrs(slot int) []string {
state, err := c.state()
if err != nil {
panic(err)
}

var addrs []string
for _, n := range c.state().slotNodes(slot) {
for _, n := range state.slotNodes(slot) {
addrs = append(addrs, n.Client.getAddr())
}
return addrs
}

// SwapSlot swaps a slot's master/slave address for testing MOVED redirects.
func (c *ClusterClient) SwapSlotNodes(slot int) {
nodes := c.state().slots[slot]
state, err := c.state()
if err != nil {
panic(err)
}

nodes := state.slots[slot]
if len(nodes) == 2 {
nodes[0], nodes[1] = nodes[1], nodes[0]
}
Expand Down
25 changes: 17 additions & 8 deletions internal/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,23 @@ type RedisError string
func (e RedisError) Error() string { return string(e) }

func IsRetryableError(err error) bool {
return IsNetworkError(err) || err.Error() == "ERR max number of clients reached"
if IsNetworkError(err) {
return true
}
s := err.Error()
if s == "ERR max number of clients reached" {
return true
}
if strings.HasPrefix(s, "LOADING ") {
return true
}
if strings.HasPrefix(s, "CLUSTERDOWN ") {
return true
}
return false
}

func IsInternalError(err error) bool {
func IsRedisError(err error) bool {
_, ok := err.(RedisError)
return ok
}
Expand All @@ -33,7 +46,7 @@ func IsBadConn(err error, allowTimeout bool) bool {
if err == nil {
return false
}
if IsInternalError(err) {
if IsRedisError(err) {
return false
}
if allowTimeout {
Expand All @@ -45,7 +58,7 @@ func IsBadConn(err error, allowTimeout bool) bool {
}

func IsMovedError(err error) (moved bool, ask bool, addr string) {
if !IsInternalError(err) {
if !IsRedisError(err) {
return
}

Expand All @@ -69,7 +82,3 @@ func IsMovedError(err error) (moved bool, ask bool, addr string) {
func IsLoadingError(err error) bool {
return strings.HasPrefix(err.Error(), "LOADING ")
}

func IsClusterDownError(err error) bool {
return strings.HasPrefix(err.Error(), "CLUSTERDOWN ")
}
2 changes: 1 addition & 1 deletion internal/proto/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (p *Reader) ReadLine() ([]byte, error) {
return nil, bufio.ErrBufferFull
}
if len(line) == 0 {
return nil, internal.RedisError("redis: reply is empty")
return nil, fmt.Errorf("redis: reply is empty")
}
if isNilReply(line) {
return nil, internal.Nil
Expand Down
2 changes: 1 addition & 1 deletion internal/proto/scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
func Scan(b []byte, v interface{}) error {
switch v := v.(type) {
case nil:
return internal.RedisError("redis: Scan(nil)")
return fmt.Errorf("redis: Scan(nil)")
case *string:
*v = internal.BytesToString(b)
return nil
Expand Down
3 changes: 2 additions & 1 deletion main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package redis_test
import (
"errors"
"fmt"
"log"
"net"
"os"
"os/exec"
Expand Down Expand Up @@ -51,7 +52,7 @@ var cluster = &clusterScenario{
}

func init() {
//redis.SetLogger(log.New(os.Stderr, "redis: ", log.LstdFlags|log.Lshortfile))
redis.SetLogger(log.New(os.Stderr, "redis: ", log.LstdFlags|log.Lshortfile))
}

var _ = BeforeSuite(func() {
Expand Down
Loading

0 comments on commit 74d5147

Please sign in to comment.