From a6c96df8f7afe754243e95fc90d899f6cfd4a7d4 Mon Sep 17 00:00:00 2001 From: AlexStocks Date: Sun, 1 Sep 2019 23:23:49 +0800 Subject: [PATCH 1/4] Imp: split protocol/dubbo/pool.go gettyRPCClient.close and gettyRPCClientPool.remove --- protocol/dubbo/pool.go | 55 ++++++++++++++++++++++++++++-------------- 1 file changed, 37 insertions(+), 18 deletions(-) diff --git a/protocol/dubbo/pool.go b/protocol/dubbo/pool.go index ecd57e466a..7b57a91329 100644 --- a/protocol/dubbo/pool.go +++ b/protocol/dubbo/pool.go @@ -22,6 +22,7 @@ import ( "math/rand" "net" "sync" + "sync/atomic" "time" ) @@ -78,11 +79,19 @@ func newGettyRPCClientConn(pool *gettyRPCClientPool, protocol, addr string) (*ge time.Sleep(1e6) } logger.Infof("client init ok") - c.created = time.Now().Unix() + c.updateActive(time.Now().Unix()) return c, nil } +func (c *gettyRPCClient) updateActive(active int64) { + atomic.StoreInt64(&c.created, active) +} + +func (c *gettyRPCClient) getActive() int64 { + return atomic.LoadInt64(&c.created) +} + func (c *gettyRPCClient) newSession(session getty.Session) error { var ( ok bool @@ -169,9 +178,8 @@ func (c *gettyRPCClient) removeSession(session getty.Session) { } logger.Infof("after remove session{%s}, left session number:%d", session.Stat(), len(c.sessions)) if len(c.sessions) == 0 { - c.pool.Lock() - c.close() // -> pool.remove(c) - c.pool.Unlock() + c.pool.safeRemove(c) + c.close() } } @@ -225,10 +233,10 @@ func (c *gettyRPCClient) isAvailable() bool { } func (c *gettyRPCClient) close() error { - err := perrors.Errorf("close gettyRPCClient{%#v} again", c) + closeErr := perrors.Errorf("close gettyRPCClient{%#v} again", c) c.once.Do(func() { - // delete @c from client pool - c.pool.remove(c) + c.lock.Lock() + defer c.lock.Unlock() c.gettyClient.Close() c.gettyClient = nil for _, s := range c.sessions { @@ -238,10 +246,10 @@ func (c *gettyRPCClient) close() error { } c.sessions = c.sessions[:0] - c.created = 0 - err = nil + c.updateActive(0) + closeErr = nil }) - return err + return closeErr } type gettyRPCClientPool struct { @@ -286,11 +294,14 @@ func (p *gettyRPCClientPool) getGettyRpcClient(protocol, addr string) (*gettyRPC conn := p.conns[len(p.conns)-1] p.conns = p.conns[:len(p.conns)-1] - if d := now - conn.created; d > p.ttl { - conn.close() // -> pool.remove(c) + if d := now - conn.getActive(); d > p.ttl { + if closeErr := conn.close(); closeErr != nil { + p.remove(conn) + } continue } - conn.created = now //update created time + conn.updateActive(now) //update created time + return conn, nil } // create new conn @@ -298,7 +309,7 @@ func (p *gettyRPCClientPool) getGettyRpcClient(protocol, addr string) (*gettyRPC } func (p *gettyRPCClientPool) release(conn *gettyRPCClient, err error) { - if conn == nil || conn.created == 0 { + if conn == nil || conn.getActive() == 0 { return } if err != nil { @@ -313,19 +324,20 @@ func (p *gettyRPCClientPool) release(conn *gettyRPCClient, err error) { } if len(p.conns) >= p.size { - conn.close() + if closeErr := conn.close(); closeErr != nil { + // delete @conn from client pool + p.remove(conn) + } return } p.conns = append(p.conns, conn) } func (p *gettyRPCClientPool) remove(conn *gettyRPCClient) { - if conn == nil || conn.created == 0 { + if conn == nil || conn.getActive() == 0 { return } - //p.Lock() - //defer p.Unlock() if p.conns == nil { return } @@ -339,3 +351,10 @@ func (p *gettyRPCClientPool) remove(conn *gettyRPCClient) { } } } + +func (p *gettyRPCClientPool) safeRemove(conn *gettyRPCClient) { + p.Lock() + defer p.Unlock() + + p.remove(conn) +} From ce6e685b84324f6d7e377275182dd65edf65d59a Mon Sep 17 00:00:00 2001 From: AlexStocks Date: Sun, 1 Sep 2019 23:51:21 +0800 Subject: [PATCH 2/4] Imp: adjust pool close lock --- protocol/dubbo/pool.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/protocol/dubbo/pool.go b/protocol/dubbo/pool.go index 7b57a91329..62d3c4b0c4 100644 --- a/protocol/dubbo/pool.go +++ b/protocol/dubbo/pool.go @@ -235,8 +235,6 @@ func (c *gettyRPCClient) isAvailable() bool { func (c *gettyRPCClient) close() error { closeErr := perrors.Errorf("close gettyRPCClient{%#v} again", c) c.once.Do(func() { - c.lock.Lock() - defer c.lock.Unlock() c.gettyClient.Close() c.gettyClient = nil for _, s := range c.sessions { @@ -272,9 +270,9 @@ func newGettyRPCClientConnPool(rpcClient *Client, size int, ttl time.Duration) * func (p *gettyRPCClientPool) close() { p.Lock() + defer p.Unlock() conns := p.conns p.conns = nil - p.Unlock() for _, conn := range conns { conn.close() } @@ -312,13 +310,15 @@ func (p *gettyRPCClientPool) release(conn *gettyRPCClient, err error) { if conn == nil || conn.getActive() == 0 { return } + + p.Lock() + defer p.Unlock() + if err != nil { conn.close() return } - p.Lock() - defer p.Unlock() if p.conns == nil { return } From ff4f9b9af0bf24a09113a4d1dae062e1440b01d0 Mon Sep 17 00:00:00 2001 From: AlexStocks Date: Sun, 1 Sep 2019 23:56:26 +0800 Subject: [PATCH 3/4] Add: gettyRPCClient.safeClose --- protocol/dubbo/pool.go | 23 +++++++++++++++-------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/protocol/dubbo/pool.go b/protocol/dubbo/pool.go index 62d3c4b0c4..bff231e581 100644 --- a/protocol/dubbo/pool.go +++ b/protocol/dubbo/pool.go @@ -250,6 +250,13 @@ func (c *gettyRPCClient) close() error { return closeErr } +func (c *gettyRPCClient) safeClose() error { + c.lock.Lock() + defer c.lock.Unlock() + + return c.close() +} + type gettyRPCClientPool struct { rpcClient *Client size int // size of []*gettyRPCClient @@ -270,11 +277,11 @@ func newGettyRPCClientConnPool(rpcClient *Client, size int, ttl time.Duration) * func (p *gettyRPCClientPool) close() { p.Lock() - defer p.Unlock() conns := p.conns p.conns = nil + p.Unlock() for _, conn := range conns { - conn.close() + conn.safeClose() } } @@ -293,7 +300,7 @@ func (p *gettyRPCClientPool) getGettyRpcClient(protocol, addr string) (*gettyRPC p.conns = p.conns[:len(p.conns)-1] if d := now - conn.getActive(); d > p.ttl { - if closeErr := conn.close(); closeErr != nil { + if closeErr := conn.safeClose(); closeErr != nil { p.remove(conn) } continue @@ -311,20 +318,20 @@ func (p *gettyRPCClientPool) release(conn *gettyRPCClient, err error) { return } - p.Lock() - defer p.Unlock() - if err != nil { - conn.close() + conn.safeClose() return } + p.Lock() + defer p.Unlock() + if p.conns == nil { return } if len(p.conns) >= p.size { - if closeErr := conn.close(); closeErr != nil { + if closeErr := conn.safeClose(); closeErr != nil { // delete @conn from client pool p.remove(conn) } From aa5c16008e82a2058303f8207fcd6acf7c6e4216 Mon Sep 17 00:00:00 2001 From: AlexStocks Date: Mon, 2 Sep 2019 00:04:59 +0800 Subject: [PATCH 4/4] Fix: closeErr == nil --- protocol/dubbo/pool.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/protocol/dubbo/pool.go b/protocol/dubbo/pool.go index bff231e581..cdd2a10a47 100644 --- a/protocol/dubbo/pool.go +++ b/protocol/dubbo/pool.go @@ -300,7 +300,7 @@ func (p *gettyRPCClientPool) getGettyRpcClient(protocol, addr string) (*gettyRPC p.conns = p.conns[:len(p.conns)-1] if d := now - conn.getActive(); d > p.ttl { - if closeErr := conn.safeClose(); closeErr != nil { + if closeErr := conn.safeClose(); closeErr == nil { p.remove(conn) } continue @@ -331,7 +331,7 @@ func (p *gettyRPCClientPool) release(conn *gettyRPCClient, err error) { } if len(p.conns) >= p.size { - if closeErr := conn.safeClose(); closeErr != nil { + if closeErr := conn.safeClose(); closeErr == nil { // delete @conn from client pool p.remove(conn) }