Skip to content

Commit

Permalink
Handle bad message ordering - make it catchable. Fixes 3174 (#3289)
Browse files Browse the repository at this point in the history
* Handle bad message ordering - make it catchable. Fixes 3174

* Close client in test

* Mess w/ github action settings

* update ci config

* Remove redundant tests

* Update code to use handle error event

* Add tests for commandComplete message being out of order

* Lint fix

* Fix native tests

* Fix lint again...airport computer not my friend

* Not a native issue
  • Loading branch information
brianc authored Sep 17, 2024
1 parent 92bb9a2 commit f73b22f
Show file tree
Hide file tree
Showing 5 changed files with 203 additions and 53 deletions.
7 changes: 3 additions & 4 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ jobs:
- run: yarn install --frozen-lockfile
- run: yarn lint
build:
timeout-minutes: 10
timeout-minutes: 15
needs: lint
services:
postgres:
Expand All @@ -44,8 +44,8 @@ jobs:
- '22'
os:
- ubuntu-latest
name: Node.js ${{ matrix.node }} (${{ matrix.os }})
runs-on: ${{ matrix.os }}
name: Node.js ${{ matrix.node }}
runs-on: ubuntu-latest
env:
PGUSER: postgres
PGPASSWORD: postgres
Expand All @@ -71,5 +71,4 @@ jobs:
node-version: ${{ matrix.node }}
cache: yarn
- run: yarn install --frozen-lockfile
# TODO(bmc): get ssl tests working in ci
- run: yarn test
13 changes: 1 addition & 12 deletions packages/pg-native/test/many-connections.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ var bytes = require('crypto').pseudoRandomBytes
describe('many connections', function () {
describe('async', function () {
var test = function (count, times) {
it('connecting ' + count + ' clients ' + times, function (done) {
it(`connecting ${count} clients ${times} times`, function (done) {
this.timeout(200000)

var connectClient = function (n, cb) {
Expand Down Expand Up @@ -38,20 +38,9 @@ describe('many connections', function () {
}

test(1, 1)
test(1, 1)
test(1, 1)
test(5, 5)
test(5, 5)
test(5, 5)
test(5, 5)
test(10, 10)
test(10, 10)
test(10, 10)
test(20, 20)
test(20, 20)
test(20, 20)
test(30, 10)
test(30, 10)
test(30, 10)
})
})
12 changes: 11 additions & 1 deletion packages/pg/lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -377,11 +377,21 @@ class Client extends EventEmitter {
}

_handleCommandComplete(msg) {
if (this.activeQuery == null) {
const error = new Error('Received unexpected commandComplete message from backend.')
this._handleErrorEvent(error)
return
}
// delegate commandComplete to active query
this.activeQuery.handleCommandComplete(msg, this.connection)
}

_handleParseComplete(msg) {
_handleParseComplete() {
if (this.activeQuery == null) {
const error = new Error('Received unexpected parseComplete message from backend.')
this._handleErrorEvent(error)
return
}
// if a prepared statement has a name and properly parses
// we track that its already been executed so we don't parse
// it again on the same client
Expand Down
57 changes: 21 additions & 36 deletions packages/pg/script/create-test-tables.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,41 +31,26 @@ var people = [
{ name: 'Zanzabar', age: 260 },
]

var con = new pg.Client({
user: args.user,
password: args.password,
host: args.host,
port: args.port,
database: args.database,
})

con.connect((err) => {
if (err) {
throw err
}

con.query(
'DROP TABLE IF EXISTS person;' + ' CREATE TABLE person (id serial, name varchar(10), age integer)',
(err) => {
if (err) {
throw err
}

console.log('Created table person')
console.log('Filling it with people')

con.query(
'INSERT INTO person (name, age) VALUES' +
people.map((person) => ` ('${person.name}', ${person.age})`).join(','),
(err, result) => {
if (err) {
throw err
}

console.log(`Inserted ${result.rowCount} people`)
con.end()
}
)
}
async function run() {
var con = new pg.Client({
user: args.user,
password: args.password,
host: args.host,
port: args.port,
database: args.database,
})
console.log('creating test dataset')
await con.connect()
await con.query('DROP TABLE IF EXISTS person')
await con.query('CREATE TABLE person (id serial, name varchar(10), age integer)')
await con.query(
'INSERT INTO person (name, age) VALUES' + people.map((person) => ` ('${person.name}', ${person.age})`).join(',')
)
await con.end()
console.log('created test dataset')
}

run().catch((e) => {
console.log('setup failed', e)
process.exit(255)
})
167 changes: 167 additions & 0 deletions packages/pg/test/integration/gh-issues/3174-tests.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
const net = require('net')
const buffers = require('../../test-buffers')
const helper = require('../test-helper')
const assert = require('assert')
const cli = require('../../cli')

const suite = new helper.Suite()

const options = {
host: 'localhost',
port: Math.floor(Math.random() * 2000) + 2000,
connectionTimeoutMillis: 2000,
user: 'not',
database: 'existing',
}

const startMockServer = (port, badBuffer, callback) => {
const sockets = new Set()

const server = net.createServer((socket) => {
sockets.add(socket)
socket.once('end', () => sockets.delete(socket))

socket.on('data', (data) => {
// deny request for SSL
if (data.length === 8) {
socket.write(Buffer.from('N', 'utf8'))
return
// consider all authentication requests as good
}
// the initial message coming in has a 0 message type for authentication negotiation
if (!data[0]) {
socket.write(buffers.authenticationOk())
// send ReadyForQuery `timeout` ms after authentication
socket.write(buffers.readyForQuery())
return
// respond with our canned response
}
const code = data.toString('utf8', 0, 1)
switch (code) {
// parse
case 'P':
socket.write(buffers.parseComplete())
socket.write(buffers.bindComplete())
socket.write(buffers.rowDescription())
socket.write(buffers.dataRow())
socket.write(buffers.commandComplete('FOO BAR'))
socket.write(buffers.readyForQuery())
// this message is invalid, but sometimes sent out of order when using proxies or pg-bouncer
setImmediate(() => {
socket.write(badBuffer)
})
break
case 'Q':
socket.write(buffers.rowDescription())
socket.write(buffers.dataRow())
socket.write(buffers.commandComplete('FOO BAR'))
socket.write(buffers.readyForQuery())
// this message is invalid, but sometimes sent out of order when using proxies or pg-bouncer
setImmediate(() => {
socket.write(badBuffer)
})
default:
// console.log('got code', code)
}
})
})

const closeServer = () => {
for (const socket of sockets) {
socket.destroy()
}
return new Promise((resolve) => {
server.close(resolve)
})
}

server.listen(port, options.host, () => callback(closeServer))
}

const delay = (ms) =>
new Promise((resolve) => {
setTimeout(resolve, ms)
})

const testErrorBuffer = (bufferName, errorBuffer) => {
suite.testAsync(`Out of order ${bufferName} on simple query is catchable`, async () => {
const closeServer = await new Promise((resolve, reject) => {
return startMockServer(options.port, errorBuffer, (closeServer) => resolve(closeServer))
})
const client = new helper.Client(options)
await client.connect()

let errorHit = false
client.on('error', () => {
errorHit = true
})

await client.query('SELECT NOW()')
await delay(50)

// the native client only emits a notice message and keeps on its merry way
if (!cli.native) {
assert(errorHit)
// further queries on the client should fail since its in an invalid state
await assert.rejects(() => client.query('SELECTR NOW()'), 'Further queries on the client should reject')
}

await closeServer()
})

suite.testAsync(`Out of order ${bufferName} on extended query is catchable`, async () => {
const closeServer = await new Promise((resolve, reject) => {
return startMockServer(options.port, errorBuffer, (closeServer) => resolve(closeServer))
})
const client = new helper.Client(options)
await client.connect()

let errorHit = false
client.on('error', () => {
errorHit = true
})

await client.query('SELECT $1', ['foo'])
await delay(40)

// the native client only emits a notice message and keeps on its merry way
if (!cli.native) {
assert(errorHit)
// further queries on the client should fail since its in an invalid state
await assert.rejects(() => client.query('SELECTR NOW()'), 'Further queries on the client should reject')
}

await client.end()

await closeServer()
})

suite.testAsync(`Out of order ${bufferName} on pool is catchable`, async () => {
const closeServer = await new Promise((resolve, reject) => {
return startMockServer(options.port, errorBuffer, (closeServer) => resolve(closeServer))
})
const pool = new helper.pg.Pool(options)

let errorHit = false
pool.on('error', () => {
errorHit = true
})

await pool.query('SELECT $1', ['foo'])
await delay(100)

if (!cli.native) {
assert(errorHit)
assert.strictEqual(pool.idleCount, 0, 'Pool should have no idle clients')
assert.strictEqual(pool.totalCount, 0, 'Pool should have no connected clients')
}

await pool.end()
await closeServer()
})
}

if (!helper.args.native) {
testErrorBuffer('parseComplete', buffers.parseComplete())
testErrorBuffer('commandComplete', buffers.commandComplete('f'))
}

0 comments on commit f73b22f

Please sign in to comment.