Skip to content

Commit

Permalink
fix: get everything working and add basic docs
Browse files Browse the repository at this point in the history
  • Loading branch information
jedwards1211 committed Jan 7, 2019
1 parent ce6aef1 commit 3aba305
Show file tree
Hide file tree
Showing 11 changed files with 451 additions and 351 deletions.
113 changes: 89 additions & 24 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,32 +6,97 @@
[![Commitizen friendly](https://img.shields.io/badge/commitizen-friendly-brightgreen.svg)](http://commitizen.github.io/cz-cli/)
[![npm version](https://badge.fury.io/js/%40jcoreio%2Fpostgres-shard-coordinator.svg)](https://badge.fury.io/js/%40jcoreio%2Fpostgres-shard-coordinator)

This is my personal skeleton for creating an ES2015 library npm package. You are welcome to use it.
Helps processes pick a unique shard index and determine the number of shards,
using Postgres to coordinate registration.

## Quick start
# Introduction

```sh
npm i -g howardroark/pollinate
pollinate https://github.com/jcoreio/postgres-shard-coordinator.git --keep-history --name <package name> --author <your name> --organization <github organization> --description <package description>
cd <package name>
npm i
This is designed for any situation where batch processing needs to be divided
between multiple processes using hash-based sharding. For example, Clarity uses
multiple processes to handle the notification queue; each process restricts
itself to events where

```
knuth_hash(userId) % numShards >= (shard * MAX_USER_ID) / numShards &&
knuth_hash(userId) % numShards < ((shard + 1) * MAX_USER_ID) / numShards
```

Each of these processes can use `@jcoreio/postgres-shard-coordinator` to pick a unique
`shard` index and determine the total `numShards` (number of processes) in a
decentralized fashion that automatically adapts as processes are spawned or die.

# Usage

```
npm i --save @jcoreio/postgres-shard-coordinator
```

## Tools used

* babel 6
* babel-preset-env
* mocha
* chai
* istanbul
* nyc
* babel-plugin-istanbul
* eslint
* eslint-watch
* flow
* flow-watch
* pre-commit (runs eslnt and flow)
* semantic-release
* Travis CI
* Coveralls
## Database migration

You will need to perform provided migrations to create the tables and functions
for coordination:

```js
import { Client } from 'pg'
import Umzug from 'umzug'
import { umzugMigrationOptions } from '@jcoreio/postgres-shard-coordinator'

export default async function migrate({ database }) {
const umzug = new Umzug({
storage: 'umzug-postgres-storage',
storageOptions: {
database,
relation: '"SequelizeMeta"',
column: 'name',
},
migrations: {
...umzugMigrationOptions(),
params: [{ query }],
},
})

await umzug.up()
}

async function query(sql) {
const client = new Client(database)
try {
await client.connect()
migrationDebug(sql)
return await client.query(sql)
} finally {
await client.end()
}
}
```

## Shard registration

```js
import { ShardRegistrar } from '@jcoreio/postgres-shard-coordinator'
import requireEnv from '@jcoreio/require-env'
import migrate from './migrate'

const database = {
user: requireEnv('DB_USER'),
host: requireEnv('DB_HOST'),
database: requireEnv('DB_NAME'),
password: requireEnv('DB_PASSWORD'),
port: parseInt(requireEnv('DB_PORT')),
}

const registrar = new ShardRegistrar({
database,
cluster: 'clarity_notifications',
heartbeatInterval: 60, // seconds
gracePeriod: 30, // seconds
reshardInterval: 60, // seconds
})

registrar.on('shardChanged', ({ shard, numShards }) => {
// reconfigure the notification queue processor
})
registrar.on('error', err => console.error(err.stack))

migrate({ database }).then(() => registrar.start())
```
9 changes: 4 additions & 5 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
]
},
"config": {
"mocha": "-r @babel/register ./test/configure.js ./test/**/*.js",
"mocha": "-r @babel/register ./test/configure.js ./test/*.js",
"commitizen": {
"path": "cz-conventional-changelog"
}
Expand Down Expand Up @@ -106,23 +106,22 @@
"husky": "^1.1.4",
"istanbul": "^0.4.5",
"lint-staged": "^8.0.4",
"lodash": "^4.17.11",
"mocha": "^5.2.0",
"nyc": "^13.1.0",
"p-event": "^2.1.0",
"prettier": "^1.15.2",
"prettier-eslint": "^8.8.2",
"rimraf": "^2.6.0",
"semantic-release": "^15.1.4",
"travis-deploy-once": "^5.0.9"
"travis-deploy-once": "^5.0.9",
"umzug": "^2.2.0"
},
"dependencies": {
"@babel/runtime": "^7.1.5",
"@jcoreio/typed-event-emitter": "^1.0.0",
"debug": "^4.1.1",
"lodash": "^4.17.11",
"log4jcore": "^2.0.3",
"pg": "^7.7.1",
"umzug": "^2.2.0",
"uuid": "^3.3.2"
}
}
229 changes: 229 additions & 0 deletions src/ShardRegistrar.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,229 @@
/**
* @flow
* @prettier
*/
import EventEmitter from '@jcoreio/typed-event-emitter'
import { Client, type Result } from 'pg'
import uuidv4 from 'uuid/v4'
import debug from 'debug'
import ShardReservationCluster from './schema/ShardReservationCluster'
import ShardReservation from './schema/ShardReservation'

const RESHARD_DEBUG = debug.enabled('ShardRegistrar:reshard')

export type ShardRegistrarEvents = {
shardChanged: [{ shard: number, numShards: number }],
error: [Error],
}

export type ShardRegistrarOptions = {
cluster: string,
database: {
database: string,
user: string,
password: string,
host: string,
port: number,
},
heartbeatInterval: number,
gracePeriod: number,
reshardInterval: number,
}

export default class ShardRegistrar extends EventEmitter<ShardRegistrarEvents> {
_options: ShardRegistrarOptions
_heartbeatTimeout: ?TimeoutID
_holder: string = uuidv4()
_client: Client
_shard: ?number
_numShards: ?number
_running: boolean = false
_debug = debug(`ShardRegistrar:${this._holder.substring(0, 8)}`)
_upsertedCluster: boolean = false
_lastQuery: ?Promise<any>

constructor(options: ShardRegistrarOptions) {
super()
this._options = options
this._client = new Client(options.database)
}

shardInfo(): { shard: number, numShards: number } {
const shard = this._shard
const numShards = this._numShards
if (shard == null || numShards == null) {
throw new Error('no shard has been reserved')
}
return { shard, numShards }
}

async start(): Promise<void> {
if (this._running) return
this._running = true
this._upsertedCluster = false
const { _holder: holder } = this
await this._client.connect()
this._client.on('notification', this._onNotification)
this._client.on('error', this._onError)
await this._query(`LISTEN "shardInfo/${holder}"`)
this._onHeartbeat()
}

async stop(): Promise<void> {
if (!this._running) return
this._running = false
if (this._heartbeatTimeout != null) clearTimeout(this._heartbeatTimeout)
this._client.removeListener('notification', this._onNotification)
try {
await this._lastQuery
} catch (error) {
// ignore
}
await this._client.end()
this._client.removeListener('error', this._onError)
this._client = new Client(this._options.database)
}

_onError = (err: Error) => this.emit('error', err)

_setShard({ shard, numShards }: { shard: number, numShards: number }) {
if (shard !== this._shard || numShards !== this._numShards) {
this._shard = shard
this._numShards = numShards
this.emit('shardChanged', { shard, numShards })
}
}

_onNotification = ({
channel,
payload,
}: {
channel: string,
payload: string,
}) => {
this._debug(channel, payload)
const obj = JSON.parse(payload)
try {
if (!obj) {
throw new Error(
`received invalid payload from Postgres channel "${channel}": ${payload}`
)
}
const { shard, numShards } = obj
if (typeof shard !== 'number' || typeof numShards !== 'number') {
throw new Error(
`received invalid payload from Postgres channel "${channel}": ${payload}`
)
}
this._setShard({ shard, numShards })
} catch (error) {
this.emit('error', error)
}
}

_onHeartbeat = async (): Promise<void> => {
let nextTime = Date.now() + this._options.heartbeatInterval * 1000
const reshardAt: ?Date = await this._register()
if (reshardAt) nextTime = Math.min(nextTime, reshardAt.getTime())
const delay = Math.max(0, nextTime - Date.now())
if (this._running) {
this._heartbeatTimeout = setTimeout(this._onHeartbeat, delay)
}
}

async _query(sql: string, params?: Array<any>): Promise<Result> {
this._debug(sql, params)
if (!this._running) throw new Error('already stopped')
const result = await (this._lastQuery = this._client.query(sql, params))
this._debug(result.rows)
return result
}

async _register(): Promise<?Date> {
const { _holder: holder } = this
const { cluster, heartbeatInterval, gracePeriod } = this._options
const interval = `${heartbeatInterval + gracePeriod} seconds`
const reshardInterval = `${this._options.reshardInterval} seconds`

try {
if (!this._upsertedCluster) {
await this._query(upsertClusterQuery, [cluster])
this._upsertedCluster = true
}
await this._query(registerQuery, [cluster, holder, interval])
const {
rows: [{ isCoordinator }],
} = await this._query(selectIsCoordinatorQuery, [holder, cluster])
let reshardAt: ?Date
if (isCoordinator) {
;({
rows: [{ reshardAt }],
} = await this._query(
`SELECT "reshard_ShardReservations"($1, $2::interval) AS "reshardAt";`,
[cluster, reshardInterval]
))
if (RESHARD_DEBUG) {
const { rows } = await this._query(
`SELECT * FROM ${ShardReservation.tableName} WHERE ${
ShardReservation.cluster
} = $1 ORDER BY ${ShardReservation.shard} NULLS LAST, ${
ShardReservation.holder
}`,
[cluster]
)
console.table(rows) // eslint-disable-line no-console
}
}

return reshardAt
} catch (error) {
if (this._running) this.emit('error', error)
}
}
}

const upsertClusterQuery = `
INSERT INTO ${ShardReservationCluster.tableName} (
${ShardReservationCluster.cluster}
)
VALUES ($1)
ON CONFLICT (${ShardReservationCluster.cluster}) DO NOTHING;
`
.trim()
.replace(/\s+/g, ' ')

const registerQuery = `
INSERT INTO ${ShardReservation.tableName} (
${ShardReservation.cluster},
${ShardReservation.holder},
${ShardReservation.expiresAt}
)
VALUES (
$1,
$2,
CURRENT_TIMESTAMP + $3::interval
)
ON CONFLICT (${ShardReservation.holder}) DO UPDATE
SET ${ShardReservation.expiresAt} = CURRENT_TIMESTAMP + $3::interval,
${ShardReservation.shard} = CASE
WHEN ${ShardReservation.tableName}.${ShardReservation.expiresAt}
<= CURRENT_TIMESTAMP
THEN NULL
ELSE ${ShardReservation.tableName}.${ShardReservation.shard}
END
RETURNING ${ShardReservation.tableName};
`
.trim()
.replace(/\s+/g, ' ')

const selectIsCoordinatorQuery = `
SELECT $1 = (
SELECT ${ShardReservation.holder} FROM ${ShardReservation.tableName}
WHERE ${ShardReservation.cluster} = $2
AND ${ShardReservation.expiresAt} > CURRENT_TIMESTAMP
ORDER BY ${ShardReservation.shard} NULLS LAST, ${ShardReservation.holder}
LIMIT 1
) AS "isCoordinator";
`
.trim()
.replace(/\s+/g, ' ')
Loading

0 comments on commit 3aba305

Please sign in to comment.