Skip to content

Commit

Permalink
Improve ReceiveMessage.
Browse files Browse the repository at this point in the history
  • Loading branch information
vmihailenco committed Dec 2, 2015
1 parent 5efe0cc commit 42141f1
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 30 deletions.
3 changes: 2 additions & 1 deletion multi.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ func (c *Client) Multi() *Multi {
func (c *Multi) putConn(cn *conn, ei error) {
var err error
if isBadConn(cn, ei) {
err = c.base.connPool.Remove(nil) // nil to force removal
// Close current connection.
c.base.connPool.(*stickyConnPool).Reset()
} else {
err = c.base.connPool.Put(cn)
}
Expand Down
17 changes: 11 additions & 6 deletions pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func newConnPool(opt *Options) *connPool {
p := &connPool{
dialer: newConnDialer(opt),

rl: ratelimit.New(2*opt.getPoolSize(), time.Second),
rl: ratelimit.New(3*opt.getPoolSize(), time.Second),
opt: opt,
conns: newConnList(opt.getPoolSize()),
freeConns: make(chan *conn, opt.getPoolSize()),
Expand Down Expand Up @@ -458,11 +458,7 @@ func (p *stickyConnPool) Remove(cn *conn) error {
if cn != nil && p.cn != cn {
panic("p.cn != cn")
}
if cn == nil {
return p.remove()
} else {
return nil
}
return nil
}

func (p *stickyConnPool) Len() int {
Expand All @@ -483,6 +479,15 @@ func (p *stickyConnPool) FreeLen() int {
return 0
}

func (p *stickyConnPool) Reset() (err error) {
p.mx.Lock()
if p.cn != nil {
err = p.remove()
}
p.mx.Unlock()
return err
}

func (p *stickyConnPool) Close() error {
defer p.mx.Unlock()
p.mx.Lock()
Expand Down
42 changes: 22 additions & 20 deletions pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ func (c *PubSub) Receive() (interface{}, error) {

func (c *PubSub) reconnect() {
// Close current connection.
c.connPool.Remove(nil) // nil to force removal
c.connPool.(*stickyConnPool).Reset()

if len(c.channels) > 0 {
if err := c.Subscribe(c.channels...); err != nil {
Expand All @@ -252,39 +252,41 @@ func (c *PubSub) reconnect() {
// ReceiveMessage returns a message or error. It automatically
// reconnects to Redis in case of network errors.
func (c *PubSub) ReceiveMessage() (*Message, error) {
var badConn bool
var errNum int
for {
msgi, err := c.ReceiveTimeout(5 * time.Second)
if err != nil {
if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
if badConn {
c.reconnect()
badConn = false
continue
}
if !isNetworkError(err) {
return nil, err
}

goodConn := errNum == 0
errNum++

err := c.Ping("")
if err != nil {
c.reconnect()
} else {
badConn = true
if goodConn {
if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
err := c.Ping("")
if err == nil {
continue
}
log.Printf("redis: PubSub.Ping failed: %s", err)
}
continue
}

if isNetworkError(err) {
c.reconnect()
continue
if errNum > 2 {
time.Sleep(time.Second)
}

return nil, err
c.reconnect()
continue
}

// Reset error number.
errNum = 0

switch msg := msgi.(type) {
case *Subscription:
// Ignore.
case *Pong:
badConn = false
// Ignore.
case *Message:
return msg, nil
Expand Down
13 changes: 10 additions & 3 deletions pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,9 +260,9 @@ var _ = Describe("PubSub", func() {
Expect(err).NotTo(HaveOccurred())
defer pubsub.Close()

cn, _, err := pubsub.Pool().Get()
cn1, _, err := pubsub.Pool().Get()
Expect(err).NotTo(HaveOccurred())
cn.SetNetConn(&badConn{
cn1.SetNetConn(&badConn{
readErr: errTimeout,
writeErr: errTimeout,
})
Expand All @@ -286,7 +286,7 @@ var _ = Describe("PubSub", func() {
wg.Wait()
})

It("should not panic on Close", func() {
It("should return on Close", func() {
pubsub, err := client.Subscribe("mychannel")
Expect(err).NotTo(HaveOccurred())
defer pubsub.Close()
Expand All @@ -297,13 +297,20 @@ var _ = Describe("PubSub", func() {
defer GinkgoRecover()

wg.Done()

_, err := pubsub.ReceiveMessage()
Expect(err).To(MatchError("redis: client is closed"))

wg.Done()
}()

wg.Wait()
wg.Add(1)

err = pubsub.Close()
Expect(err).NotTo(HaveOccurred())

wg.Wait()
})

})

0 comments on commit 42141f1

Please sign in to comment.