Implement Close and fix reaper goroutine leak. #95

Merged · 3 commits · May 1, 2015
cluster.go · 37 additions, 14 deletions
@@ -13,10 +13,11 @@ type ClusterClient struct {
 	addrs   []string
 	slots   [][]string
-	slotsMx sync.RWMutex // protects slots & addrs cache
+	slotsMx sync.RWMutex // Protects slots and addrs.

 	clients   map[string]*Client
-	clientsMx sync.RWMutex
+	closed    bool
+	clientsMx sync.RWMutex // Protects clients and closed.

 	opt *ClusterOptions

@@ -35,20 +36,27 @@ func NewClusterClient(opt *ClusterOptions) *ClusterClient {
 	}
 	client.commandable.process = client.process
 	client.reloadIfDue()
-	go client.reaper(time.NewTicker(5 * time.Minute))
+	go client.reaper()
 	return client
}

-// Close closes the cluster client.
+// Close closes the cluster client, releasing any open resources.
+//
+// It is rare to Close a Client, as the Client is meant to be
+// long-lived and shared between many goroutines.
func (c *ClusterClient) Close() error {
-	// TODO: close should make client unusable
-	c.setSlots(nil)
+	defer c.clientsMx.Unlock()
Review comment (Contributor):
Not a problem, just out of interest: what's the benefit of deferring the unlock before acquiring the lock? I've seen you doing it on a few occasions. Looks like no one else is doing it this way 'round.

Reply (Collaborator, author):
It should be slightly faster (I believe), because `defer c.clientsMx.Unlock()` is evaluated before the lock is acquired. It does not matter too much and I have to stop doing this.
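A minimal standalone sketch (hypothetical `deferFirst`/`lockFirst` functions, not from this PR) of why the unusual ordering is legal: `defer` evaluates the receiver and registers the call immediately, but runs `Unlock` only when the function returns, by which time `Lock` has long since succeeded.

```go
package main

import (
	"fmt"
	"sync"
)

var mu sync.Mutex

// deferFirst registers the Unlock before acquiring the lock. Only the
// receiver (&mu) is evaluated at the defer statement; the Unlock call
// itself runs at function return, after Lock has succeeded. Caveat: if
// anything panicked between the defer and the Lock, Unlock would run
// on an unlocked mutex and panic as well.
func deferFirst() {
	defer mu.Unlock()
	mu.Lock()
	fmt.Println("in critical section (defer first)")
}

// lockFirst is the conventional ordering, with identical behavior here.
func lockFirst() {
	mu.Lock()
	defer mu.Unlock()
	fmt.Println("in critical section (lock first)")
}

func main() {
	deferFirst()
	lockFirst()
}
```

Any speed difference is unlikely to be measurable; the conventional ordering also stays correct when code between the two statements can panic.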

+	c.clientsMx.Lock()
+
+	if c.closed {
+		return nil
+	}
+	c.closed = true
+	c.resetClients()
+	c.setSlots(nil)
 	return nil
}

// ------------------------------------------------------------------------

// getClient returns a Client for a given address.
func (c *ClusterClient) getClient(addr string) (*Client, error) {
 	if addr == "" {
@@ -64,6 +72,11 @@ func (c *ClusterClient) getClient(addr string) (*Client, error) {
 	c.clientsMx.RUnlock()

 	c.clientsMx.Lock()
+	if c.closed {
+		c.clientsMx.Unlock()
+		return nil, errClosed
+	}
+
 	client, ok = c.clients[addr]
 	if !ok {
 		opt := c.opt.clientOptions()
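The hunk above is part of a check-then-create cache: a fast path under the read lock, then a write-locked slow path that re-checks state before creating a client. A self-contained sketch of the pattern (hypothetical names; the real method also handles an empty address and client options):

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var errClosed = errors.New("redis: client is closed")

type client struct{ addr string }

type pool struct {
	mu      sync.RWMutex
	closed  bool
	clients map[string]*client
}

func (p *pool) get(addr string) (*client, error) {
	// Fast path: most lookups hit the cache under the read lock.
	p.mu.RLock()
	c, ok := p.clients[addr]
	p.mu.RUnlock()
	if ok {
		return c, nil
	}

	p.mu.Lock()
	defer p.mu.Unlock()
	if p.closed {
		return nil, errClosed // refuse to create clients after Close
	}
	// Re-check: another goroutine may have created the client
	// between our RUnlock and Lock.
	if c, ok := p.clients[addr]; ok {
		return c, nil
	}
	c = &client{addr: addr}
	p.clients[addr] = c
	return c, nil
}

func main() {
	p := &pool{clients: make(map[string]*client)}
	c, err := p.get("127.0.0.1:7000")
	fmt.Println(c.addr, err)
}
```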
@@ -83,7 +96,7 @@ func (c *ClusterClient) slotAddrs(slot int) []string {
 	return addrs
}

-// randomClient returns a Client for the first live node.
+// randomClient returns a Client for the first pingable node.
func (c *ClusterClient) randomClient() (client *Client, err error) {
 	for i := 0; i < 10; i++ {
 		n := rand.Intn(len(c.addrs))
@@ -165,14 +178,12 @@ func (c *ClusterClient) process(cmd Cmder) {

// Closes all clients and returns last error if there are any.
func (c *ClusterClient) resetClients() (err error) {
-	c.clientsMx.Lock()
 	for addr, client := range c.clients {
 		if e := client.Close(); e != nil {
 			err = e
 		}
 		delete(c.clients, addr)
 	}
-	c.clientsMx.Unlock()
 	return err
}
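Moving the locking out of resetClients matters because Close now calls it while already holding clientsMx, and Go's sync.Mutex is not reentrant: locking it again from the same goroutine blocks forever. A tiny sketch (hypothetical `box` type) of the deadlock the old code would have caused:

```go
package main

import "sync"

type box struct{ mu sync.Mutex }

// inner locks unconditionally, like the old resetClients did.
func (b *box) inner() {
	b.mu.Lock() // blocks forever when outer already holds b.mu
	defer b.mu.Unlock()
}

// outer holds the lock across the call, like the new Close does.
func (b *box) outer() {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.inner()
}

func main() {
	(&box{}).outer() // fatal error: all goroutines are asleep - deadlock!
}
```

The trade-off is an implicit precondition: resetClients must now only be called with clientsMx held.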

@@ -229,16 +240,28 @@ func (c *ClusterClient) scheduleReload() {
}

// reaper closes idle connections to the cluster.
-func (c *ClusterClient) reaper(ticker *time.Ticker) {
+func (c *ClusterClient) reaper() {
+	ticker := time.NewTicker(time.Minute)
+	defer ticker.Stop()
 	for _ = range ticker.C {
+		c.clientsMx.RLock()
+
+		if c.closed {
+			c.clientsMx.RUnlock()
+			break
+		}
+
 		for _, client := range c.clients {
 			pool := client.connPool
-			// pool.First removes idle connections from the pool for us. So
-			// just put returned connection back.
+			// pool.First removes idle connections from the pool and
+			// returns first non-idle connection. So just put returned
+			// connection back.
 			if cn := pool.First(); cn != nil {
 				pool.Put(cn)
 			}
 		}
+
+		c.clientsMx.RUnlock()
 	}
}
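Before this change, the reaper goroutine ranged over a ticker that nothing ever stopped, so every ClusterClient leaked one goroutine forever. A standalone sketch (hypothetical `worker` type, not the PR's code) contrasting the leak with the exit path the closed flag provides:

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
	"time"
)

// leaky mirrors the old behavior: the ticker is never stopped and the
// loop has no exit path, so every call leaks one goroutine forever.
func leaky() {
	go func() {
		for range time.NewTicker(time.Millisecond).C {
		}
	}()
}

// worker mirrors the fixed reaper: a closed flag guarded by a mutex
// gives the goroutine an exit path, and the ticker is stopped on the
// way out.
type worker struct {
	mu     sync.Mutex
	closed bool
}

func (w *worker) run() {
	ticker := time.NewTicker(time.Millisecond)
	defer ticker.Stop()
	for range ticker.C {
		w.mu.Lock()
		done := w.closed
		w.mu.Unlock()
		if done {
			return
		}
	}
}

func (w *worker) close() {
	w.mu.Lock()
	w.closed = true
	w.mu.Unlock()
}

func main() {
	w := &worker{}
	go w.run()
	w.close()
	time.Sleep(10 * time.Millisecond)
	fmt.Println("goroutines:", runtime.NumGoroutine()) // ~1: run has exited
}
```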

cluster_client_test.go · 1 addition, 0 deletions
@@ -83,6 +83,7 @@ var _ = Describe("ClusterClient", func() {
 		Expect(subject.slots[8191]).To(BeEmpty())
 		Expect(subject.slots[8192]).To(BeEmpty())
 		Expect(subject.slots[16383]).To(BeEmpty())
+		Expect(subject.Ping().Err().Error()).To(Equal("redis: client is closed"))
 	})

 	It("should check if reload is due", func() {
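The new assertion exercises the fix end to end: after Close, commands fail fast with errClosed instead of touching the network. A usage sketch (import path and the Addrs option field are assumptions about this era of the library; addresses are hypothetical):

```go
package main

import (
	"fmt"

	"gopkg.in/redis.v3" // assumed import path; cluster support landed around v3
)

func main() {
	client := redis.NewClusterClient(&redis.ClusterOptions{
		Addrs: []string{"127.0.0.1:7000", "127.0.0.1:7001"}, // hypothetical nodes
	})

	_ = client.Close() // stops the reaper and closes every cached client
	_ = client.Close() // safe: Close is idempotent after this PR

	fmt.Println(client.Ping().Err()) // redis: client is closed
}
```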