diff --git a/README.md b/README.md index 46d30f3..cd09299 100644 --- a/README.md +++ b/README.md @@ -74,6 +74,8 @@ async function query(sql) { ```js import { ShardRegistrar } from '@jcoreio/postgres-shard-coordinator' +import { Pool } from 'pg' +import PgIpc from '@jcoreio/pg-ipc' import requireEnv from '@jcoreio/require-env' import migrate from './migrate' @@ -83,11 +85,18 @@ const database = { database: requireEnv('DB_NAME'), password: requireEnv('DB_PASSWORD'), port: parseInt(requireEnv('DB_PORT')), - native: true, // optional, use pg-native } +const ipc = new PgIpc({ + newClient: () => new Client(database), +}) +ipc.on('error', (err) => console.error(err.stack)) + +const pool = new Pool(database) + const registrar = new ShardRegistrar({ - database, + pool, + ipc, cluster: 'clarity_notifications', heartbeatInterval: 60, // seconds gracePeriod: 30, // seconds @@ -99,5 +108,10 @@ registrar.on('shardChanged', ({ shard, numShards }) => { }) registrar.on('error', (err) => console.error(err.stack)) -migrate({ database }).then(() => registrar.start()) +async function go() { + await migrate({ database }) + await registrar.start() +} + +go() ``` diff --git a/package.json b/package.json index e8fa6ec..4ab60bd 100644 --- a/package.json +++ b/package.json @@ -86,6 +86,7 @@ "@babel/register": "^7.15.3", "@commitlint/cli": "^13.2.0", "@commitlint/config-conventional": "^13.2.0", + "@jcoreio/pg-ipc": "^1.0.0", "@jcoreio/poll": "^2.3.1", "@jcoreio/require-env": "^1.0.11", "@jedwards1211/commitlint-config": "^1.0.2", @@ -109,6 +110,7 @@ "lodash": "^4.17.11", "mocha": "^9.1.2", "nyc": "^15.1.0", + "pg": "^8.7.1", "p-event": "^2.1.0", "prettier": "^2.4.1", "prettier-eslint": "^13.0.0", @@ -121,7 +123,6 @@ "@babel/runtime": "^7.15.4", "@jcoreio/typed-event-emitter": "^1.0.0", "debug": "^4.1.1", - "pg": "^8.7.1", "uuid": "^3.3.2" } } diff --git a/src/ShardRegistrar.js b/src/ShardRegistrar.js index aa531af..68468d5 100644 --- a/src/ShardRegistrar.js +++ b/src/ShardRegistrar.js @@ -7,8 +7,6 @@ import uuidv4 from 'uuid/v4' import debug from 'debug' import ShardReservationCluster from './schema/ShardReservationCluster' import ShardReservation from './schema/ShardReservation' -const pg = require('pg') -import type { Client, ResultSet } from 'pg' const RESHARD_DEBUG = debug.enabled('ShardRegistrar:reshard') @@ -17,16 +15,27 @@ export type ShardRegistrarEvents = {| error: [Error], |} +type PgResult = $ReadOnly<{ + rows: { [string]: any }[], + ... +}> + +type PgListener = (channel: string, payload: any) => any + +interface PgIpc { + notify(channel: string, payload?: any): Promise; + listen(channel: string, listener: PgListener): Promise; + unlisten(channel: string, listener: PgListener): Promise; +} + +interface PgPool { + query(sql: string, params?: any[]): Promise; +} + export type ShardRegistrarOptions = $ReadOnly<{ cluster: string, - database: $ReadOnly<{ - database: string, - user: string, - password: string, - host: string, - port: number, - native?: boolean, - }>, + pool: PgPool, + ipc: PgIpc, heartbeatInterval: number, gracePeriod: number, reshardInterval: number, @@ -37,20 +46,20 @@ export default class ShardRegistrar extends EventEmitter { _options: ShardRegistrarOptions _heartbeatTimeout: ?TimeoutID _holder: string = uuidv4() - _client: Client + _pool: PgPool + _ipc: PgIpc _shard: ?number _numShards: ?number _running: boolean = false _debug: any = debug(`ShardRegistrar:${this._holder.substring(0, 8)}`) _upsertedCluster: boolean = false - _lastQuery: ?Promise + _lastQuery: ?Promise constructor(options: ShardRegistrarOptions) { super() this._options = options - this._client = new ( - this._options.database.native ? pg.native.Client : pg.Client - )({ ...options.database }) + this._pool = options.pool + this._ipc = options.ipc } shardInfo(): { shard: number, numShards: number } { @@ -66,11 +75,7 @@ export default class ShardRegistrar extends EventEmitter { if (this._running) return this._running = true this._upsertedCluster = false - const { _holder: holder } = this - await this._client.connect() - this._client.on('notification', this._onNotification) - this._client.on('error', this._onError) - await this._query(`LISTEN "shardInfo/${holder}"`) + this._ipc.listen(`shardInfo/${this._holder}`, this._onNotification) this._onHeartbeat() } @@ -78,17 +83,12 @@ export default class ShardRegistrar extends EventEmitter { if (!this._running) return this._running = false if (this._heartbeatTimeout != null) clearTimeout(this._heartbeatTimeout) - this._client.removeListener('notification', this._onNotification) + this._ipc.unlisten(`shardInfo/${this._holder}`, this._onNotification) try { await this._lastQuery } catch (error) { // ignore } - await this._client.end() - this._client.removeListener('error', this._onError) - this._client = new ( - this._options.database.native ? pg.native.Client : pg.Client - )({ ...this._options.database }) } _onError: (err: Error) => any = (err: Error) => this.emit('error', err) @@ -101,29 +101,20 @@ export default class ShardRegistrar extends EventEmitter { } } - _onNotification: ({ - channel: string, - payload?: string, - ... - }) => any = ({ - channel, - payload, - }: { + _onNotification: (channel: string, payload: any) => any = ( channel: string, - payload?: string, - ... - }) => { + payload: any + ) => { this._debug(channel, payload) - const obj = payload ? JSON.parse(payload) : null try { - if (!obj) { + if (!payload) { throw new Error( `received invalid payload from Postgres channel "${channel}": ${String( payload )}` ) } - const { shard, numShards } = obj + const { shard, numShards } = payload if (typeof shard !== 'number' || typeof numShards !== 'number') { throw new Error( `received invalid payload from Postgres channel "${channel}": ${String( @@ -147,10 +138,10 @@ export default class ShardRegistrar extends EventEmitter { } } - async _query(sql: string, params?: Array): Promise { + async _query(sql: string, params?: Array): Promise { this._debug(sql, params) if (!this._running) throw new Error('already stopped') - const result = await (this._lastQuery = this._client.query(sql, params)) + const result = await (this._lastQuery = this._pool.query(sql, params)) this._debug(result.rows) return result } diff --git a/test/database.js b/test/database.js index 08f6a68..facf9fd 100644 --- a/test/database.js +++ b/test/database.js @@ -1,9 +1,14 @@ // @flow import requireEnv from '@jcoreio/require-env' -import { type ShardRegistrarOptions } from '../src' -export const database: $PropertyType = { +export const database: {| + user: string, + host: string, + database: string, + password: string, + port: number, +|} = { user: 'postgres', host: 'localhost', database: 'postgres', diff --git a/test/index.js b/test/index.js index 6957901..a60af08 100644 --- a/test/index.js +++ b/test/index.js @@ -5,38 +5,25 @@ import { umzugMigrationOptions, } from '../src' import { database } from './database' -import { describe, it, afterEach, before, beforeEach } from 'mocha' +import { describe, it, after, afterEach, before, beforeEach } from 'mocha' import { expect } from 'chai' import emitted from 'p-event' import delay from 'delay' import { range } from 'lodash' -import { Client } from 'pg' +import { Client, Pool } from 'pg' +import PgIpc from '@jcoreio/pg-ipc' import Umzug from 'umzug' import UmzugPostgresStorage from './util/UmzugPostgresStorage' import poll from '@jcoreio/poll' async function prepareTestDatabase(): Promise { - let client await poll(async (): Promise => { - client = new Client({ ...database, database: 'postgres' }) + const client = new Client({ ...database, database: 'postgres' }) await client.connect() await client.end() }, 1000).timeout(15000) - // try { - // const { - // rows: [{ database_exists }], - // } = await client.query({ - // text: `SELECT EXISTS (SELECT FROM pg_database WHERE datname = $1) AS database_exists`, - // values: [database.database], - // }) - // if (!database_exists) { - // await client.query(`CREATE DATABASE ${database.database};`) - // } - // } finally { - // await client.end() - // } - - client = new Client({ ...database }) + + const client = new Client({ ...database }) await client.connect() try { await client.query(`DROP SCHEMA IF EXISTS public CASCADE;`) @@ -46,6 +33,16 @@ async function prepareTestDatabase(): Promise { } } +const pool = new Pool({ ...database }) +// eslint-disable-next-line no-console +pool.on('error', (error) => console.error(error.stack)) + +const ipc = new PgIpc({ + newClient: () => new Client({ ...database }), +}) +// eslint-disable-next-line no-console +ipc.on('error', (error) => console.error(error.stack)) + before(async function (): Promise { this.timeout(30000) @@ -54,8 +51,6 @@ before(async function (): Promise { beforeEach(async function (): Promise { this.timeout(30000) - const client = new Client({ ...database }) - await client.connect() const umzug = new Umzug({ storage: new UmzugPostgresStorage({ database }), @@ -66,17 +61,15 @@ beforeEach(async function (): Promise { }, migrations: { ...umzugMigrationOptions(), - params: [{ query: (sql: string) => client.query(sql) }], + params: [{ query: (...args) => pool.query(...args) }], }, }) - try { - await umzug.up() - } finally { - await client.end() - } + await umzug.up() }) +after(() => Promise.all([pool.end(), ipc.end()])) + describe('ShardRegistrar', function () { this.timeout(30000) let registrars = [] @@ -100,7 +93,8 @@ describe('ShardRegistrar', function () { const gracePeriod = 3 const reshardInterval = 5 const options = { - database, + pool, + ipc, cluster, heartbeatInterval, gracePeriod, @@ -183,7 +177,8 @@ describe('ShardRegistrar', function () { const numShards = 10 const clusterA = range(numShards).map(() => createRegistrar({ - database, + pool, + ipc, cluster: 'a', heartbeatInterval, gracePeriod, @@ -192,7 +187,8 @@ describe('ShardRegistrar', function () { ) const clusterB = range(numShards).map(() => createRegistrar({ - database, + pool, + ipc, cluster: 'b', heartbeatInterval, gracePeriod, diff --git a/yarn.lock b/yarn.lock index 2f1ee5c..5714ff8 100644 --- a/yarn.lock +++ b/yarn.lock @@ -1181,6 +1181,16 @@ bindings "^1.5.0" node-addon-api "^2.0.0" +"@jcoreio/pg-ipc@^1.0.0": + version "1.0.0" + resolved "https://registry.npmjs.org/@jcoreio/pg-ipc/-/pg-ipc-1.0.0.tgz#185a3c59286602e2ef1bb922c4faafd1d3c18737" + integrity sha512-K6ZiOM/NgWJjGAOBs67xWnviCvdF1PzfyhvZGg+aQWvG77ijJvUBnDbhc5U2nhoibTLnYs58PmOjWNpyh7PQng== + dependencies: + "@babel/runtime" "^7.15.4" + "@jcoreio/typed-event-emitter" "^1.1.0" + strict-event-emitter-types "^2.0.0" + verror "^1.10.1" + "@jcoreio/poll@^2.3.1": version "2.3.1" resolved "https://registry.npmjs.org/@jcoreio/poll/-/poll-2.3.1.tgz#66e46b84264adb90a645af002186100b2d158233" @@ -1198,6 +1208,13 @@ resolved "https://registry.yarnpkg.com/@jcoreio/typed-event-emitter/-/typed-event-emitter-1.0.0.tgz#6ebd3e0c1a6423a77ad9964e779cf156ae65c992" integrity sha512-IGLRp2RfxSsO0iPEGu/QtxWH/NxCaUxIiWvq0HGwxINvk9L1oAB+h9RNjZPwLBeqsivwiY9jvAZKQIbKNxIbXw== +"@jcoreio/typed-event-emitter@^1.1.0": + version "1.1.0" + resolved "https://registry.npmjs.org/@jcoreio/typed-event-emitter/-/typed-event-emitter-1.1.0.tgz#851126282f43f1db02242827b969912fd29e4b99" + integrity sha512-/jipgciwKe/MpQ1CB1Sx2tuEPDfWH5cJksNrC0XM6cZuVD9b4DgvkCExPD71+R7jAd8I1dK3xoHoGdcJfFscOQ== + dependencies: + p-timeout "^3.2.0" + "@jedwards1211/commitlint-config@^1.0.2": version "1.0.2" resolved "https://registry.npmjs.org/@jedwards1211/commitlint-config/-/commitlint-config-1.0.2.tgz#9c4c729daf69ff2f6a41be4aebd0c0cd804acce3" @@ -5479,6 +5496,13 @@ p-timeout@^2.0.1: dependencies: p-finally "^1.0.0" +p-timeout@^3.2.0: + version "3.2.0" + resolved "https://registry.npmjs.org/p-timeout/-/p-timeout-3.2.0.tgz#c7e17abc971d2a7962ef83626b35d635acf23dfe" + integrity sha512-rhIwUycgwwKcP9yTOOFK/AKsAopjjCakVqLHePO3CC6Mir1Z99xT+R63jZxAT5lFZLa2inS5h+ZS2GvR99/FBg== + dependencies: + p-finally "^1.0.0" + p-try@^1.0.0: version "1.0.0" resolved "https://registry.npmjs.org/p-try/-/p-try-1.0.0.tgz#cbc79cdbaf8fd4228e13f621f2b1a237c1b207b3" @@ -6502,6 +6526,11 @@ stream-events@^1.0.5: dependencies: stubs "^3.0.0" +strict-event-emitter-types@^2.0.0: + version "2.0.0" + resolved "https://registry.npmjs.org/strict-event-emitter-types/-/strict-event-emitter-types-2.0.0.tgz#05e15549cb4da1694478a53543e4e2f4abcf277f" + integrity sha512-Nk/brWYpD85WlOgzw5h173aci0Teyv8YdIAEtV+N88nDB0dLlazZyJMIsN6eo1/AR61l+p6CJTG1JIyFaoNEEA== + string-argv@0.3.1: version "0.3.1" resolved "https://registry.npmjs.org/string-argv/-/string-argv-0.3.1.tgz#95e2fbec0427ae19184935f816d74aaa4c5c19da" @@ -7072,6 +7101,15 @@ verror@1.10.0: core-util-is "1.0.2" extsprintf "^1.2.0" +verror@^1.10.1: + version "1.10.1" + resolved "https://registry.npmjs.org/verror/-/verror-1.10.1.tgz#4bf09eeccf4563b109ed4b3d458380c972b0cdeb" + integrity sha512-veufcmxri4e3XSrT0xwfUR7kguIkaxBeosDg00yDWhk49wdwkSUrvvsm7nc75e1PUyvIeZj6nS8VQRYz2/S4Xg== + dependencies: + assert-plus "^1.0.0" + core-util-is "1.0.2" + extsprintf "^1.2.0" + vinyl@^1.1.1: version "1.2.0" resolved "https://registry.npmjs.org/vinyl/-/vinyl-1.2.0.tgz#5c88036cf565e5df05558bfc911f8656df218884"