Skip to content

Commit

Permalink
Update to pgx4 library
Browse files Browse the repository at this point in the history
  • Loading branch information
amotl committed Sep 1, 2023
1 parent b9ba8be commit a6831bc
Show file tree
Hide file tree
Showing 6 changed files with 222 additions and 47 deletions.
1 change: 1 addition & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ Unreleased
==========

- Add support for Go 1.20 and 1.21, drop support for previous releases.
- Update to pgx4 library

BREAKING CHANGES
----------------
Expand Down
85 changes: 54 additions & 31 deletions crate.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@ import (
"context"
"crypto/tls"
"fmt"
"net"
"github.com/jackc/pgx/v4"
"time"

"github.com/go-kit/kit/endpoint"
"github.com/jackc/pgx"
"github.com/jackc/pgx/pgtype"
"github.com/jackc/pgtype"
"github.com/jackc/pgx/v4/pgxpool"
"github.com/prometheus/common/model"
)

Expand All @@ -36,31 +36,58 @@ type crateReadResponse struct {
}

type crateEndpoint struct {
pool *pgx.ConnPool
poolConf pgx.ConnPoolConfig
pool *pgxpool.Pool
poolConf *pgxpool.Config
}

func newCrateEndpoint(ep *endpointConfig) *crateEndpoint {
connConf := pgx.ConnConfig{
Host: ep.Host,
Port: ep.Port,
User: ep.User,
Password: ep.Password,
Database: ep.Schema,
Dial: (&net.Dialer{
KeepAlive: 30 * time.Second,
Timeout: time.Duration(ep.ConnectTimeout) * time.Second,
}).Dial,

// pgx4 starts using connection strings exclusively, in both URL and DSN formats.
// The single entrypoint to obtain a valid configuration object, is `pgx.ParseConfig`,
// which aims to be compatible with libpq.

// ParseConfig builds a *Config from connString with similar behavior to the PostgreSQL
// standard C library libpq. It uses the same defaults as libpq (e.g. port=5432), and
// understands most PG* environment variables.
//
// ParseConfig closely matches the parsing behavior of libpq. connString may either be
// in URL or DSN format. connString also may be empty to only read from the environment.
// If a password is not supplied it will attempt to read the .pgpass file.
//
// See https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-CONNSTRING for details.
//
// # Example DSN
// user=jack password=secret host=pg.example.com port=5432 dbname=mydb sslmode=verify-ca
//
// # Example URL
// postgres://jack:secret@pg.example.com:5432/mydb?sslmode=verify-ca
connectionString := fmt.Sprintf(
"postgres://%s:%s@%s:%v/%s?connect_timeout=%v&pool_max_conns=%v",
ep.User, ep.Password, ep.Host, ep.Port, ep.Schema, ep.ConnectTimeout, ep.MaxConnections)
poolConf, err := pgxpool.ParseConfig(connectionString)
if err != nil {
return nil
}

// Configure TLS settings.
if ep.EnableTLS {
connConf.TLSConfig = &tls.Config{
poolConf.ConnConfig.TLSConfig = &tls.Config{
ServerName: ep.Host,
InsecureSkipVerify: ep.AllowInsecureTLS,
}
}
poolConf := pgx.ConnPoolConfig{
ConnConfig: connConf,
MaxConnections: ep.MaxConnections,

// pgx v4
// If you are using `pgxpool`, then you can use `AfterConnect` to prepare statements. That will
// ensure that they are available on every connection. Otherwise, you will have to acquire
// a connection from the pool manually and prepare it there before use.
// https://github.com/jackc/pgx/issues/791#issuecomment-660508309
poolConf.AfterConnect = func(ctx context.Context, conn *pgx.Conn) error {
_, err := conn.Prepare(ctx, "write_statement", crateWriteStatement)
if err != nil {
return fmt.Errorf("error preparing write statement: %v", err)
}
return err
}
return &crateEndpoint{poolConf: poolConf}
}
Expand All @@ -70,7 +97,7 @@ func (c *crateEndpoint) endpoint() endpoint.Endpoint {
// We initialize the connection pool lazily here instead of in newCrateEndpoint() so
// that the adapter does not crash on startup if an endpoint is unavailable.
if c.pool == nil {
pool, err := pgx.NewConnPool(c.poolConf)
pool, err := pgxpool.ConnectConfig(ctx, c.poolConf)
if err != nil {
return nil, fmt.Errorf("error opening connection to CrateDB: %v", err)
}
Expand All @@ -89,12 +116,7 @@ func (c *crateEndpoint) endpoint() endpoint.Endpoint {
}

func (c crateEndpoint) write(ctx context.Context, r *crateWriteRequest) error {
_, err := c.pool.PrepareEx(ctx, "write_statement", crateWriteStatement, nil)
if err != nil {
return fmt.Errorf("error preparing write statement: %v", err)
}

batch := c.pool.BeginBatch()
batch := &pgx.Batch{}
for _, a := range r.rows {
batch.Queue(
"write_statement",
Expand All @@ -117,20 +139,21 @@ func (c crateEndpoint) write(ctx context.Context, r *crateWriteRequest) error {
)
}

err = batch.Send(ctx, nil)
if err != nil {
return fmt.Errorf("error executing write batch: %v", err)
batchResults := c.pool.SendBatch(ctx, batch)
var qerr error
if qerr != nil {
return fmt.Errorf("error executing write batch: %v", qerr)
}

err = batch.Close()
err := batchResults.Close()
if err != nil {
return fmt.Errorf("error closing write batch: %v", err)
}
return nil
}

func (c crateEndpoint) read(ctx context.Context, r *crateReadRequest) (*crateReadResponse, error) {
rows, err := c.pool.QueryEx(ctx, r.stmt, nil)
rows, err := c.pool.Query(ctx, r.stmt, nil)
if err != nil {
return nil, fmt.Errorf("error executing read request query: %v", err)
}
Expand Down
14 changes: 14 additions & 0 deletions crate_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package main

import (
"github.com/stretchr/testify/require"
"testing"
)

func TestNewCrateEndpoint(t *testing.T) {
conf := builtinConfig()
endpoint := newCrateEndpoint(&conf.Endpoints[0])
require.Equal(t,
endpoint.poolConf.ConnString(),
"postgres://crate:@localhost:5432/?connect_timeout=10&pool_max_conns=5")
}
17 changes: 10 additions & 7 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ require (
github.com/go-kit/kit v0.9.0
github.com/golang/protobuf v1.5.3
github.com/golang/snappy v0.0.4
github.com/jackc/pgx v3.6.2+incompatible
github.com/jackc/pgtype v1.14.0
github.com/jackc/pgx/v4 v4.18.1
github.com/prometheus/client_golang v1.11.1
github.com/prometheus/common v0.26.0
github.com/prometheus/prometheus v2.3.2-0.20180622142104-2bd510a63e48+incompatible
Expand All @@ -19,22 +20,24 @@ require (
github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/cockroachdb/apd v1.1.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/go-logfmt/logfmt v0.5.0 // indirect
github.com/gofrs/uuid v4.3.1+incompatible // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect
github.com/jackc/fake v0.0.0-20150926172116-812a484cc733 // indirect
github.com/lib/pq v1.10.1 // indirect
github.com/jackc/chunkreader/v2 v2.0.1 // indirect
github.com/jackc/pgconn v1.14.0 // indirect
github.com/jackc/pgio v1.0.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgproto3/v2 v2.3.2 // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/jackc/puddle v1.3.0 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/procfs v0.6.0 // indirect
github.com/shopspring/decimal v1.2.0 // indirect
github.com/sirupsen/logrus v1.9.0 // indirect
golang.org/x/crypto v0.1.0 // indirect
golang.org/x/crypto v0.6.0 // indirect
golang.org/x/net v0.7.0 // indirect
golang.org/x/sys v0.5.0 // indirect
golang.org/x/text v0.7.0 // indirect
Expand Down
Loading

0 comments on commit a6831bc

Please sign in to comment.