From a814153aebbd5fc8b037f535ff8d1e54209247a8 Mon Sep 17 00:00:00 2001 From: James Hartig Date: Thu, 26 May 2022 10:49:57 -0400 Subject: [PATCH] pgxpool: health check should avoid going below minConns --- pgxpool/conn.go | 23 +++++- pgxpool/pool.go | 174 +++++++++++++++++++++++++++++++++---------- pgxpool/pool_test.go | 38 +++++++++- pgxpool/stat.go | 22 +++++- 4 files changed, 212 insertions(+), 45 deletions(-) diff --git a/pgxpool/conn.go b/pgxpool/conn.go index 036c728a5..6482c821d 100644 --- a/pgxpool/conn.go +++ b/pgxpool/conn.go @@ -2,7 +2,7 @@ package pgxpool import ( "context" - "time" + "sync/atomic" "github.com/jackc/pgconn" "github.com/jackc/pgx/v4" @@ -26,9 +26,23 @@ func (c *Conn) Release() { res := c.res c.res = nil - now := time.Now() - if conn.IsClosed() || conn.PgConn().IsBusy() || conn.PgConn().TxStatus() != 'I' || (now.Sub(res.CreationTime()) > c.p.maxConnLifetime) { + if conn.IsClosed() || conn.PgConn().IsBusy() || conn.PgConn().TxStatus() != 'I' { res.Destroy() + // Signal to the health check to run since we just destroyed a connections + // and we might be below minConns now + c.p.triggerHealthCheck() + return + } + + // If the pool is consistently being used, we might never get to check the + // lifetime of a connection since we only check idle connections in checkConnsHealth + // so we also check the lifetime here and force a health check + if c.p.isExpired(res) { + atomic.AddInt64(&c.p.lifetimeDestroyCount, 1) + res.Destroy() + // Signal to the health check to run since we just destroyed a connections + // and we might be below minConns now + c.p.triggerHealthCheck() return } @@ -42,6 +56,9 @@ func (c *Conn) Release() { res.Release() } else { res.Destroy() + // Signal to the health check to run since we just destroyed a connections + // and we might be below minConns now + c.p.triggerHealthCheck() } }() } diff --git a/pgxpool/pool.go b/pgxpool/pool.go index d75861685..3234c1626 100644 --- a/pgxpool/pool.go +++ b/pgxpool/pool.go @@ -3,9 +3,11 @@ package pgxpool import ( "context" "fmt" + "math/rand" "runtime" "strconv" "sync" + "sync/atomic" "time" "github.com/jackc/pgconn" @@ -70,16 +72,23 @@ func (cr *connResource) getPoolRows(c *Conn, r pgx.Rows) *poolRows { // Pool allows for connection reuse. type Pool struct { - p *puddle.Pool - config *Config - beforeConnect func(context.Context, *pgx.ConnConfig) error - afterConnect func(context.Context, *pgx.Conn) error - beforeAcquire func(context.Context, *pgx.Conn) bool - afterRelease func(*pgx.Conn) bool - minConns int32 - maxConnLifetime time.Duration - maxConnIdleTime time.Duration - healthCheckPeriod time.Duration + p *puddle.Pool + config *Config + beforeConnect func(context.Context, *pgx.ConnConfig) error + afterConnect func(context.Context, *pgx.Conn) error + beforeAcquire func(context.Context, *pgx.Conn) bool + afterRelease func(*pgx.Conn) bool + minConns int32 + maxConns int32 + maxConnLifetime time.Duration + maxConnLifetimeJitter time.Duration + maxConnIdleTime time.Duration + healthCheckPeriod time.Duration + healthCheckChan chan struct{} + + newConnsCount int64 + lifetimeDestroyCount int64 + idleDestroyCount int64 closeOnce sync.Once closeChan chan struct{} @@ -109,14 +118,19 @@ type Config struct { // MaxConnLifetime is the duration since creation after which a connection will be automatically closed. MaxConnLifetime time.Duration + // MaxConnLifetimeJitter is the duration after MaxConnLifetime to randomly decide to close a connection. + // This helps prevent all connections from being closed at the exact same time, starving the pool. + MaxConnLifetimeJitter time.Duration + // MaxConnIdleTime is the duration after which an idle connection will be automatically closed by the health check. MaxConnIdleTime time.Duration // MaxConns is the maximum size of the pool. The default is the greater of 4 or runtime.NumCPU(). MaxConns int32 - // MinConns is the minimum size of the pool. The health check will increase the number of connections to this - // amount if it had dropped below. + // MinConns is the minimum size of the pool. After connection closes, the pool might dip below MinConns. A low + // number of MinConns might mean the pool is empty after MaxConnLifetime until the health check has a chance + // to create new connections. MinConns int32 // HealthCheckPeriod is the duration between checks of the health of idle connections. @@ -164,16 +178,19 @@ func ConnectConfig(ctx context.Context, config *Config) (*Pool, error) { } p := &Pool{ - config: config, - beforeConnect: config.BeforeConnect, - afterConnect: config.AfterConnect, - beforeAcquire: config.BeforeAcquire, - afterRelease: config.AfterRelease, - minConns: config.MinConns, - maxConnLifetime: config.MaxConnLifetime, - maxConnIdleTime: config.MaxConnIdleTime, - healthCheckPeriod: config.HealthCheckPeriod, - closeChan: make(chan struct{}), + config: config, + beforeConnect: config.BeforeConnect, + afterConnect: config.AfterConnect, + beforeAcquire: config.BeforeAcquire, + afterRelease: config.AfterRelease, + minConns: config.MinConns, + maxConns: config.MaxConns, + maxConnLifetime: config.MaxConnLifetime, + maxConnLifetimeJitter: config.MaxConnLifetimeJitter, + maxConnIdleTime: config.MaxConnIdleTime, + healthCheckPeriod: config.HealthCheckPeriod, + healthCheckChan: make(chan struct{}, 1), + closeChan: make(chan struct{}), } p.p = puddle.NewPool( @@ -223,7 +240,7 @@ func ConnectConfig(ctx context.Context, config *Config) (*Pool, error) { ) if !config.LazyConnect { - if err := p.createIdleResources(ctx, int(p.minConns)); err != nil { + if err := p.checkMinConns(); err != nil { // Couldn't create resources for minpool size. Close unhealthy pool. p.Close() return nil, err @@ -251,6 +268,7 @@ func ConnectConfig(ctx context.Context, config *Config) (*Pool, error) { // pool_max_conn_lifetime: duration string // pool_max_conn_idle_time: duration string // pool_health_check_period: duration string +// pool_max_conn_lifetime_jitter: duration string // // See Config for definitions of these arguments. // @@ -331,6 +349,15 @@ func ParseConfig(connString string) (*Config, error) { config.HealthCheckPeriod = defaultHealthCheckPeriod } + if s, ok := config.ConnConfig.Config.RuntimeParams["pool_max_conn_lifetime_jitter"]; ok { + delete(connConfig.Config.RuntimeParams, "pool_max_conn_lifetime_jitter") + d, err := time.ParseDuration(s) + if err != nil { + return nil, fmt.Errorf("invalid pool_max_conn_lifetime_jitter: %w", err) + } + config.MaxConnLifetimeJitter = d + } + return config, nil } @@ -343,44 +370,105 @@ func (p *Pool) Close() { }) } +func (p *Pool) isExpired(res *puddle.Resource) bool { + now := time.Now() + // Small optimization to avoid rand. If it's over lifetime AND jitter, immediately + // return true. + if now.Sub(res.CreationTime()) > p.maxConnLifetime+p.maxConnLifetimeJitter { + return true + } + if p.maxConnLifetimeJitter == 0 { + return false + } + jitterSecs := rand.Float64() * p.maxConnLifetimeJitter.Seconds() + return now.Sub(res.CreationTime()) > p.maxConnLifetime+(time.Duration(jitterSecs)*time.Second) +} + +func (p *Pool) triggerHealthCheck() { + go func() { + // Destroy is asynchronous so we give it time to actually remove itself from + // the pool otherwise we might try to check the pool size too soon + time.Sleep(500 * time.Millisecond) + select { + case p.healthCheckChan <- struct{}{}: + default: + } + }() +} + func (p *Pool) backgroundHealthCheck() { ticker := time.NewTicker(p.healthCheckPeriod) - + defer ticker.Stop() for { select { case <-p.closeChan: - ticker.Stop() return + case <-p.healthCheckChan: + p.checkHealth() case <-ticker.C: - p.checkIdleConnsHealth() - p.checkMinConns() + p.checkHealth() } } } -func (p *Pool) checkIdleConnsHealth() { - resources := p.p.AcquireAllIdle() +func (p *Pool) checkHealth() { + for { + // If checkMinConns failed we don't destroy any connections since we couldn't + // even get to minConns + if err := p.checkMinConns(); err != nil { + // Should we log this error somewhere? + break + } + if !p.checkConnsHealth() { + // Since we didn't destroy any connections we can stop looping + break + } + // Technically Destroy is asynchronous but 500ms should be enough for it to + // remove it from the underlying pool + select { + case <-p.closeChan: + return + case <-time.After(500 * time.Millisecond): + } + } +} - now := time.Now() +// checkConnsHealth will check all idle connections, destroy a connection if +// it's idle or too old, and returns true if any were destroyed +func (p *Pool) checkConnsHealth() bool { + var destroyed bool + totalConns := p.Stat().TotalConns() + resources := p.p.AcquireAllIdle() for _, res := range resources { - if now.Sub(res.CreationTime()) > p.maxConnLifetime { + // We're okay going under minConns if the lifetime is up + if p.isExpired(res) && totalConns >= p.minConns { + atomic.AddInt64(&p.lifetimeDestroyCount, 1) res.Destroy() - } else if res.IdleDuration() > p.maxConnIdleTime { + destroyed = true + // Since Destroy is async we manually decrement totalConns. + totalConns-- + } else if res.IdleDuration() > p.maxConnIdleTime && totalConns > p.minConns { + atomic.AddInt64(&p.idleDestroyCount, 1) res.Destroy() + destroyed = true + // Since Destroy is async we manually decrement totalConns. + totalConns-- } else { res.ReleaseUnused() } } + return destroyed } -func (p *Pool) checkMinConns() { - for i := p.minConns - p.Stat().TotalConns(); i > 0; i-- { - go func() { - ctx, cancel := context.WithTimeout(context.Background(), time.Minute) - defer cancel() - p.p.CreateResource(ctx) - }() +func (p *Pool) checkMinConns() error { + // TotalConns can include ones that are being destroyed but we should have + // sleep(500ms) around all of the destroys to help prevent that from throwing + // off this check + toCreate := p.minConns - p.Stat().TotalConns() + if toCreate > 0 { + return p.createIdleResources(context.Background(), int(toCreate)) } + return nil } func (p *Pool) createIdleResources(parentCtx context.Context, targetResources int) error { @@ -391,6 +479,7 @@ func (p *Pool) createIdleResources(parentCtx context.Context, targetResources in for i := 0; i < targetResources; i++ { go func() { + atomic.AddInt64(&p.newConnsCount, 1) err := p.p.CreateResource(ctx) errs <- err }() @@ -460,7 +549,12 @@ func (p *Pool) Config() *Config { return p.config.Copy() } // Stat returns a pgxpool.Stat struct with a snapshot of Pool statistics. func (p *Pool) Stat() *Stat { - return &Stat{s: p.p.Stat()} + return &Stat{ + s: p.p.Stat(), + newConnsCount: atomic.LoadInt64(&p.newConnsCount), + lifetimeDestroyCount: atomic.LoadInt64(&p.lifetimeDestroyCount), + idleDestroyCount: atomic.LoadInt64(&p.idleDestroyCount), + } } // Exec acquires a connection from the Pool and executes the given SQL. diff --git a/pgxpool/pool_test.go b/pgxpool/pool_test.go index 42e029e14..1742f55d6 100644 --- a/pgxpool/pool_test.go +++ b/pgxpool/pool_test.go @@ -372,6 +372,14 @@ func TestConnReleaseClosesBusyConn(t *testing.T) { c.Release() waitForReleaseToComplete() + // wait for the connection to actually be destroyed + for i := 0; i < 1000; i++ { + if db.Stat().TotalConns() == 0 { + break + } + time.Sleep(time.Millisecond) + } + stats := db.Stat() assert.EqualValues(t, 0, stats.TotalConns()) } @@ -396,6 +404,8 @@ func TestPoolBackgroundChecksMaxConnLifetime(t *testing.T) { stats := db.Stat() assert.EqualValues(t, 0, stats.TotalConns()) + assert.EqualValues(t, 0, stats.MaxIdleDestroyCount()) + assert.EqualValues(t, 1, stats.MaxLifetimeDestroyCount()) } func TestPoolBackgroundChecksMaxConnIdleTime(t *testing.T) { @@ -426,6 +436,8 @@ func TestPoolBackgroundChecksMaxConnIdleTime(t *testing.T) { stats := db.Stat() assert.EqualValues(t, 0, stats.TotalConns()) + assert.EqualValues(t, 1, stats.MaxIdleDestroyCount()) + assert.EqualValues(t, 0, stats.MaxLifetimeDestroyCount()) } func TestPoolBackgroundChecksMinConns(t *testing.T) { @@ -443,6 +455,21 @@ func TestPoolBackgroundChecksMinConns(t *testing.T) { stats := db.Stat() assert.EqualValues(t, 2, stats.TotalConns()) + assert.EqualValues(t, 0, stats.MaxLifetimeDestroyCount()) + assert.EqualValues(t, 2, stats.NewConnsCount()) + + c, err := db.Acquire(context.Background()) + require.NoError(t, err) + err = c.Conn().Close(context.Background()) + require.NoError(t, err) + c.Release() + + time.Sleep(config.HealthCheckPeriod + 500*time.Millisecond) + + stats = db.Stat() + assert.EqualValues(t, 2, stats.TotalConns()) + assert.EqualValues(t, 0, stats.MaxIdleDestroyCount()) + assert.EqualValues(t, 3, stats.NewConnsCount()) } func TestPoolExec(t *testing.T) { @@ -679,6 +706,14 @@ func TestConnReleaseDestroysClosedConn(t *testing.T) { c.Release() waitForReleaseToComplete() + // wait for the connection to actually be destroyed + for i := 0; i < 1000; i++ { + if pool.Stat().TotalConns() == 0 { + break + } + time.Sleep(time.Millisecond) + } + assert.EqualValues(t, 0, pool.Stat().TotalConns()) } @@ -767,7 +802,7 @@ func TestTxBeginFuncNestedTransactionCommit(t *testing.T) { require.NoError(t, err) return nil }) - + require.NoError(t, err) return nil }) require.NoError(t, err) @@ -817,6 +852,7 @@ func TestTxBeginFuncNestedTransactionRollback(t *testing.T) { return nil }) + require.NoError(t, err) var n int64 err = db.QueryRow(context.Background(), "select count(*) from pgxpooltx").Scan(&n) diff --git a/pgxpool/stat.go b/pgxpool/stat.go index 336be42d5..47342be44 100644 --- a/pgxpool/stat.go +++ b/pgxpool/stat.go @@ -8,7 +8,10 @@ import ( // Stat is a snapshot of Pool statistics. type Stat struct { - s *puddle.Stat + s *puddle.Stat + newConnsCount int64 + lifetimeDestroyCount int64 + idleDestroyCount int64 } // AcquireCount returns the cumulative count of successful acquires from the pool. @@ -62,3 +65,20 @@ func (s *Stat) MaxConns() int32 { func (s *Stat) TotalConns() int32 { return s.s.TotalResources() } + +// NewConnsCount returns the cumulative count of new connections opened. +func (s *Stat) NewConnsCount() int64 { + return s.newConnsCount +} + +// MaxLifetimeDestroyCount returns the cumulative count of connections destroyed +// because they exceeded MaxConnLifetime. +func (s *Stat) MaxLifetimeDestroyCount() int64 { + return s.lifetimeDestroyCount +} + +// MaxIdleDestroyCount returns the cumulative count of connections destroyed because +// they exceeded MaxConnIdleTime. +func (s *Stat) MaxIdleDestroyCount() int64 { + return s.idleDestroyCount +}