Skip to content

Commit

Permalink
add max life time in pool
Browse files Browse the repository at this point in the history
  • Loading branch information
徐焱 authored and Brian Picciano committed Aug 24, 2021
1 parent 20a0a86 commit b36b5bc
Showing 1 changed file with 43 additions and 24 deletions.
67 changes: 43 additions & 24 deletions pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,13 @@ type ioErrConn struct {
// level error, e.g. a timeout, disconnect, etc... Close is automatically
// called on the client when it encounters a critical network error
lastIOErr error

// conn create time
createdAt time.Time
}

func newIOErrConn(c Conn) *ioErrConn {
return &ioErrConn{Conn: c}
return &ioErrConn{Conn: c, createdAt: time.Now()}
}

func (ioc *ioErrConn) Encode(m resp.Marshaler) error {
Expand Down Expand Up @@ -67,6 +70,13 @@ func (ioc *ioErrConn) Close() error {
return ioc.Conn.Close()
}

func (ioc *ioErrConn) expired(timeout time.Duration) bool {
if timeout <= 0 {
return false
}
return ioc.createdAt.Add(timeout).Before(time.Now())
}

////////////////////////////////////////////////////////////////////////////////

type poolOpts struct {
Expand All @@ -81,12 +91,22 @@ type poolOpts struct {
pipelineLimit int
pipelineWindow time.Duration
pt trace.PoolTrace
maxLifetime time.Duration // maximum amount of time a connection may be reused
}

// PoolOpt is an optional behavior which can be applied to the NewPool function
// to effect a Pool's behavior
type PoolOpt func(*poolOpts)

// PoolMaxLifetime sets the maximum amount of time a connection may be reused.
// Expired connections may be closed lazily before reuse.
// If d <= 0, connections are not closed due to a connection's age.
func PoolMaxLifetime(d time.Duration) PoolOpt {
return func(po *poolOpts) {
po.maxLifetime = d
}
}

// PoolConnFunc tells the Pool to use the given ConnFunc when creating new
// Conns to its redis instance. The ConnFunc can be used to set timeouts,
// perform AUTH, or even use custom Conn implementations.
Expand Down Expand Up @@ -497,21 +517,6 @@ func (p *Pool) doOverflowDrain() {
}

func (p *Pool) getExisting() (*ioErrConn, error) {
// Fast-path if the pool is not empty. Return error if pool has been closed.
select {
case ioc, ok := <-p.pool:
if !ok {
return nil, errClientClosed
}
return ioc, nil
default:
}

if p.opts.onEmptyWait == 0 {
// If we should not wait we return without allocating a timer.
return nil, p.opts.errOnEmpty
}

// only set when we have a timeout, since a nil channel always blocks which
// is what we want
var tc <-chan time.Time
Expand All @@ -522,15 +527,29 @@ func (p *Pool) getExisting() (*ioErrConn, error) {
tc = t.C
}

select {
case ioc, ok := <-p.pool:
if !ok {
return nil, errClientClosed
// Fast-path if the pool is not empty. Return error if pool has been closed.
for {
select {
case ioc, ok := <-p.pool:
if !ok {
return nil, errClientClosed
}
if ioc.expired(p.opts.maxLifeDuration) {
ioc.Close()
p.traceConnClosed(trace.PoolConnClosedReasonPoolFull)
atomic.AddInt64(&p.totalConns, -1)
continue
}
return ioc, nil
case <-tc:
return nil, p.opts.errOnEmpty
default:
break
}
return ioc, nil
case <-tc:
return nil, p.opts.errOnEmpty
}

return nil, p.opts.errOnEmpty

}

func (p *Pool) get() (*ioErrConn, error) {
Expand All @@ -547,7 +566,7 @@ func (p *Pool) get() (*ioErrConn, error) {
// discarded.
func (p *Pool) put(ioc *ioErrConn) bool {
p.l.RLock()
if ioc.lastIOErr == nil && !p.closed {
if ioc.lastIOErr == nil && !p.closed && !ioc.expired(p.opts.maxLifetime) {
select {
case p.pool <- ioc:
p.l.RUnlock()
Expand Down

0 comments on commit b36b5bc

Please sign in to comment.