Skip to content

Commit

Permalink
feat: use @jcoreio/pg-ipc for listen/notify
Browse files Browse the repository at this point in the history
BREAKING CHANGE: constructor options have changed; instead of passing `database` options, pass
a `pool` (instanceof `pg.Pool`) and `ipc` (instanceof `PgIpc` from `@jcoreio/pg-ipc`)
  • Loading branch information
jedwards1211 committed Dec 23, 2021
1 parent 4b0241c commit 5d6aa4a
Show file tree
Hide file tree
Showing 6 changed files with 123 additions and 78 deletions.
20 changes: 17 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ async function query(sql) {

```js
import { ShardRegistrar } from '@jcoreio/postgres-shard-coordinator'
import { Pool } from 'pg'
import PgIpc from '@jcoreio/pg-ipc'
import requireEnv from '@jcoreio/require-env'
import migrate from './migrate'

Expand All @@ -83,11 +85,18 @@ const database = {
database: requireEnv('DB_NAME'),
password: requireEnv('DB_PASSWORD'),
port: parseInt(requireEnv('DB_PORT')),
native: true, // optional, use pg-native
}

const ipc = new PgIpc({
newClient: () => new Client(database),
})
ipc.on('error', (err) => console.error(err.stack))

const pool = new Pool(database)

const registrar = new ShardRegistrar({
database,
pool,
ipc,
cluster: 'clarity_notifications',
heartbeatInterval: 60, // seconds
gracePeriod: 30, // seconds
Expand All @@ -99,5 +108,10 @@ registrar.on('shardChanged', ({ shard, numShards }) => {
})
registrar.on('error', (err) => console.error(err.stack))

migrate({ database }).then(() => registrar.start())
async function go() {
await migrate({ database })
await registrar.start()
}

go()
```
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@
"@babel/register": "^7.15.3",
"@commitlint/cli": "^13.2.0",
"@commitlint/config-conventional": "^13.2.0",
"@jcoreio/pg-ipc": "^1.0.0",
"@jcoreio/poll": "^2.3.1",
"@jcoreio/require-env": "^1.0.11",
"@jedwards1211/commitlint-config": "^1.0.2",
Expand All @@ -109,6 +110,7 @@
"lodash": "^4.17.11",
"mocha": "^9.1.2",
"nyc": "^15.1.0",
"pg": "^8.7.1",
"p-event": "^2.1.0",
"prettier": "^2.4.1",
"prettier-eslint": "^13.0.0",
Expand All @@ -121,7 +123,6 @@
"@babel/runtime": "^7.15.4",
"@jcoreio/typed-event-emitter": "^1.0.0",
"debug": "^4.1.1",
"pg": "^8.7.1",
"uuid": "^3.3.2"
}
}
75 changes: 33 additions & 42 deletions src/ShardRegistrar.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ import uuidv4 from 'uuid/v4'
import debug from 'debug'
import ShardReservationCluster from './schema/ShardReservationCluster'
import ShardReservation from './schema/ShardReservation'
const pg = require('pg')
import type { Client, ResultSet } from 'pg'

const RESHARD_DEBUG = debug.enabled('ShardRegistrar:reshard')

Expand All @@ -17,16 +15,27 @@ export type ShardRegistrarEvents = {|
error: [Error],
|}

type PgResult = $ReadOnly<{
rows: { [string]: any }[],
...
}>

type PgListener = (channel: string, payload: any) => any

interface PgIpc {
notify(channel: string, payload?: any): Promise<void>;
listen(channel: string, listener: PgListener): Promise<void>;
unlisten(channel: string, listener: PgListener): Promise<void>;
}

interface PgPool {
query(sql: string, params?: any[]): Promise<PgResult>;
}

