Skip to content

Commit

Permalink
Merge pull request #186 from divebomb/develop
Browse files Browse the repository at this point in the history
Imp: split protocol/dubbo/pool.go gettyRPCClient.close and gettyRPCClientPool.remove
  • Loading branch information
hxmhlt authored Sep 2, 2019
2 parents a8cb98e + aa5c160 commit 141f1db
Showing 1 changed file with 46 additions and 20 deletions.
66 changes: 46 additions & 20 deletions protocol/dubbo/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"math/rand"
"net"
"sync"
"sync/atomic"
"time"
)

Expand Down Expand Up @@ -78,11 +79,19 @@ func newGettyRPCClientConn(pool *gettyRPCClientPool, protocol, addr string) (*ge
time.Sleep(1e6)
}
logger.Infof("client init ok")
c.created = time.Now().Unix()
c.updateActive(time.Now().Unix())

return c, nil
}

func (c *gettyRPCClient) updateActive(active int64) {
atomic.StoreInt64(&c.created, active)
}

func (c *gettyRPCClient) getActive() int64 {
return atomic.LoadInt64(&c.created)
}

func (c *gettyRPCClient) newSession(session getty.Session) error {
var (
ok bool
Expand Down Expand Up @@ -169,9 +178,8 @@ func (c *gettyRPCClient) removeSession(session getty.Session) {
}
logger.Infof("after remove session{%s}, left session number:%d", session.Stat(), len(c.sessions))
if len(c.sessions) == 0 {
c.pool.Lock()
c.close() // -> pool.remove(c)
c.pool.Unlock()
c.pool.safeRemove(c)
c.close()
}
}

Expand Down Expand Up @@ -225,10 +233,8 @@ func (c *gettyRPCClient) isAvailable() bool {
}

func (c *gettyRPCClient) close() error {
err := perrors.Errorf("close gettyRPCClient{%#v} again", c)
closeErr := perrors.Errorf("close gettyRPCClient{%#v} again", c)
c.once.Do(func() {
// delete @c from client pool
c.pool.remove(c)
c.gettyClient.Close()
c.gettyClient = nil
for _, s := range c.sessions {
Expand All @@ -238,10 +244,17 @@ func (c *gettyRPCClient) close() error {
}
c.sessions = c.sessions[:0]

c.created = 0
err = nil
c.updateActive(0)
closeErr = nil
})
return err
return closeErr
}

func (c *gettyRPCClient) safeClose() error {
c.lock.Lock()
defer c.lock.Unlock()

return c.close()
}

type gettyRPCClientPool struct {
Expand All @@ -268,7 +281,7 @@ func (p *gettyRPCClientPool) close() {
p.conns = nil
p.Unlock()
for _, conn := range conns {
conn.close()
conn.safeClose()
}
}

Expand All @@ -286,46 +299,52 @@ func (p *gettyRPCClientPool) getGettyRpcClient(protocol, addr string) (*gettyRPC
conn := p.conns[len(p.conns)-1]
p.conns = p.conns[:len(p.conns)-1]

if d := now - conn.created; d > p.ttl {
conn.close() // -> pool.remove(c)
if d := now - conn.getActive(); d > p.ttl {
if closeErr := conn.safeClose(); closeErr == nil {
p.remove(conn)
}
continue
}
conn.created = now //update created time
conn.updateActive(now) //update created time

return conn, nil
}
// create new conn
return newGettyRPCClientConn(p, protocol, addr)
}

func (p *gettyRPCClientPool) release(conn *gettyRPCClient, err error) {
if conn == nil || conn.created == 0 {
if conn == nil || conn.getActive() == 0 {
return
}

if err != nil {
conn.close()
conn.safeClose()
return
}

p.Lock()
defer p.Unlock()

if p.conns == nil {
return
}

if len(p.conns) >= p.size {
conn.close()
if closeErr := conn.safeClose(); closeErr == nil {
// delete @conn from client pool
p.remove(conn)
}
return
}
p.conns = append(p.conns, conn)
}

func (p *gettyRPCClientPool) remove(conn *gettyRPCClient) {
if conn == nil || conn.created == 0 {
if conn == nil || conn.getActive() == 0 {
return
}

//p.Lock()
//defer p.Unlock()
if p.conns == nil {
return
}
Expand All @@ -339,3 +358,10 @@ func (p *gettyRPCClientPool) remove(conn *gettyRPCClient) {
}
}
}

func (p *gettyRPCClientPool) safeRemove(conn *gettyRPCClient) {
p.Lock()
defer p.Unlock()

p.remove(conn)
}

0 comments on commit 141f1db

Please sign in to comment.