Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 24 additions & 23 deletions cluster_pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,48 +20,42 @@ func (c *ClusterClient) Pipeline() *ClusterPipeline {
return pipe
}

func (c *ClusterPipeline) process(cmd Cmder) {
c.cmds = append(c.cmds, cmd)
func (pipe *ClusterPipeline) process(cmd Cmder) {
pipe.cmds = append(pipe.cmds, cmd)
}

// Close marks the pipeline as closed
func (c *ClusterPipeline) Close() error {
c.closed = true
return nil
}

// Discard resets the pipeline and discards queued commands
func (c *ClusterPipeline) Discard() error {
if c.closed {
// Discard resets the pipeline and discards queued commands.
func (pipe *ClusterPipeline) Discard() error {
if pipe.closed {
return errClosed
}
c.cmds = c.cmds[:0]
pipe.cmds = pipe.cmds[:0]
return nil
}

func (c *ClusterPipeline) Exec() (cmds []Cmder, retErr error) {
if c.closed {
func (pipe *ClusterPipeline) Exec() (cmds []Cmder, retErr error) {
if pipe.closed {
return nil, errClosed
}
if len(c.cmds) == 0 {
if len(pipe.cmds) == 0 {
return []Cmder{}, nil
}

cmds = c.cmds
c.cmds = make([]Cmder, 0, 10)
cmds = pipe.cmds
pipe.cmds = make([]Cmder, 0, 10)

cmdsMap := make(map[string][]Cmder)
for _, cmd := range cmds {
slot := hashSlot(cmd.clusterKey())
addr := c.cluster.slotMasterAddr(slot)
addr := pipe.cluster.slotMasterAddr(slot)
cmdsMap[addr] = append(cmdsMap[addr], cmd)
}

for attempt := 0; attempt <= c.cluster.opt.getMaxRedirects(); attempt++ {
for attempt := 0; attempt <= pipe.cluster.opt.getMaxRedirects(); attempt++ {
failedCmds := make(map[string][]Cmder)

for addr, cmds := range cmdsMap {
client, err := c.cluster.getClient(addr)
client, err := pipe.cluster.getClient(addr)
if err != nil {
setCmdsErr(cmds, err)
retErr = err
Expand All @@ -75,7 +69,7 @@ func (c *ClusterPipeline) Exec() (cmds []Cmder, retErr error) {
continue
}

failedCmds, err = c.execClusterCmds(cn, cmds, failedCmds)
failedCmds, err = pipe.execClusterCmds(cn, cmds, failedCmds)
if err != nil {
retErr = err
}
Expand All @@ -88,7 +82,14 @@ func (c *ClusterPipeline) Exec() (cmds []Cmder, retErr error) {
return cmds, retErr
}

func (c *ClusterPipeline) execClusterCmds(
// Close marks the pipeline as closed
func (pipe *ClusterPipeline) Close() error {
pipe.Discard()
pipe.closed = true
return nil
}

func (pipe *ClusterPipeline) execClusterCmds(
cn *conn, cmds []Cmder, failedCmds map[string][]Cmder,
) (map[string][]Cmder, error) {
if err := cn.writeCmds(cmds...); err != nil {
Expand All @@ -107,7 +108,7 @@ func (c *ClusterPipeline) execClusterCmds(
failedCmds[""] = append(failedCmds[""], cmds[i:]...)
break
} else if moved, ask, addr := isMovedError(err); moved {
c.cluster.lazyReloadSlots()
pipe.cluster.lazyReloadSlots()
cmd.reset()
failedCmds[addr] = append(failedCmds[addr], cmd)
} else if ask {
Expand Down
95 changes: 53 additions & 42 deletions pipeline.go
Original file line number Diff line number Diff line change
@@ -1,102 +1,113 @@
package redis

// Not thread-safe.
// Pipeline implements pipelining as described in
// http://redis.io/topics/pipelining.
//
// Pipeline is not thread-safe.
type Pipeline struct {
commandable

cmds []Cmder
client *baseClient

cmds []Cmder
closed bool
}

func (c *Client) Pipeline() *Pipeline {
pipe := &Pipeline{
client: &baseClient{
opt: c.opt,
connPool: c.connPool,
},
cmds: make([]Cmder, 0, 10),
client: c.baseClient,
cmds: make([]Cmder, 0, 10),
}
pipe.commandable.process = pipe.process
return pipe
}

func (c *Client) Pipelined(f func(*Pipeline) error) ([]Cmder, error) {
pc := c.Pipeline()
if err := f(pc); err != nil {
func (c *Client) Pipelined(fn func(*Pipeline) error) ([]Cmder, error) {
pipe := c.Pipeline()
if err := fn(pipe); err != nil {
return nil, err
}
cmds, err := pc.Exec()
pc.Close()
cmds, err := pipe.Exec()
pipe.Close()
return cmds, err
}

func (c *Pipeline) process(cmd Cmder) {
c.cmds = append(c.cmds, cmd)
func (pipe *Pipeline) process(cmd Cmder) {
pipe.cmds = append(pipe.cmds, cmd)
}

func (c *Pipeline) Close() error {
c.closed = true
func (pipe *Pipeline) Close() error {
pipe.Discard()
pipe.closed = true
return nil
}

func (c *Pipeline) Discard() error {
if c.closed {
// Discard resets the pipeline and discards queued commands.
func (pipe *Pipeline) Discard() error {
if pipe.closed {
return errClosed
}
c.cmds = c.cmds[:0]
pipe.cmds = pipe.cmds[:0]
return nil
}

// Exec always returns list of commands and error of the first failed
// command if any.
func (c *Pipeline) Exec() (cmds []Cmder, retErr error) {
if c.closed {
func (pipe *Pipeline) Exec() (cmds []Cmder, retErr error) {
if pipe.closed {
return nil, errClosed
}
if len(c.cmds) == 0 {
return c.cmds, nil
if len(pipe.cmds) == 0 {
return pipe.cmds, nil
}

cmds = c.cmds
c.cmds = make([]Cmder, 0, 0)

for i := 0; i <= c.client.opt.MaxRetries; i++ {
if i > 0 {
resetCmds(cmds)
}
cmds = pipe.cmds
pipe.cmds = make([]Cmder, 0, 10)

cn, err := c.client.conn()
failedCmds := cmds
for i := 0; i <= pipe.client.opt.MaxRetries; i++ {
cn, err := pipe.client.conn()
if err != nil {
setCmdsErr(cmds, err)
setCmdsErr(failedCmds, err)
return cmds, err
}

retErr = c.execCmds(cn, cmds)
c.client.putConn(cn, err)
if shouldRetry(err) {
continue
if i > 0 {
resetCmds(failedCmds)
}
failedCmds, err = execCmds(cn, failedCmds)
pipe.client.putConn(cn, err)
if err != nil && retErr == nil {
retErr = err
}
if len(failedCmds) == 0 {
break
}

break
}

return cmds, retErr
}

func (c *Pipeline) execCmds(cn *conn, cmds []Cmder) error {
func execCmds(cn *conn, cmds []Cmder) ([]Cmder, error) {
if err := cn.writeCmds(cmds...); err != nil {
setCmdsErr(cmds, err)
return err
return cmds, err
}

var firstCmdErr error
var failedCmds []Cmder
for _, cmd := range cmds {
err := cmd.parseReply(cn.rd)
if err != nil && firstCmdErr == nil {
if err == nil {
continue
}
if firstCmdErr == nil {
firstCmdErr = err
}
if shouldRetry(err) {
failedCmds = append(failedCmds, cmd)
}
}

return firstCmdErr
return failedCmds, firstCmdErr
}
121 changes: 119 additions & 2 deletions ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ type RingOptions struct {
DB int64
Password string

MaxRetries int

DialTimeout time.Duration
ReadTimeout time.Duration
WriteTimeout time.Duration
Expand Down Expand Up @@ -105,6 +107,7 @@ func (shard *ringShard) Vote(up bool) bool {
type Ring struct {
commandable

opt *RingOptions
nreplicas int

mx sync.RWMutex
Expand All @@ -117,9 +120,11 @@ type Ring struct {
func NewRing(opt *RingOptions) *Ring {
const nreplicas = 100
ring := &Ring{
opt: opt,
nreplicas: nreplicas,
hash: consistenthash.New(nreplicas, nil),
shards: make(map[string]*ringShard),

hash: consistenthash.New(nreplicas, nil),
shards: make(map[string]*ringShard),
}
ring.commandable.process = ring.process
for name, addr := range opt.Addrs {
Expand Down Expand Up @@ -235,3 +240,115 @@ func (ring *Ring) Close() (retErr error) {

return retErr
}

// RingPipeline creates a new pipeline which is able to execute commands
// against multiple shards.
type RingPipeline struct {
commandable

ring *Ring

cmds []Cmder
closed bool
}

func (ring *Ring) Pipeline() *RingPipeline {
pipe := &RingPipeline{
ring: ring,
cmds: make([]Cmder, 0, 10),
}
pipe.commandable.process = pipe.process
return pipe
}

func (ring *Ring) Pipelined(fn func(*RingPipeline) error) ([]Cmder, error) {
pipe := ring.Pipeline()
if err := fn(pipe); err != nil {
return nil, err
}
cmds, err := pipe.Exec()
pipe.Close()
return cmds, err
}

func (pipe *RingPipeline) process(cmd Cmder) {
pipe.cmds = append(pipe.cmds, cmd)
}

// Discard resets the pipeline and discards queued commands.
func (pipe *RingPipeline) Discard() error {
if pipe.closed {
return errClosed
}
pipe.cmds = pipe.cmds[:0]
return nil
}

// Exec always returns list of commands and error of the first failed
// command if any.
func (pipe *RingPipeline) Exec() (cmds []Cmder, retErr error) {
if pipe.closed {
return nil, errClosed
}
if len(pipe.cmds) == 0 {
return pipe.cmds, nil
}

cmds = pipe.cmds
pipe.cmds = make([]Cmder, 0, 10)

cmdsMap := make(map[string][]Cmder)
for _, cmd := range cmds {
name := pipe.ring.hash.Get(cmd.clusterKey())
cmdsMap[name] = append(cmdsMap[name], cmd)
}

for i := 0; i <= pipe.ring.opt.MaxRetries; i++ {
failedCmdsMap := make(map[string][]Cmder)

for name, cmds := range cmdsMap {
client, err := pipe.ring.getClient(name)
if err != nil {
setCmdsErr(cmds, err)
if retErr == nil {
retErr = err
}
continue
}

cn, err := client.conn()
if err != nil {
setCmdsErr(cmds, err)
if retErr == nil {
retErr = err
}
continue
}

if i > 0 {
resetCmds(cmds)
}
failedCmds, err := execCmds(cn, cmds)
client.putConn(cn, err)
if err != nil && retErr == nil {
retErr = err
}
if len(failedCmds) > 0 {
failedCmdsMap[name] = failedCmds
}
}

if len(failedCmdsMap) == 0 {
break
}
cmdsMap = failedCmdsMap
}

return cmds, retErr
}

func (pipe *RingPipeline) Close() error {
pipe.Discard()
pipe.closed = true
return nil
}
Loading