Skip to content

Commit

Permalink
Don't remember cluster node for the PubSub
Browse files Browse the repository at this point in the history
  • Loading branch information
vmihailenco committed Feb 8, 2019
1 parent 40336e5 commit 981cf0f
Showing 1 changed file with 23 additions and 17 deletions.
40 changes: 23 additions & 17 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -1522,40 +1522,46 @@ func (c *ClusterClient) txPipelineReadQueued(
return nil
}

func (c *ClusterClient) pubSub(channels []string) *PubSub {
func (c *ClusterClient) pubSub() *PubSub {
var node *clusterNode
pubsub := &PubSub{
opt: c.opt.clientOptions(),

newConn: func(channels []string) (*pool.Conn, error) {
if node == nil {
var slot int
if len(channels) > 0 {
slot = hashtag.Slot(channels[0])
} else {
slot = -1
}
if node != nil {
panic("node != nil")
}

masterNode, err := c.slotMasterNode(slot)
if err != nil {
return nil, err
}
node = masterNode
slot := hashtag.Slot(channels[0])

var err error
node, err = c.slotMasterNode(slot)
if err != nil {
return nil, err
}
return node.Client.newConn()

cn, err := node.Client.newConn()
if err != nil {
return nil, err
}

return cn, nil
},
closeConn: func(cn *pool.Conn) error {
return node.Client.connPool.CloseConn(cn)
err := node.Client.connPool.CloseConn(cn)
node = nil
return err
},
}
pubsub.init()

return pubsub
}

// Subscribe subscribes the client to the specified channels.
// Channels can be omitted to create empty subscription.
func (c *ClusterClient) Subscribe(channels ...string) *PubSub {
pubsub := c.pubSub(channels)
pubsub := c.pubSub()
if len(channels) > 0 {
_ = pubsub.Subscribe(channels...)
}
Expand All @@ -1565,7 +1571,7 @@ func (c *ClusterClient) Subscribe(channels ...string) *PubSub {
// PSubscribe subscribes the client to the given patterns.
// Patterns can be omitted to create empty subscription.
func (c *ClusterClient) PSubscribe(channels ...string) *PubSub {
pubsub := c.pubSub(channels)
pubsub := c.pubSub()
if len(channels) > 0 {
_ = pubsub.PSubscribe(channels...)
}
Expand Down

0 comments on commit 981cf0f

Please sign in to comment.