Skip to content

Conversation

@rcottinet
Copy link

@rcottinet rcottinet commented Feb 9, 2026

Implements a new transport backend for PostgreSQL using NOTIFY/LISTEN for pub/sub messaging

Implementation

  • Transport: PostgresTransport

    • Uses escapeIdentifier() and escapeLiteral() from pg for SQL safety
    • Channel handlers map for multi-channel subscriptions
  • Dependencies: pg as optional peer dependency, @testcontainers/postgresql for tests

Usage

import { postgres } from '@boringnode/bus/transports/postgres'

const manager = new BusManager({
  transports: {
    main: {
      transport: postgres({
        host: 'localhost',
        port: 5432,
        database: 'mydb',
        user: 'postgres',
        password: 'password',
      }),
    },
  },
})

// Or with connection string
const manager = new BusManager({
  transports: {
    main: {
      transport: postgres('postgresql://user:pass@localhost/dbname'),
    },
  },
})

Notes

  • Reconnection handler implemented but requires reconnection logic at application level (pg client doesn't auto-reconnect)
  • NOTIFY payload size limited to 8000 bytes by PostgreSQL

Warning

I think it's only compatible with v14+ postgres

@rcottinet rcottinet force-pushed the 0.x branch 10 times, most recently from 8bf1d2b to c3523c2 Compare February 9, 2026 23:43
@ThisIsMissEm
Copy link

Another approach here could be to have a events database table which you insert a row to, and then trigger a notify to signal that that table has been updated. That means that clients which are disconnected when the notify happens can still retrieve the messages when they reconnect.

Comment on lines 160 to 166
this.#subscriber.on('end', () => {
debug('subscriber connection ended')
this.#subscriberConnected = false
// Attempt to reconnect
this.#subscriber
.connect()
.then(() => {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You'll probably want to use some form of exponential backoff with randomised jitter to ensure safe reconnection during outages.

You probably also want to check here if you should reconnect, e.g., disconnect may emit an end event (I can't recall if it does)

await this.#subscriber.query(`LISTEN ${escapedChannel}`)
}

onReconnect(callback: () => void): void {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You'll also want to track that you are in fact actually reconnecting, and in that case, block ensureConnected on that reconnection promise, otherwise it's possible that a temporary disconnect can trigger a double connection.

@rcottinet
Copy link
Author

rcottinet commented Feb 10, 2026

Another approach here could be to have a events database table which you insert a row to, and then trigger a notify to signal that that table has been updated. That means that clients which are disconnected when the notify happens can still retrieve the messages when they reconnect.

hi @ThisIsMissEm thanks for your review
That’s a fair point regarding persistence (and also avoid payload size limitation), but I find the current LISTEN/NOTIFY setup less invasive, to keep the architecture simple without adding extra tables - or maybe with some kind of temp table ?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants