From b41ef931ed4678312eb38d465b729a0b1afdebed Mon Sep 17 00:00:00 2001 From: Abhinav Dangeti Date: Tue, 15 Jun 2021 10:58:27 -0600 Subject: [PATCH] MB-43553: Conditionally closing Bucket's Pool instance + Close the Pool instance that the Bucket holds when the Bucket's connPools is cleared, ONLY when the Bucket instance is set up using either of these APIs .. - ConnectWithAuthAndGetBucket(..) - GetBucket(..) + This is to assist with closing the pool instance setup while initializing the bucket instance via the 2 APIs above. + To better facilitate the use case here - introducing a new internal API for client to setup a pool only for the requested bucket and not all the buckets in the system - which further reduces the amount of garbage. + Helps address the memory leak identified in the ticket. + `go fmt ./...` fixes formatting in an unrelated file. Change-Id: I6af32ff1e26dedcdaa62ffdff00c7e8c62bacc6f Reviewed-on: http://review.couchbase.org/c/go-couchbase/+/155752 Tested-by: Abhinav Dangeti Reviewed-by: Sreekanth Sivasankaran --- client.go | 8 +++--- pools.go | 75 ++++++++++++++++++++++++++++++++++++++++++++++++++++--- 2 files changed, 75 insertions(+), 8 deletions(-) diff --git a/client.go b/client.go index f9d558c..a6a84b3 100644 --- a/client.go +++ b/client.go @@ -656,10 +656,10 @@ func (b *Bucket) doBulkGet(vb uint16, keys []string, reqDeadline time.Time, b.Refresh() discard = b.checkVBmap(pool.Node()) return nil // retry - } else if (st == gomemcached.NOT_SUPPORTED && attempts < notSMaxTries) { - b.Refresh() - discard = b.checkVBmap(pool.Node()) - return nil // retry + } else if st == gomemcached.NOT_SUPPORTED && attempts < notSMaxTries { + b.Refresh() + discard = b.checkVBmap(pool.Node()) + return nil // retry } else if st == gomemcached.EBUSY || st == gomemcached.LOCKED { if (attempts % (MaxBulkRetries / 100)) == 0 { logging.Infof("Retrying Memcached error (%v) FOR %v(vbid:%d, keys:%v)", diff --git a/pools.go b/pools.go index f7e98b0..fdf7e00 100644 --- a/pools.go +++ b/pools.go @@ -256,6 +256,8 @@ type Bucket struct { ah AuthHandler // auth handler ds *DurablitySettings // Durablity Settings for this bucket closed bool + + dedicatedPool bool // Set if the pool instance above caters to this Bucket alone } // PoolServices is all the bucket-independent services in a pool @@ -1528,6 +1530,49 @@ func (c *Client) GetPool(name string) (p Pool, err error) { return } +func (c *Client) setupPoolForBucket(poolname, bucketname string) (p Pool, err error) { + var poolURI string + + for _, p := range c.Info.Pools { + if p.Name == poolname { + poolURI = p.URI + break + } + } + if poolURI == "" { + return p, errors.New("No pool named " + poolname) + } + + err = c.parseURLResponse(poolURI, &p) + if err != nil { + return p, err + } + + p.client = c + p.BucketMap = make(map[string]*Bucket) + + buckets := []Bucket{} + err = p.client.parseURLResponse(p.BucketURL["uri"], &buckets) + if err != nil { + return + } + for i, _ := range buckets { + if buckets[i].Name == bucketname { + b := new(Bucket) + *b = buckets[i] + b.pool = &p + b.nodeList = unsafe.Pointer(&b.NodesJSON) + b.replaceConnPools(make([]*connectionPool, len(b.VBSMJson.ServerList))) + p.BucketMap[b.Name] = b + runtime.SetFinalizer(b, bucketFinalizer) + break + } + } + buckets = nil + + return +} + // GetPoolServices returns all the bucket-independent services in a pool. // (See "Exposing services outside of bucket context" in http://goo.gl/uuXRkV) func (c *Client) GetPoolServices(name string) (ps PoolServices, err error) { @@ -1572,6 +1617,12 @@ func (b *Bucket) Close() { } } b.connPools = nil + + // Close the associated pool asynchronously which acquires the + // bucket lock separately. + if b.dedicatedPool { + go b.pool.Close() + } } } @@ -1667,12 +1718,20 @@ func GetBucket(endpoint, poolname, bucketname string) (*Bucket, error) { return nil, err } - pool, err := client.GetPool(poolname) + pool, err := client.setupPoolForBucket(poolname, bucketname) + if err != nil { + return nil, err + } + + bucket, err := pool.GetBucket(bucketname) if err != nil { + // close dedicated pool on error + pool.Close() return nil, err } - return pool.GetBucket(bucketname) + bucket.dedicatedPool = true + return bucket, nil } // ConnectWithAuthAndGetBucket is a convenience function for @@ -1684,12 +1743,20 @@ func ConnectWithAuthAndGetBucket(endpoint, poolname, bucketname string, return nil, err } - pool, err := client.GetPool(poolname) + pool, err := client.setupPoolForBucket(poolname, bucketname) + if err != nil { + return nil, err + } + + bucket, err := pool.GetBucket(bucketname) if err != nil { + // close dedicated pool on error + pool.Close() return nil, err } - return pool.GetBucket(bucketname) + bucket.dedicatedPool = true + return bucket, nil } func GetSystemBucket(c *Client, p *Pool, name string) (*Bucket, error) {