export type ShardRegistrarOptions = $ReadOnly<{
cluster: string,
database: $ReadOnly<{
database: string,
user: string,
password: string,
host: string,
port: number,
native?: boolean,
}>,
pool: PgPool,
ipc: PgIpc,
heartbeatInterval: number,
gracePeriod: number,
reshardInterval: number,
Expand All @@ -37,20 +46,20 @@ export default class ShardRegistrar extends EventEmitter<ShardRegistrarEvents> {
_options: ShardRegistrarOptions
_heartbeatTimeout: ?TimeoutID
_holder: string = uuidv4()
_client: Client
_pool: PgPool
_ipc: PgIpc
_shard: ?number
_numShards: ?number
_running: boolean = false
_debug: any = debug(`ShardRegistrar:${this._holder.substring(0, 8)}`)
_upsertedCluster: boolean = false
_lastQuery: ?Promise<any>
_lastQuery: ?Promise<PgResult>

constructor(options: ShardRegistrarOptions) {
super()
this._options = options
this._client = new (
this._options.database.native ? pg.native.Client : pg.Client
)({ ...options.database })
this._pool = options.pool
this._ipc = options.ipc
}

shardInfo(): { shard: number, numShards: number } {
Expand All @@ -66,29 +75,20 @@ export default class ShardRegistrar extends EventEmitter<ShardRegistrarEvents> {
if (this._running) return
this._running = true
this._upsertedCluster = false
const { _holder: holder } = this
await this._client.connect()
this._client.on('notification', this._onNotification)
this._client.on('error', this._onError)
await this._query(`LISTEN "shardInfo/${holder}"`)
this._ipc.listen(`shardInfo/${this._holder}`, this._onNotification)
this._onHeartbeat()
}

async stop(): Promise<void> {
if (!this._running) return
this._running = false
if (this._heartbeatTimeout != null) clearTimeout(this._heartbeatTimeout)
this._client.removeListener('notification', this._onNotification)
this._ipc.unlisten(`shardInfo/${this._holder}`, this._onNotification)
try {
await this._lastQuery
} catch (error) {
// ignore
}
await this._client.end()
this._client.removeListener('error', this._onError)
this._client = new (
this._options.database.native ? pg.native.Client : pg.Client
)({ ...this._options.database })
}

_onError: (err: Error) => any = (err: Error) => this.emit('error', err)
Expand All @@ -101,29 +101,20 @@ export default class ShardRegistrar extends EventEmitter<ShardRegistrarEvents> {
}
}

_onNotification: ({
channel: string,
payload?: string,
...
}) => any = ({
channel,
payload,
}: {
_onNotification: (channel: string, payload: any) => any = (
channel: string,
payload?: string,
...
}) => {
payload: any
) => {
this._debug(channel, payload)
const obj = payload ? JSON.parse(payload) : null
try {
if (!obj) {
if (!payload) {
throw new Error(
`received invalid payload from Postgres channel "${channel}": ${String(
payload
)}`
)
}
const { shard, numShards } = obj
const { shard, numShards } = payload
if (typeof shard !== 'number' || typeof numShards !== 'number') {
throw new Error(
`received invalid payload from Postgres channel "${channel}": ${String(
Expand All @@ -147,10 +138,10 @@ export default class ShardRegistrar extends EventEmitter<ShardRegistrarEvents> {
}
}

async _query(sql: string, params?: Array<any>): Promise<ResultSet> {
async _query(sql: string, params?: Array<any>): Promise<PgResult> {
this._debug(sql, params)
if (!this._running) throw new Error('already stopped')
const result = await (this._lastQuery = this._client.query(sql, params))
const result = await (this._lastQuery = this._pool.query(sql, params))
this._debug(result.rows)
return result
}
Expand Down
9 changes: 7 additions & 2 deletions test/database.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
// @flow

import requireEnv from '@jcoreio/require-env'
import { type ShardRegistrarOptions } from '../src'

export const database: $PropertyType<ShardRegistrarOptions, 'database'> = {
export const database: {|
user: string,
host: string,
database: string,
password: string,
port: number,
|} = {
user: 'postgres',
host: 'localhost',
database: 'postgres',
Expand Down
56 changes: 26 additions & 30 deletions test/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,38 +5,25 @@ import {
umzugMigrationOptions,
} from '../src'
import { database } from './database'
import { describe, it, afterEach, before, beforeEach } from 'mocha'
import { describe, it, after, afterEach, before, beforeEach } from 'mocha'
import { expect } from 'chai'
import emitted from 'p-event'
import delay from 'delay'
import { range } from 'lodash'
import { Client } from 'pg'
import { Client, Pool } from 'pg'
import PgIpc from '@jcoreio/pg-ipc'
import Umzug from 'umzug'
import UmzugPostgresStorage from './util/UmzugPostgresStorage'
import poll from '@jcoreio/poll'

async function prepareTestDatabase(): Promise<void> {
let client
await poll(async (): Promise<void> => {
client = new Client({ ...database, database: 'postgres' })
const client = new Client({ ...database, database: 'postgres' })
await client.connect()
await client.end()
}, 1000).timeout(15000)
// try {
// const {
// rows: [{ database_exists }],
// } = await client.query({
// text: `SELECT EXISTS (SELECT FROM pg_database WHERE datname = $1) AS database_exists`,
// values: [database.database],
// })
// if (!database_exists) {
// await client.query(`CREATE DATABASE ${database.database};`)
// }
// } finally {
// await client.end()
// }

client = new Client({ ...database })

const client = new Client({ ...database })
await client.connect()
try {
await client.query(`DROP SCHEMA IF EXISTS public CASCADE;`)
Expand All @@ -46,6 +33,16 @@ async function prepareTestDatabase(): Promise<void> {
}
}

const pool = new Pool({ ...database })
// eslint-disable-next-line no-console
pool.on('error', (error) => console.error(error.stack))

const ipc = new PgIpc({
newClient: () => new Client({ ...database }),
})
// eslint-disable-next-line no-console
ipc.on('error', (error) => console.error(error.stack))

before(async function (): Promise<void> {
this.timeout(30000)

Expand All @@ -54,8 +51,6 @@ before(async function (): Promise<void> {

beforeEach(async function (): Promise<void> {
this.timeout(30000)
const client = new Client({ ...database })
await client.connect()

const umzug = new Umzug({
storage: new UmzugPostgresStorage({ database }),
Expand All @@ -66,17 +61,15 @@ beforeEach(async function (): Promise<void> {
},
migrations: {
...umzugMigrationOptions(),
params: [{ query: (sql: string) => client.query(sql) }],
params: [{ query: (...args) => pool.query(...args) }],
},
})

try {
await umzug.up()
} finally {
await client.end()
}
await umzug.up()
})

after(() => Promise.all([pool.end(), ipc.end()]))

describe('ShardRegistrar', function () {
this.timeout(30000)
let registrars = []
Expand All @@ -100,7 +93,8 @@ describe('ShardRegistrar', function () {
const gracePeriod = 3
const reshardInterval = 5
const options = {
database,
pool,
ipc,
cluster,
heartbeatInterval,
gracePeriod,
Expand Down Expand Up @@ -183,7 +177,8 @@ describe('ShardRegistrar', function () {
const numShards = 10
const clusterA = range(numShards).map(() =>
createRegistrar({
database,
pool,
ipc,
cluster: 'a',
heartbeatInterval,
gracePeriod,
Expand All @@ -192,7 +187,8 @@ describe('ShardRegistrar', function () {
)
const clusterB = range(numShards).map(() =>
createRegistrar({
database,
pool,
ipc,
cluster: 'b',
heartbeatInterval,
gracePeriod,
Expand Down
Loading

0 comments on commit 5d6aa4a

Please sign in to comment.