Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 10 additions & 9 deletions chpool/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package chpool

import (
"context"
"time"

"github.com/jackc/puddle/v2"

Expand All @@ -21,14 +20,16 @@ func (c *Client) Release() {
return
}

client := c.client()

if client.IsClosed() || time.Since(c.res.CreationTime()) > c.p.options.MaxConnLifetime {
c.res.Destroy()
return
}

c.res.Release()
// calling async since connIsHealthy may block
go func() {
if c.p.connIsHealthy(c.res) {
c.p.options.ClientOptions.Logger.Debug("chpool: releasing connection")
c.res.Release()
} else {
c.p.options.ClientOptions.Logger.Debug("chpool: destoying connection")
c.res.Destroy()
}
}()
}

func (c *Client) Do(ctx context.Context, q ch.Query) (err error) {
Expand Down
22 changes: 22 additions & 0 deletions chpool/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,13 @@ package chpool

import (
"context"
"sync/atomic"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/ClickHouse/ch-go"
)

func TestClient_Do(t *testing.T) {
Expand All @@ -17,6 +21,24 @@ func TestClient_Do(t *testing.T) {
testDo(t, conn)
}

func TestClient_ReleaseHealthCheck(t *testing.T) {
t.Parallel()
var healthCheckCnt int64
p := PoolConnOpt(t, Options{
HealthCheckFunc: func(ctx context.Context, client *ch.Client) error {
atomic.AddInt64(&healthCheckCnt, 1)
return nil
},
})
conn, err := p.Acquire(context.Background())
require.NoError(t, err)
assert.Equal(t, int64(0), atomic.LoadInt64(&healthCheckCnt))

conn.Release()
waitForReleaseToComplete()
assert.Equal(t, int64(1), atomic.LoadInt64(&healthCheckCnt))
}

func TestClient_Ping(t *testing.T) {
t.Parallel()
p := PoolConn(t)
Expand Down
7 changes: 5 additions & 2 deletions chpool/conn.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
package chpool

import (
"time"

"github.com/jackc/puddle/v2"

"github.com/ClickHouse/ch-go"
)

type connResource struct {
client *ch.Client
clients []Client
lastHealthCheckTimestamp time.Time
client *ch.Client
clients []Client
}

func (cr *connResource) getConn(p *Pool, res *puddle.Resource[*connResource]) *Client {
Expand Down
84 changes: 66 additions & 18 deletions chpool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@ import (
"sync"
"time"

"github.com/ClickHouse/ch-go"

"github.com/jackc/puddle/v2"
"go.uber.org/zap"

"github.com/ClickHouse/ch-go"
)

// Pool of connections to ClickHouse.
Expand All @@ -23,19 +24,26 @@ type Pool struct {

// Options for Pool.
type Options struct {
ClientOptions ch.Options
MaxConnLifetime time.Duration
MaxConnIdleTime time.Duration
MaxConns int32
MinConns int32
HealthCheckPeriod time.Duration
ClientOptions ch.Options
MaxConnLifetime time.Duration
MaxConnIdleTime time.Duration
MaxConns int32
MinConns int32
HealthCheckPeriod time.Duration
HealthCheckFunc func(ctx context.Context, client *ch.Client) error
HealthCheckTimeout time.Duration
}

func DefaultHealthCheckFunc(ctx context.Context, client *ch.Client) error {
return client.Ping(ctx)
}

// Defaults for pool.
const (
DefaultMaxConnLifetime = time.Hour
DefaultMaxConnIdleTime = time.Minute * 30
DefaultHealthCheckPeriod = time.Minute
DefaultMaxConnLifetime = time.Hour
DefaultMaxConnIdleTime = time.Minute * 30
DefaultHealthCheckPeriod = time.Minute
DefaultHealthCheckTimeout = time.Second
)

func (o *Options) setDefaults() {
Expand All @@ -51,6 +59,15 @@ func (o *Options) setDefaults() {
if o.HealthCheckPeriod == 0 {
o.HealthCheckPeriod = DefaultHealthCheckPeriod
}
if o.HealthCheckFunc == nil {
o.HealthCheckFunc = DefaultHealthCheckFunc
}
if o.HealthCheckTimeout == 0 {
o.HealthCheckTimeout = DefaultHealthCheckTimeout
}
if o.ClientOptions.Logger == nil {
o.ClientOptions.Logger = zap.NewNop()
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To prevent any nil-ptr exception later

}
}

// Dial returns a pool of connections to ClickHouse.
Expand Down Expand Up @@ -162,16 +179,47 @@ func (p *Pool) backgroundHealthCheck() {
func (p *Pool) checkIdleConnsHealth() {
resources := p.pool.AcquireAllIdle()

now := time.Now()
wg := sync.WaitGroup{}
for _, res := range resources {
if now.Sub(res.CreationTime()) > p.options.MaxConnLifetime {
res.Destroy()
} else if res.IdleDuration() > p.options.MaxConnIdleTime {
res.Destroy()
} else {
res.ReleaseUnused()
res := res
wg.Add(1)
go func() {
defer wg.Done()
if res.IdleDuration() > p.options.MaxConnIdleTime || !p.connIsHealthy(res) {
res.Destroy()
} else {
res.ReleaseUnused()
}
}()
wg.Wait()
}
}

func (p *Pool) connIsHealthy(res *puddle.Resource[*connResource]) bool {
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Happy to change this name to something better

logger := p.options.ClientOptions.Logger
if res.Value().client.IsClosed() {
logger.Debug("chpool: connection is closed")
return false
}

if time.Since(res.CreationTime()) > p.options.MaxConnLifetime {
logger.Debug("chpool: connection over max lifetime")
return false
}

if p.options.HealthCheckFunc != nil {
ctx, cancel := context.WithTimeout(context.Background(), p.options.HealthCheckTimeout)
defer cancel()
if err := p.options.HealthCheckFunc(ctx, res.Value().client); err != nil {
if logger := p.options.ClientOptions.Logger; logger != nil {
logger.Warn("chpool: health check failed", zap.Error(err))
}
return false
}
}

res.Value().lastHealthCheckTimestamp = time.Now()
return true
}

func (p *Pool) checkMinConns() {
Expand Down
24 changes: 24 additions & 0 deletions chpool/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@ package chpool

import (
"context"
"sync/atomic"
"testing"
"time"

"github.com/ClickHouse/ch-go"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -61,6 +64,7 @@ func TestPool_Ping(t *testing.T) {
p := PoolConn(t)

require.NoError(t, p.Ping(context.Background()))
waitForReleaseToComplete()

stats := p.Stat()
assert.EqualValues(t, 0, stats.AcquiredResources())
Expand All @@ -78,3 +82,23 @@ func TestPool_Acquire(t *testing.T) {
waitForReleaseToComplete()
require.EqualValues(t, 2, p.Stat().AcquireCount())
}

func TestPool_backgroundHealthCheck(t *testing.T) {
t.Parallel()
var healthCheckCnt int64
p := PoolConnOpt(t, Options{
MinConns: 1,
HealthCheckFunc: func(ctx context.Context, client *ch.Client) error {
atomic.AddInt64(&healthCheckCnt, 1)
return nil
},
HealthCheckPeriod: 500 * time.Millisecond,
})
p.checkMinConns()
p.checkIdleConnsHealth()
assert.GreaterOrEqual(t, int64(1), atomic.LoadInt64(&healthCheckCnt))

hc := atomic.LoadInt64(&healthCheckCnt)
time.Sleep(750 * time.Millisecond)
assert.Equal(t, hc+1, atomic.LoadInt64(&healthCheckCnt))
}
Loading