Skip to content

Commit

Permalink
MB-43553: Conditionally closing Bucket's Pool instance
Browse files Browse the repository at this point in the history
+ Close the Pool instance that the Bucket holds when
  the Bucket's connPools is cleared, ONLY when the Bucket
  instance is set up using either of these APIs ..
    - ConnectWithAuthAndGetBucket(..)
    - GetBucket(..)
+ This is to assist with closing the pool instance
  setup while initializing the bucket instance via the
  2 APIs above.

+ To better facilitate the use case here - introducing a
  new internal API for client to setup a pool only for
  the requested bucket and not all the buckets in the
  system - which further reduces the amount of garbage.

+ Helps address the memory leak identified in the ticket.
+ `go fmt ./...` fixes formatting in an unrelated file.

Change-Id: I6af32ff1e26dedcdaa62ffdff00c7e8c62bacc6f
Reviewed-on: http://review.couchbase.org/c/go-couchbase/+/155752
Tested-by: Abhinav Dangeti <abhinav@couchbase.com>
Reviewed-by: Sreekanth Sivasankaran <sreekanth.sivasankaran@couchbase.com>
  • Loading branch information
abhinavdangeti committed Jun 17, 2021
1 parent bba8776 commit b41ef93
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 8 deletions.
8 changes: 4 additions & 4 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -656,10 +656,10 @@ func (b *Bucket) doBulkGet(vb uint16, keys []string, reqDeadline time.Time,
b.Refresh()
discard = b.checkVBmap(pool.Node())
return nil // retry
} else if (st == gomemcached.NOT_SUPPORTED && attempts < notSMaxTries) {
b.Refresh()
discard = b.checkVBmap(pool.Node())
return nil // retry
} else if st == gomemcached.NOT_SUPPORTED && attempts < notSMaxTries {
b.Refresh()
discard = b.checkVBmap(pool.Node())
return nil // retry
} else if st == gomemcached.EBUSY || st == gomemcached.LOCKED {
if (attempts % (MaxBulkRetries / 100)) == 0 {
logging.Infof("Retrying Memcached error (%v) FOR %v(vbid:%d, keys:<ud>%v</ud>)",
Expand Down
75 changes: 71 additions & 4 deletions pools.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,8 @@ type Bucket struct {
ah AuthHandler // auth handler
ds *DurablitySettings // Durablity Settings for this bucket
closed bool

dedicatedPool bool // Set if the pool instance above caters to this Bucket alone
}

// PoolServices is all the bucket-independent services in a pool
Expand Down Expand Up @@ -1528,6 +1530,49 @@ func (c *Client) GetPool(name string) (p Pool, err error) {
return
}

func (c *Client) setupPoolForBucket(poolname, bucketname string) (p Pool, err error) {
var poolURI string

for _, p := range c.Info.Pools {
if p.Name == poolname {
poolURI = p.URI
break
}
}
if poolURI == "" {
return p, errors.New("No pool named " + poolname)
}

err = c.parseURLResponse(poolURI, &p)
if err != nil {
return p, err
}

p.client = c
p.BucketMap = make(map[string]*Bucket)

buckets := []Bucket{}
err = p.client.parseURLResponse(p.BucketURL["uri"], &buckets)
if err != nil {
return
}
for i, _ := range buckets {
if buckets[i].Name == bucketname {
b := new(Bucket)
*b = buckets[i]
b.pool = &p
b.nodeList = unsafe.Pointer(&b.NodesJSON)
b.replaceConnPools(make([]*connectionPool, len(b.VBSMJson.ServerList)))
p.BucketMap[b.Name] = b
runtime.SetFinalizer(b, bucketFinalizer)
break
}
}
buckets = nil

return
}

// GetPoolServices returns all the bucket-independent services in a pool.
// (See "Exposing services outside of bucket context" in http://goo.gl/uuXRkV)
func (c *Client) GetPoolServices(name string) (ps PoolServices, err error) {
Expand Down Expand Up @@ -1572,6 +1617,12 @@ func (b *Bucket) Close() {
}
}
b.connPools = nil

// Close the associated pool asynchronously which acquires the
// bucket lock separately.
if b.dedicatedPool {
go b.pool.Close()
}
}
}

Expand Down Expand Up @@ -1667,12 +1718,20 @@ func GetBucket(endpoint, poolname, bucketname string) (*Bucket, error) {
return nil, err
}

pool, err := client.GetPool(poolname)
pool, err := client.setupPoolForBucket(poolname, bucketname)
if err != nil {
return nil, err
}

bucket, err := pool.GetBucket(bucketname)
if err != nil {
// close dedicated pool on error
pool.Close()
return nil, err
}

return pool.GetBucket(bucketname)
bucket.dedicatedPool = true
return bucket, nil
}

// ConnectWithAuthAndGetBucket is a convenience function for
Expand All @@ -1684,12 +1743,20 @@ func ConnectWithAuthAndGetBucket(endpoint, poolname, bucketname string,
return nil, err
}

pool, err := client.GetPool(poolname)
pool, err := client.setupPoolForBucket(poolname, bucketname)
if err != nil {
return nil, err
}

bucket, err := pool.GetBucket(bucketname)
if err != nil {
// close dedicated pool on error
pool.Close()
return nil, err
}

return pool.GetBucket(bucketname)
bucket.dedicatedPool = true
return bucket, nil
}

func GetSystemBucket(c *Client, p *Pool, name string) (*Bucket, error) {
Expand Down

0 comments on commit b41ef93

Please sign in to comment.