Skip to content

Commit

Permalink
pgxpool: health check should avoid going below minConns
Browse files Browse the repository at this point in the history
  • Loading branch information
jameshartig authored and jackc committed Jun 7, 2022
1 parent 37c3f15 commit a814153
Show file tree
Hide file tree
Showing 4 changed files with 212 additions and 45 deletions.
23 changes: 20 additions & 3 deletions pgxpool/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package pgxpool

import (
"context"
"time"
"sync/atomic"

"github.com/jackc/pgconn"
"github.com/jackc/pgx/v4"
Expand All @@ -26,9 +26,23 @@ func (c *Conn) Release() {
res := c.res
c.res = nil

now := time.Now()
if conn.IsClosed() || conn.PgConn().IsBusy() || conn.PgConn().TxStatus() != 'I' || (now.Sub(res.CreationTime()) > c.p.maxConnLifetime) {
if conn.IsClosed() || conn.PgConn().IsBusy() || conn.PgConn().TxStatus() != 'I' {
res.Destroy()
// Signal to the health check to run since we just destroyed a connection
// and we might be below minConns now
c.p.triggerHealthCheck()
return
}

// If the pool is consistently being used, we might never get to check the
// lifetime of a connection since we only check idle connections in checkConnsHealth
// so we also check the lifetime here and force a health check
if c.p.isExpired(res) {
atomic.AddInt64(&c.p.lifetimeDestroyCount, 1)
res.Destroy()
// Signal to the health check to run since we just destroyed a connection
// and we might be below minConns now
c.p.triggerHealthCheck()
return
}

Expand All @@ -42,6 +56,9 @@ func (c *Conn) Release() {
res.Release()
} else {
res.Destroy()
// Signal to the health check to run since we just destroyed a connection
// and we might be below minConns now
c.p.triggerHealthCheck()
}
}()
}
Expand Down
174 changes: 134 additions & 40 deletions pgxpool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ package pgxpool
import (
"context"
"fmt"
"math/rand"
"runtime"
"strconv"
"sync"
"sync/atomic"
"time"

"github.com/jackc/pgconn"
Expand Down Expand Up @@ -70,16 +72,23 @@ func (cr *connResource) getPoolRows(c *Conn, r pgx.Rows) *poolRows {

// Pool allows for connection reuse.
type Pool struct {
p *puddle.Pool
config *Config
beforeConnect func(context.Context, *pgx.ConnConfig) error
afterConnect func(context.Context, *pgx.Conn) error
beforeAcquire func(context.Context, *pgx.Conn) bool
afterRelease func(*pgx.Conn) bool
minConns int32
maxConnLifetime time.Duration
maxConnIdleTime time.Duration
healthCheckPeriod time.Duration
p *puddle.Pool
config *Config
beforeConnect func(context.Context, *pgx.ConnConfig) error
afterConnect func(context.Context, *pgx.Conn) error
beforeAcquire func(context.Context, *pgx.Conn) bool
afterRelease func(*pgx.Conn) bool
minConns int32
maxConns int32
maxConnLifetime time.Duration
maxConnLifetimeJitter time.Duration
maxConnIdleTime time.Duration
healthCheckPeriod time.Duration
healthCheckChan chan struct{}

newConnsCount int64
lifetimeDestroyCount int64
idleDestroyCount int64

closeOnce sync.Once
closeChan chan struct{}
Expand Down Expand Up @@ -109,14 +118,19 @@ type Config struct {
// MaxConnLifetime is the duration since creation after which a connection will be automatically closed.
MaxConnLifetime time.Duration

// MaxConnLifetimeJitter is the duration after MaxConnLifetime to randomly decide to close a connection.
// This helps prevent all connections from being closed at the exact same time, starving the pool.
MaxConnLifetimeJitter time.Duration

// MaxConnIdleTime is the duration after which an idle connection will be automatically closed by the health check.
MaxConnIdleTime time.Duration

// MaxConns is the maximum size of the pool. The default is the greater of 4 or runtime.NumCPU().
MaxConns int32

// MinConns is the minimum size of the pool. The health check will increase the number of connections to this
// amount if it had dropped below.
// MinConns is the minimum size of the pool. After connection closes, the pool might dip below MinConns. A low
// number of MinConns might mean the pool is empty after MaxConnLifetime until the health check has a chance
// to create new connections.
MinConns int32

// HealthCheckPeriod is the duration between checks of the health of idle connections.
Expand Down Expand Up @@ -164,16 +178,19 @@ func ConnectConfig(ctx context.Context, config *Config) (*Pool, error) {
}

p := &Pool{
config: config,
beforeConnect: config.BeforeConnect,
afterConnect: config.AfterConnect,
beforeAcquire: config.BeforeAcquire,
afterRelease: config.AfterRelease,
minConns: config.MinConns,
maxConnLifetime: config.MaxConnLifetime,
maxConnIdleTime: config.MaxConnIdleTime,
healthCheckPeriod: config.HealthCheckPeriod,
closeChan: make(chan struct{}),
config: config,
beforeConnect: config.BeforeConnect,
afterConnect: config.AfterConnect,
beforeAcquire: config.BeforeAcquire,
afterRelease: config.AfterRelease,
minConns: config.MinConns,
maxConns: config.MaxConns,
maxConnLifetime: config.MaxConnLifetime,
maxConnLifetimeJitter: config.MaxConnLifetimeJitter,
maxConnIdleTime: config.MaxConnIdleTime,
healthCheckPeriod: config.HealthCheckPeriod,
healthCheckChan: make(chan struct{}, 1),
closeChan: make(chan struct{}),
}

p.p = puddle.NewPool(
Expand Down Expand Up @@ -223,7 +240,7 @@ func ConnectConfig(ctx context.Context, config *Config) (*Pool, error) {
)

if !config.LazyConnect {
if err := p.createIdleResources(ctx, int(p.minConns)); err != nil {
if err := p.checkMinConns(); err != nil {
// Couldn't create resources for minpool size. Close unhealthy pool.
p.Close()
return nil, err
Expand Down Expand Up @@ -251,6 +268,7 @@ func ConnectConfig(ctx context.Context, config *Config) (*Pool, error) {
// pool_max_conn_lifetime: duration string
// pool_max_conn_idle_time: duration string
// pool_health_check_period: duration string
// pool_max_conn_lifetime_jitter: duration string
//
// See Config for definitions of these arguments.
//
Expand Down Expand Up @@ -331,6 +349,15 @@ func ParseConfig(connString string) (*Config, error) {
config.HealthCheckPeriod = defaultHealthCheckPeriod
}

if s, ok := config.ConnConfig.Config.RuntimeParams["pool_max_conn_lifetime_jitter"]; ok {
delete(connConfig.Config.RuntimeParams, "pool_max_conn_lifetime_jitter")
d, err := time.ParseDuration(s)
if err != nil {
return nil, fmt.Errorf("invalid pool_max_conn_lifetime_jitter: %w", err)
}
config.MaxConnLifetimeJitter = d
}

return config, nil
}

Expand All @@ -343,44 +370,105 @@ func (p *Pool) Close() {
})
}

// isExpired reports whether res has outlived the pool's maximum connection
// lifetime, taking the configured lifetime jitter into account.
//
// A resource older than maxConnLifetime+maxConnLifetimeJitter is always
// expired. When no jitter is configured, nothing younger than that is expired.
// Otherwise each call draws a fresh random cutoff between maxConnLifetime and
// maxConnLifetime+maxConnLifetimeJitter and compares the resource's age to it.
func (p *Pool) isExpired(res *puddle.Resource) bool {
	age := time.Since(res.CreationTime())
	// Small optimization to avoid rand. If it's over lifetime AND jitter,
	// immediately return true.
	if age > p.maxConnLifetime+p.maxConnLifetimeJitter {
		return true
	}
	if p.maxConnLifetimeJitter == 0 {
		return false
	}
	// Scale the jitter as a Duration (nanoseconds) rather than going through
	// whole seconds: converting a fractional second count to time.Duration
	// truncates it, which would turn any sub-second jitter into zero.
	jitter := time.Duration(rand.Float64() * float64(p.maxConnLifetimeJitter))
	return age > p.maxConnLifetime+jitter
}

// triggerHealthCheck asynchronously asks the background health check to run.
// The request is posted after a short delay and is dropped if one is already
// pending on healthCheckChan, so callers never block.
func (p *Pool) triggerHealthCheck() {
	// Destroy is asynchronous, so wait a little for the resource to actually
	// leave the pool; otherwise the pool size could be inspected too soon.
	const settleDelay = 500 * time.Millisecond

	notify := func() {
		time.Sleep(settleDelay)
		select {
		case p.healthCheckChan <- struct{}{}:
		default:
			// A health check request is already queued; nothing to do.
		}
	}
	go notify()
}

// backgroundHealthCheck runs the pool's health check loop until closeChan is
// signalled. A check is performed every healthCheckPeriod and also whenever a
// request arrives on healthCheckChan.
func (p *Pool) backgroundHealthCheck() {
	ticker := time.NewTicker(p.healthCheckPeriod)
	// The deferred Stop covers every exit path, so no explicit Stop is needed
	// before returning.
	defer ticker.Stop()

	for {
		select {
		case <-p.closeChan:
			return
		case <-p.healthCheckChan:
			p.checkHealth()
		case <-ticker.C:
			// checkHealth already tops the pool up to minConns and inspects
			// the idle connections, so it is the only call required here.
			p.checkHealth()
		}
	}
}

func (p *Pool) checkIdleConnsHealth() {
resources := p.p.AcquireAllIdle()
// checkHealth alternates between topping the pool up to minConns and checking
// the idle connections, repeating for as long as the connection check reports
// that it destroyed something. It stops early if the pool is closed.
func (p *Pool) checkHealth() {
	// Technically Destroy is asynchronous but 500ms should be enough for it to
	// remove the resource from the underlying pool.
	const settleDelay = 500 * time.Millisecond

	for {
		// If checkMinConns failed we don't destroy any connections since we
		// couldn't even get to minConns.
		// Should we log this error somewhere?
		if err := p.checkMinConns(); err != nil {
			return
		}

		// Nothing was destroyed, so another pass would not change anything.
		if destroyedAny := p.checkConnsHealth(); !destroyedAny {
			return
		}

		select {
		case <-time.After(settleDelay):
		case <-p.closeChan:
			return
		}
	}
}

now := time.Now()
// checkConnsHealth will check all idle connections, destroy a connection if
// it's idle or too old, and returns true if any were destroyed
func (p *Pool) checkConnsHealth() bool {
var destroyed bool
totalConns := p.Stat().TotalConns()
resources := p.p.AcquireAllIdle()
for _, res := range resources {
if now.Sub(res.CreationTime()) > p.maxConnLifetime {
// We're okay going under minConns if the lifetime is up
if p.isExpired(res) && totalConns >= p.minConns {
atomic.AddInt64(&p.lifetimeDestroyCount, 1)
res.Destroy()
} else if res.IdleDuration() > p.maxConnIdleTime {
destroyed = true
// Since Destroy is async we manually decrement totalConns.
totalConns--
} else if res.IdleDuration() > p.maxConnIdleTime && totalConns > p.minConns {
atomic.AddInt64(&p.idleDestroyCount, 1)
res.Destroy()
destroyed = true
// Since Destroy is async we manually decrement totalConns.
totalConns--
} else {
res.ReleaseUnused()
}
}
return destroyed
}

func (p *Pool) checkMinConns() {
for i := p.minConns - p.Stat().TotalConns(); i > 0; i-- {
go func() {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
p.p.CreateResource(ctx)
}()
// checkMinConns creates idle connections until the pool is back at minConns.
// It returns the error from createIdleResources, or nil when the pool already
// holds at least minConns connections.
func (p *Pool) checkMinConns() error {
	// TotalConns can include ones that are being destroyed but we should have
	// sleep(500ms) around all of the destroys to help prevent that from
	// throwing off this check.
	missing := int(p.minConns - p.Stat().TotalConns())
	if missing <= 0 {
		return nil
	}
	return p.createIdleResources(context.Background(), missing)
}

func (p *Pool) createIdleResources(parentCtx context.Context, targetResources int) error {
Expand All @@ -391,6 +479,7 @@ func (p *Pool) createIdleResources(parentCtx context.Context, targetResources in

for i := 0; i < targetResources; i++ {
go func() {
atomic.AddInt64(&p.newConnsCount, 1)
err := p.p.CreateResource(ctx)
errs <- err
}()
Expand Down Expand Up @@ -460,7 +549,12 @@ func (p *Pool) Config() *Config { return p.config.Copy() }

// Stat returns a pgxpool.Stat struct with a snapshot of Pool statistics.
//
// Besides the underlying pool's statistics, the snapshot carries the pool's
// counters of created connections and of connections destroyed for exceeding
// their lifetime or idle time. The counters are read atomically.
func (p *Pool) Stat() *Stat {
	return &Stat{
		s:                    p.p.Stat(),
		newConnsCount:        atomic.LoadInt64(&p.newConnsCount),
		lifetimeDestroyCount: atomic.LoadInt64(&p.lifetimeDestroyCount),
		idleDestroyCount:     atomic.LoadInt64(&p.idleDestroyCount),
	}
}

// Exec acquires a connection from the Pool and executes the given SQL.
Expand Down
Loading

0 comments on commit a814153

Please sign in to comment.