Skip to content

Commit

Permalink
pg connection cleanup (#13144)
Browse files Browse the repository at this point in the history
  • Loading branch information
jmank88 authored May 16, 2024
1 parent ecfab64 commit 49f1bf3
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 31 deletions.
5 changes: 5 additions & 0 deletions .changeset/big-windows-lie.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

improve handling of postgres connection settings and driver versions #db
4 changes: 4 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@ linters-settings:
desc: Use the standard library instead
- pkg: github.com/gofrs/uuid
desc: Use github.com/google/uuid instead
- pkg: github.com/jackc/pgx3
desc: Use github.com/jackc/pgx4 instead
- pkg: github.com/jackc/pgx5
desc: Use github.com/jackc/pgx4 instead
- pkg: github.com/satori/go.uuid
desc: Use github.com/google/uuid instead
- pkg: github.com/test-go/testify/assert
Expand Down
2 changes: 1 addition & 1 deletion core/internal/testutils/pgtest/txdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func (d *txDriver) Open(dsn string) (driver.Conn, error) {
defer d.Unlock()
// Open real db connection if its the first call
if d.db == nil {
db, err := sql.Open("pgx", d.dbURL)
db, err := sql.Open(string(dialects.Postgres), d.dbURL)
if err != nil {
return nil, err
}
Expand Down
74 changes: 46 additions & 28 deletions core/services/pg/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,17 @@ import (
"os"
"time"

"github.com/XSAM/otelsql"
"github.com/google/uuid"
_ "github.com/jackc/pgx/v4/stdlib" // need to make sure pgx driver is registered before opening connection
"github.com/jackc/pgx/v4"
"github.com/jackc/pgx/v4/stdlib"
"github.com/jmoiron/sqlx"
"github.com/scylladb/go-reflectx"
"go.opentelemetry.io/otel"
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
"golang.org/x/net/context"

"github.com/smartcontractkit/chainlink/v2/core/store/dialects"

"github.com/XSAM/otelsql"
)

// NOTE: This is the default level in Postgres anyway, we just make it
Expand Down Expand Up @@ -48,21 +49,8 @@ type ConnectionConfig interface {
MaxIdleConns() int
}

func NewConnection(uri string, dialect dialects.DialectName, config ConnectionConfig) (db *sqlx.DB, err error) {
if dialect == dialects.TransactionWrappedPostgres {
// Dbtx uses the uri as a unique identifier for each transaction. Each ORM
// should be encapsulated in it's own transaction, and thus needs its own
// unique id.
//
// We can happily throw away the original uri here because if we are using
// txdb it should have already been set at the point where we called
// txdb.Register
uri = uuid.New().String()
}

// Initialize sql/sqlx
sqldb, err := otelsql.Open(string(dialect), uri,
otelsql.WithAttributes(semconv.DBSystemPostgreSQL),
func NewConnection(uri string, dialect dialects.DialectName, config ConnectionConfig) (*sqlx.DB, error) {
opts := []otelsql.Option{otelsql.WithAttributes(semconv.DBSystemPostgreSQL),
otelsql.WithTracerProvider(otel.GetTracerProvider()),
otelsql.WithSQLCommenter(true),
otelsql.WithSpanOptions(otelsql.SpanOptions{
Expand All @@ -71,22 +59,52 @@ func NewConnection(uri string, dialect dialects.DialectName, config ConnectionCo
OmitRows: true,
OmitConnectorConnect: true,
OmitConnQuery: false,
}),
)
if err != nil {
return nil, err
}
db = sqlx.NewDb(sqldb, string(dialect))
db.MapperFunc(reflectx.CamelToSnakeASCII)
})}

// Set default connection options
lockTimeout := config.DefaultLockTimeout().Milliseconds()
idleInTxSessionTimeout := config.DefaultIdleInTxSessionTimeout().Milliseconds()
stmt := fmt.Sprintf(`SET TIME ZONE 'UTC'; SET lock_timeout = %d; SET idle_in_transaction_session_timeout = %d; SET default_transaction_isolation = %q`,
connParams := fmt.Sprintf(`SET TIME ZONE 'UTC'; SET lock_timeout = %d; SET idle_in_transaction_session_timeout = %d; SET default_transaction_isolation = %q`,
lockTimeout, idleInTxSessionTimeout, defaultIsolation.String())
if _, err = db.Exec(stmt); err != nil {
return nil, err

var sqldb *sql.DB
if dialect == dialects.TransactionWrappedPostgres {
// Dbtx uses the uri as a unique identifier for each transaction. Each ORM
// should be encapsulated in it's own transaction, and thus needs its own
// unique id.
//
// We can happily throw away the original uri here because if we are using
// txdb it should have already been set at the point where we called
// txdb.Register
var err error
sqldb, err = otelsql.Open(string(dialect), uuid.New().String(), opts...)
if err != nil {
return nil, fmt.Errorf("failed to open txdb: %w", err)
}
_, err = sqldb.Exec(connParams)
if err != nil {
return nil, fmt.Errorf("failed to set options: %w", err)
}
} else {
// Set sane defaults for every new database connection.
// Those can be overridden with Txn options or SET statements in individual connections.
// The default values are the same for Txns.
connConfig, err := pgx.ParseConfig(uri)
if err != nil {
return nil, fmt.Errorf("database: failed to parse config: %w", err)
}

connector := stdlib.GetConnector(*connConfig, stdlib.OptionAfterConnect(func(ctx context.Context, c *pgx.Conn) (err error) {
_, err = c.Exec(ctx, connParams)
return
}))

// Initialize sql/sqlx
sqldb = otelsql.OpenDB(connector, opts...)
}
db := sqlx.NewDb(sqldb, string(dialect))
db.MapperFunc(reflectx.CamelToSnakeASCII)

setMaxConns(db, config)

if os.Getenv("SKIP_PG_VERSION_CHECK") != "true" {
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ require (
golang.org/x/crypto v0.22.0
golang.org/x/exp v0.0.0-20240213143201-ec583247a57a
golang.org/x/mod v0.15.0
golang.org/x/net v0.24.0
golang.org/x/sync v0.6.0
golang.org/x/term v0.19.0
golang.org/x/text v0.14.0
Expand All @@ -111,6 +112,7 @@ require (
google.golang.org/protobuf v1.33.0
gopkg.in/guregu/null.v4 v4.0.0
gopkg.in/natefinch/lumberjack.v2 v2.2.1
sigs.k8s.io/yaml v1.4.0
)

require (
Expand Down Expand Up @@ -324,7 +326,6 @@ require (
go.opentelemetry.io/proto/otlp v1.0.0 // indirect
go.uber.org/ratelimit v0.3.0 // indirect
golang.org/x/arch v0.7.0 // indirect
golang.org/x/net v0.24.0 // indirect
golang.org/x/sys v0.19.0 // indirect
google.golang.org/api v0.149.0 // indirect
google.golang.org/genproto v0.0.0-20231030173426-d783a09b4405 // indirect
Expand All @@ -336,7 +337,6 @@ require (
gopkg.in/yaml.v3 v3.0.1 // indirect
pgregory.net/rapid v0.5.5 // indirect
rsc.io/tmplfunc v0.0.3 // indirect
sigs.k8s.io/yaml v1.4.0
)

replace (
Expand Down

0 comments on commit 49f1bf3

Please sign in to comment.