Skip to content

Commit

Permalink
riverpgxv5: hijack raw listener conn to assume control
Browse files Browse the repository at this point in the history
The previous implementation of the driver would `Acquire()` a conn from
the `pgxpool.Pool`, but keep the pool in control of the conn. This meant
that the pool would still do its regular maintenance checks on the conn,
such as closing it after it had reached its max lifetime.

This isn't actually desirable in the case of a `LISTEN` listener—we want
the listener to stay alive as long as possible and to avoid missing any
notifications. We perform our own health checks on the conn in the form
of periodic pings, which should be sufficient to make sure the conn is
still connected and functioning properly.

As such, adjust the driver so that it calls `Hijack()` on the conn
immediately after acquiring it, assuming full control of it from the
underlying pool. The listener is still responsible for closing the conn
at shutdown.

Fixes #660.
  • Loading branch information
bgentry committed Oct 29, 2024
1 parent 84d2185 commit a4944da
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 8 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Fixed

- `riverpgxv5` driver: `Hijack()` the underlying listener connection as soon as it is acquired from the `pgxpool.Pool` in order to prevent the pool from automatically closing it after it reaches its max age. A max lifetime makes sense in the context of a pool with many conns, but a long-lived listener does not need a max lifetime as long as it can ensure the conn remains healthy. [PR #661](https://github.com/riverqueue/river/pull/661).

## [0.13.0] - 2024-10-07

⚠️ Version 0.13.0 removes the original advisory lock based unique jobs implementation that was deprecated in v0.12.0. See details in the note below or the v0.12.0 release notes.
Expand Down
18 changes: 10 additions & 8 deletions riverdriver/riverpgxv5/river_pgx_v5_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -735,7 +735,7 @@ func (t *ExecutorTx) Rollback(ctx context.Context) error {
}

type Listener struct {
conn *pgxpool.Conn
conn *pgx.Conn
dbPool *pgxpool.Pool
prefix string
mu sync.Mutex
Expand All @@ -753,11 +753,10 @@ func (l *Listener) Close(ctx context.Context) error {
// connection back into rotation, but in case a Listen was invoked without a
// subsequent Unlisten on the same topic, close the connection explicitly to
// guarantee no other caller will receive a partially tainted connection.
err := l.conn.Conn().Close(ctx)
err := l.conn.Close(ctx)

// Even in the event of an error, make sure conn is set back to nil so that
// the listener can be reused.
l.conn.Release()
l.conn = nil

return err
Expand All @@ -771,19 +770,22 @@ func (l *Listener) Connect(ctx context.Context) error {
return errors.New("connection already established")
}

conn, err := l.dbPool.Acquire(ctx)
poolConn, err := l.dbPool.Acquire(ctx)
if err != nil {
return err
}

var schema string
if err := conn.QueryRow(ctx, "SELECT current_schema();").Scan(&schema); err != nil {
conn.Release()
if err := poolConn.QueryRow(ctx, "SELECT current_schema();").Scan(&schema); err != nil {
poolConn.Release()
return err
}

l.prefix = schema + "."
l.conn = conn
// Assume full ownership of the conn so that it doesn't get released back to
// the pool or auto-closed by the pool.
l.conn = poolConn.Hijack()

return nil
}

Expand Down Expand Up @@ -814,7 +816,7 @@ func (l *Listener) WaitForNotification(ctx context.Context) (*riverdriver.Notifi
l.mu.Lock()
defer l.mu.Unlock()

notification, err := l.conn.Conn().WaitForNotification(ctx)
notification, err := l.conn.WaitForNotification(ctx)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit a4944da

Please sign in to comment.