Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Imp: try to fix too many files open error #797

Merged
merged 7 commits into from
Nov 17, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
add poolqueue to remove lock
  • Loading branch information
wenxuwan committed Oct 17, 2020
commit 36f92db1b73dcb569ac0ae790ae534f11a2cdb00
12 changes: 9 additions & 3 deletions protocol/dubbo/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,19 +268,25 @@ func (c *Client) call(ct CallType, request *Request, response *Response, callbac
return errSessionNotExist
}
defer func() {
c.pool.ch <- struct{}{}
failNumber := 0
if err == nil {
for {
ok := atomic.CompareAndSwapUint32(&c.pool.pushing, 0, 1)
if ok {
c.pool.poolQueue.pushHead(conn)
c.pool.pushing = 0
gaoxinge marked this conversation as resolved.
Show resolved Hide resolved
c.pool.ch <- struct{}{}
return
}
time.Sleep(1e6)
failNumber++
if failNumber%10 == 0 {
time.Sleep(1e6)
}
}
} else {
c.pool.ch <- struct{}{}
conn.close()
}
conn.close()
}()

if err = c.transfer(session, p, rsp); err != nil {
Expand Down
113 changes: 24 additions & 89 deletions protocol/dubbo/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,6 @@ func (c *gettyRPCClient) removeSession(session getty.Session) {
}
}()
if removeFlag {
c.pool.safeRemove(c)
c.close()
}
}
Expand Down Expand Up @@ -289,16 +288,15 @@ func (c *gettyRPCClient) close() error {

type gettyRPCClientPool struct {
rpcClient *Client
maxSize int // maxSize of []*gettyRPCClient
maxSize int // maxSize of poolQueue
ttl int64 // ttl of every gettyRPCClient, it is checked when getConn
activeNumber uint32
chInitialized uint32 // set to 1 when field ch is initialized
ch chan struct{}
closeCh chan struct{}
poolQueue *poolDequeue
poolQueue *poolDequeue // store *gettyRPCClient
pushing uint32
sync.RWMutex
conns []*gettyRPCClient
}

func newGettyRPCClientConnPool(rpcClient *Client, size int, ttl time.Duration) *gettyRPCClientPool {
Expand All @@ -309,19 +307,24 @@ func newGettyRPCClientConnPool(rpcClient *Client, size int, ttl time.Duration) *
rpcClient: rpcClient,
maxSize: size,
ttl: int64(ttl.Seconds()),
conns: make([]*gettyRPCClient, 0, 16),
closeCh: make(chan struct{}, 0),
poolQueue: pq,
}
}

func (p *gettyRPCClientPool) close() {
p.Lock()
conns := p.conns
p.conns = nil
connPool := p.poolQueue
p.poolQueue = nil
p.Unlock()
for _, conn := range conns {
conn.close()
for {
conn, ok := connPool.popTail()
if ok {
c := conn.(*gettyRPCClient)
c.close()
} else {
break
wenxuwan marked this conversation as resolved.
Show resolved Hide resolved
}
}
}

Expand Down Expand Up @@ -378,87 +381,19 @@ func (p *gettyRPCClientPool) getConnFromPoll() (*gettyRPCClient, error) {
if p.poolQueue == nil {
return nil, errClientPoolClosed
}
for value, ok := p.poolQueue.popTail(); ok; {
conn := value.(*gettyRPCClient)
if d := now - conn.getActive(); d > p.ttl {
go conn.close()
continue
}
conn.updateActive(now)
return conn, nil
}
return nil, nil
}

func (p *gettyRPCClientPool) get() (*gettyRPCClient, error) {
now := time.Now().Unix()
if p.conns == nil {
return nil, errClientPoolClosed
}
for len(p.conns) > 0 {
conn := p.conns[0]
p.conns = p.conns[1:]
if d := now - conn.getActive(); d > p.ttl {
p.remove(conn)
go conn.close()
continue
}
conn.updateActive(now) //update active time
return conn, nil
}
return nil, nil
}

func (p *gettyRPCClientPool) put(conn *gettyRPCClient) {
if conn == nil || conn.getActive() == 0 {
return
}

p.Lock()
defer p.Unlock()

if p.conns == nil {
return
}

// check whether @conn has existed in p.conns or not.
for i := range p.conns {
if p.conns[i] == conn {
return
}
}

if len(p.conns) >= p.maxSize {
// delete @conn from client pool
// p.remove(conn)
conn.close()
return
}
p.conns = append(p.conns, conn)
}

func (p *gettyRPCClientPool) remove(conn *gettyRPCClient) {
if conn == nil || conn.getActive() == 0 {
return
}

if p.conns == nil {
return
}

if len(p.conns) > 0 {
for idx, c := range p.conns {
if conn == c {
p.conns = append(p.conns[:idx], p.conns[idx+1:]...)
break
for {
value, ok := p.poolQueue.popTail()
zouyx marked this conversation as resolved.
Show resolved Hide resolved
if ok {
conn := value.(*gettyRPCClient)
if d := now - conn.getActive(); d > p.ttl {
go conn.close()
continue
}
conn.updateActive(now)
return conn, nil
} else {
wenxuwan marked this conversation as resolved.
Show resolved Hide resolved
break
wenxuwan marked this conversation as resolved.
Show resolved Hide resolved
}
}
}

func (p *gettyRPCClientPool) safeRemove(conn *gettyRPCClient) {
p.Lock()
defer p.Unlock()

p.remove(conn)
return nil, nil
wenxuwan marked this conversation as resolved.
Show resolved Hide resolved
}