From 3aba3058cf16fc285784d3f1b97e7d52518a2f4a Mon Sep 17 00:00:00 2001 From: Andy Edwards Date: Mon, 7 Jan 2019 00:06:54 -0600 Subject: [PATCH] fix: get everything working and add basic docs --- README.md | 113 +++++++-- package.json | 9 +- src/ShardRegistrar.js | 229 +++++++++++++++++ src/index.js | 238 +----------------- ...0190104204203-create-ShardReservations.sql | 47 +++- src/umzug.js | 65 ----- src/umzugMigrationOptions.js | 41 +++ test/configure.js | 33 ++- test/index.js | 13 +- {src => test}/util/UmzugPostgresStorage.js | 0 yarn.lock | 14 -- 11 files changed, 451 insertions(+), 351 deletions(-) create mode 100644 src/ShardRegistrar.js delete mode 100644 src/umzug.js create mode 100644 src/umzugMigrationOptions.js rename {src => test}/util/UmzugPostgresStorage.js (100%) diff --git a/README.md b/README.md index 0167e7f..f496876 100644 --- a/README.md +++ b/README.md @@ -6,32 +6,97 @@ [![Commitizen friendly](https://img.shields.io/badge/commitizen-friendly-brightgreen.svg)](http://commitizen.github.io/cz-cli/) [![npm version](https://badge.fury.io/js/%40jcoreio%2Fpostgres-shard-coordinator.svg)](https://badge.fury.io/js/%40jcoreio%2Fpostgres-shard-coordinator) -This is my personal skeleton for creating an ES2015 library npm package. You are welcome to use it. +Helps processes pick a unique shard index and determine the number of shards, +using Postgres to coordinate registration. -## Quick start +# Introduction -```sh -npm i -g howardroark/pollinate -pollinate https://github.com/jcoreio/postgres-shard-coordinator.git --keep-history --name --author --organization --description -cd -npm i +This is designed for any situation where batch processing needs to be divided +between multiple processes using hash-based sharding. For example, Clarity uses +multiple processes to handle the notification queue; each process restricts +itself to events where + +``` + knuth_hash(userId) % numShards >= (shard * MAX_USER_ID) / numShards && + knuth_hash(userId) % numShards < ((shard + 1) * MAX_USER_ID) / numShards +``` + +Each of these processes can use `@jcoreio/postgres-shard-coordinator` to pick a unique +`shard` index and determine the total `numShards` (number of processes) in a +decentralized fashion that automatically adapts as processes are spawned or die. + +# Usage + +``` +npm i --save @jcoreio/postgres-shard-coordinator ``` -## Tools used - -* babel 6 -* babel-preset-env -* mocha -* chai -* istanbul -* nyc -* babel-plugin-istanbul -* eslint -* eslint-watch -* flow -* flow-watch -* pre-commit (runs eslnt and flow) -* semantic-release -* Travis CI -* Coveralls +## Database migration + +You will need to perform provided migrations to create the tables and functions +for coordination: + +```js +import { Client } from 'pg' +import Umzug from 'umzug' +import { umzugMigrationOptions } from '@jcoreio/postgres-shard-coordinator' + +export default async function migrate({ database }) { + const umzug = new Umzug({ + storage: 'umzug-postgres-storage', + storageOptions: { + database, + relation: '"SequelizeMeta"', + column: 'name', + }, + migrations: { + ...umzugMigrationOptions(), + params: [{ query }], + }, + }) + await umzug.up() +} + +async function query(sql) { + const client = new Client(database) + try { + await client.connect() + migrationDebug(sql) + return await client.query(sql) + } finally { + await client.end() + } +} +``` + +## Shard registration + +```js +import { ShardRegistrar } from '@jcoreio/postgres-shard-coordinator' +import requireEnv from '@jcoreio/require-env' +import migrate from './migrate' + +const database = { + user: requireEnv('DB_USER'), + host: requireEnv('DB_HOST'), + database: requireEnv('DB_NAME'), + password: requireEnv('DB_PASSWORD'), + port: parseInt(requireEnv('DB_PORT')), +} + +const registrar = new ShardRegistrar({ + database, + cluster: 'clarity_notifications', + heartbeatInterval: 60, // seconds + gracePeriod: 30, // seconds + reshardInterval: 60, // seconds +}) + +registrar.on('shardChanged', ({ shard, numShards }) => { + // reconfigure the notification queue processor +}) +registrar.on('error', err => console.error(err.stack)) + +migrate({ database }).then(() => registrar.start()) +``` diff --git a/package.json b/package.json index 7123dd8..5652f1d 100644 --- a/package.json +++ b/package.json @@ -40,7 +40,7 @@ ] }, "config": { - "mocha": "-r @babel/register ./test/configure.js ./test/**/*.js", + "mocha": "-r @babel/register ./test/configure.js ./test/*.js", "commitizen": { "path": "cz-conventional-changelog" } @@ -106,6 +106,7 @@ "husky": "^1.1.4", "istanbul": "^0.4.5", "lint-staged": "^8.0.4", + "lodash": "^4.17.11", "mocha": "^5.2.0", "nyc": "^13.1.0", "p-event": "^2.1.0", @@ -113,16 +114,14 @@ "prettier-eslint": "^8.8.2", "rimraf": "^2.6.0", "semantic-release": "^15.1.4", - "travis-deploy-once": "^5.0.9" + "travis-deploy-once": "^5.0.9", + "umzug": "^2.2.0" }, "dependencies": { "@babel/runtime": "^7.1.5", "@jcoreio/typed-event-emitter": "^1.0.0", "debug": "^4.1.1", - "lodash": "^4.17.11", - "log4jcore": "^2.0.3", "pg": "^7.7.1", - "umzug": "^2.2.0", "uuid": "^3.3.2" } } diff --git a/src/ShardRegistrar.js b/src/ShardRegistrar.js new file mode 100644 index 0000000..53affae --- /dev/null +++ b/src/ShardRegistrar.js @@ -0,0 +1,229 @@ +/** + * @flow + * @prettier + */ +import EventEmitter from '@jcoreio/typed-event-emitter' +import { Client, type Result } from 'pg' +import uuidv4 from 'uuid/v4' +import debug from 'debug' +import ShardReservationCluster from './schema/ShardReservationCluster' +import ShardReservation from './schema/ShardReservation' + +const RESHARD_DEBUG = debug.enabled('ShardRegistrar:reshard') + +export type ShardRegistrarEvents = { + shardChanged: [{ shard: number, numShards: number }], + error: [Error], +} + +export type ShardRegistrarOptions = { + cluster: string, + database: { + database: string, + user: string, + password: string, + host: string, + port: number, + }, + heartbeatInterval: number, + gracePeriod: number, + reshardInterval: number, +} + +export default class ShardRegistrar extends EventEmitter { + _options: ShardRegistrarOptions + _heartbeatTimeout: ?TimeoutID + _holder: string = uuidv4() + _client: Client + _shard: ?number + _numShards: ?number + _running: boolean = false + _debug = debug(`ShardRegistrar:${this._holder.substring(0, 8)}`) + _upsertedCluster: boolean = false + _lastQuery: ?Promise + + constructor(options: ShardRegistrarOptions) { + super() + this._options = options + this._client = new Client(options.database) + } + + shardInfo(): { shard: number, numShards: number } { + const shard = this._shard + const numShards = this._numShards + if (shard == null || numShards == null) { + throw new Error('no shard has been reserved') + } + return { shard, numShards } + } + + async start(): Promise { + if (this._running) return + this._running = true + this._upsertedCluster = false + const { _holder: holder } = this + await this._client.connect() + this._client.on('notification', this._onNotification) + this._client.on('error', this._onError) + await this._query(`LISTEN "shardInfo/${holder}"`) + this._onHeartbeat() + } + + async stop(): Promise { + if (!this._running) return + this._running = false + if (this._heartbeatTimeout != null) clearTimeout(this._heartbeatTimeout) + this._client.removeListener('notification', this._onNotification) + try { + await this._lastQuery + } catch (error) { + // ignore + } + await this._client.end() + this._client.removeListener('error', this._onError) + this._client = new Client(this._options.database) + } + + _onError = (err: Error) => this.emit('error', err) + + _setShard({ shard, numShards }: { shard: number, numShards: number }) { + if (shard !== this._shard || numShards !== this._numShards) { + this._shard = shard + this._numShards = numShards + this.emit('shardChanged', { shard, numShards }) + } + } + + _onNotification = ({ + channel, + payload, + }: { + channel: string, + payload: string, + }) => { + this._debug(channel, payload) + const obj = JSON.parse(payload) + try { + if (!obj) { + throw new Error( + `received invalid payload from Postgres channel "${channel}": ${payload}` + ) + } + const { shard, numShards } = obj + if (typeof shard !== 'number' || typeof numShards !== 'number') { + throw new Error( + `received invalid payload from Postgres channel "${channel}": ${payload}` + ) + } + this._setShard({ shard, numShards }) + } catch (error) { + this.emit('error', error) + } + } + + _onHeartbeat = async (): Promise => { + let nextTime = Date.now() + this._options.heartbeatInterval * 1000 + const reshardAt: ?Date = await this._register() + if (reshardAt) nextTime = Math.min(nextTime, reshardAt.getTime()) + const delay = Math.max(0, nextTime - Date.now()) + if (this._running) { + this._heartbeatTimeout = setTimeout(this._onHeartbeat, delay) + } + } + + async _query(sql: string, params?: Array): Promise { + this._debug(sql, params) + if (!this._running) throw new Error('already stopped') + const result = await (this._lastQuery = this._client.query(sql, params)) + this._debug(result.rows) + return result + } + + async _register(): Promise { + const { _holder: holder } = this + const { cluster, heartbeatInterval, gracePeriod } = this._options + const interval = `${heartbeatInterval + gracePeriod} seconds` + const reshardInterval = `${this._options.reshardInterval} seconds` + + try { + if (!this._upsertedCluster) { + await this._query(upsertClusterQuery, [cluster]) + this._upsertedCluster = true + } + await this._query(registerQuery, [cluster, holder, interval]) + const { + rows: [{ isCoordinator }], + } = await this._query(selectIsCoordinatorQuery, [holder, cluster]) + let reshardAt: ?Date + if (isCoordinator) { + ;({ + rows: [{ reshardAt }], + } = await this._query( + `SELECT "reshard_ShardReservations"($1, $2::interval) AS "reshardAt";`, + [cluster, reshardInterval] + )) + if (RESHARD_DEBUG) { + const { rows } = await this._query( + `SELECT * FROM ${ShardReservation.tableName} WHERE ${ + ShardReservation.cluster + } = $1 ORDER BY ${ShardReservation.shard} NULLS LAST, ${ + ShardReservation.holder + }`, + [cluster] + ) + console.table(rows) // eslint-disable-line no-console + } + } + + return reshardAt + } catch (error) { + if (this._running) this.emit('error', error) + } + } +} + +const upsertClusterQuery = ` +INSERT INTO ${ShardReservationCluster.tableName} ( + ${ShardReservationCluster.cluster} + ) + VALUES ($1) + ON CONFLICT (${ShardReservationCluster.cluster}) DO NOTHING; +` + .trim() + .replace(/\s+/g, ' ') + +const registerQuery = ` +INSERT INTO ${ShardReservation.tableName} ( + ${ShardReservation.cluster}, + ${ShardReservation.holder}, + ${ShardReservation.expiresAt} + ) + VALUES ( + $1, + $2, + CURRENT_TIMESTAMP + $3::interval + ) + ON CONFLICT (${ShardReservation.holder}) DO UPDATE + SET ${ShardReservation.expiresAt} = CURRENT_TIMESTAMP + $3::interval, + ${ShardReservation.shard} = CASE + WHEN ${ShardReservation.tableName}.${ShardReservation.expiresAt} + <= CURRENT_TIMESTAMP + THEN NULL + ELSE ${ShardReservation.tableName}.${ShardReservation.shard} + END + RETURNING ${ShardReservation.tableName}; +` + .trim() + .replace(/\s+/g, ' ') + +const selectIsCoordinatorQuery = ` +SELECT $1 = ( + SELECT ${ShardReservation.holder} FROM ${ShardReservation.tableName} + WHERE ${ShardReservation.cluster} = $2 + AND ${ShardReservation.expiresAt} > CURRENT_TIMESTAMP + ORDER BY ${ShardReservation.shard} NULLS LAST, ${ShardReservation.holder} + LIMIT 1 + ) AS "isCoordinator"; +` + .trim() + .replace(/\s+/g, ' ') diff --git a/src/index.js b/src/index.js index 9c0239d..b1f20dc 100644 --- a/src/index.js +++ b/src/index.js @@ -2,236 +2,12 @@ * @flow * @prettier */ -import EventEmitter from '@jcoreio/typed-event-emitter' -import { Client, type Result } from 'pg' -import uuidv4 from 'uuid/v4' -import debug from 'debug' -import { throttle } from 'lodash' -import ShardReservationCluster from './schema/ShardReservationCluster' +export { default as ShardRegistrar } from './ShardRegistrar' +export type { + ShardRegistrarOptions, + ShardRegistrarEvents, +} from './ShardRegistrar' -import ShardReservation from './schema/ShardReservation' - -type Events = { - shardChanged: [{ shard: number, numShards: number }], -} - -export type ShardRegistrarOptions = { - cluster: string, - database: { - database: string, - user: string, - password: string, - host: string, - port: number, - }, - heartbeatInterval: number, - gracePeriod: number, -} - -export class ShardRegistrar extends EventEmitter { - _options: ShardRegistrarOptions - _heartbeatTimeout: ?TimeoutID - _holder: string = uuidv4() - _client: Client - _shard: ?number - _numShards: ?number - _running: boolean = false - _debug = debug(this._holder.substring(0, 8)) - _upsertedCluster: boolean = false - _lastQuery: ?Promise - - constructor(options: ShardRegistrarOptions) { - super() - this._options = options - this._client = new Client(options.database) - } - - shardInfo(): { shard: number, numShards: number } { - const shard = this._shard - const numShards = this._numShards - if (shard == null || numShards == null) { - throw new Error('no shard has been reserved') - } - return { shard, numShards } - } - - async start(): Promise { - if (this._running) return - this._running = true - this._upsertedCluster = false - const { _holder: holder } = this - await this._client.connect() - this._client.on('notification', this._onNotification) - this._client.on('error', this._onError) - await this._query(`LISTEN "shardInfo/${holder}"`) - this._onHeartbeat() - } - - async stop(): Promise { - if (!this._running) return - this._running = false - if (this._heartbeatTimeout != null) clearTimeout(this._heartbeatTimeout) - this._client.removeListener('notification', this._onNotification) - try { - await this._lastQuery - } catch (error) { - // ignore - } - await this._client.end() - this._client.removeListener('error', this._onError) - this._client = new Client(this._options.database) - } - - _onError = (err: Error) => console.error(err.stack) // eslint-disable-line no-console - - _setShard({ shard, numShards }: { shard: number, numShards: number }) { - if (shard !== this._shard || numShards !== this._numShards) { - this._shard = shard - this._numShards = numShards - this.emit('shardChanged', { shard, numShards }) - } - } - - _onNotification = throttle( - ({ channel, payload }: { channel: string, payload: string }) => { - this._debug(channel, payload) - const obj = JSON.parse(payload) - // istanbul ignore next - if (!obj) return - const { shard, numShards } = obj - // istanbul ignore next - if (typeof shard !== 'number' || typeof numShards !== 'number') return - this._setShard({ shard, numShards }) - }, - 1000 - ) - - _onHeartbeat = async (): Promise => { - let startTime = Date.now() - let delay - this._register().catch((err: Error) => { - if (this._running) { - console.error(err.stack) // eslint-disable-line no-console - } - delay = 100 - }) - delay = Math.max( - 0, - startTime + this._options.heartbeatInterval * 1000 - Date.now() - ) - if (this._running) { - this._heartbeatTimeout = setTimeout(this._onHeartbeat, delay) - } - } - - async _query(sql: string, params?: Array): Promise { - this._debug(sql, params) - if (!this._running) throw new Error('already stopped') - const result = await (this._lastQuery = this._client.query(sql, params)) - this._debug(result.rows) - return result - } - - async _register(): Promise { - const { _holder: holder } = this - const { cluster, heartbeatInterval, gracePeriod } = this._options - const interval = `${heartbeatInterval + gracePeriod} seconds` - - try { - if (!this._upsertedCluster) { - await this._query(upsertClusterQuery, [cluster]) - this._upsertedCluster = true - } - - await this._query('BEGIN;') - let result - - await this._query(lockQuery, [cluster]) - - result = await this._query(upsertQuery, [cluster, holder, interval]) - - let reshard = result.rows[0].shard == null - - const { - rows: [{ isCoordinator }], - } = await this._query(selectIsCoordinatorQuery, [holder, cluster]) - - if (isCoordinator) { - result = await this._query(deleteExpiredQuery, [cluster]) - if (result.rowCount) reshard = true - } - if (reshard) { - await this._query(`SELECT "reshard_ShardReservations"($1);`, [cluster]) - } - - await this._query('COMMIT;') - } catch (error) { - await this._query('ROLLBACK;').catch(() => {}) - throw error - } - } -} - -const upsertClusterQuery = ` -INSERT INTO ${ShardReservationCluster.tableName} ( - ${ShardReservationCluster.cluster} - ) - VALUES ($1) - ON CONFLICT (${ShardReservationCluster.cluster}) DO NOTHING; -` - .trim() - .replace(/\s+/g, ' ') - -const lockQuery = ` -SELECT 1 - FROM ${ShardReservation.tableName} - WHERE ${ShardReservation.cluster} = $1 - FOR UPDATE; -` - .trim() - .replace(/\s+/g, ' ') - -const upsertQuery = ` -INSERT INTO ${ShardReservation.tableName} ( - ${ShardReservation.cluster}, - ${ShardReservation.holder}, - ${ShardReservation.expiresAt} - ) - VALUES ( - $1, - $2, - CURRENT_TIMESTAMP + $3::interval - ) - ON CONFLICT (${ShardReservation.holder}) DO UPDATE - SET ${ShardReservation.expiresAt} = CURRENT_TIMESTAMP + $3::interval, - ${ShardReservation.shard} = CASE - WHEN ${ShardReservation.tableName}.${ShardReservation.expiresAt} - <= CURRENT_TIMESTAMP - THEN NULL - ELSE ${ShardReservation.tableName}.${ShardReservation.shard} - END - RETURNING ${ShardReservation.tableName}; -` - .trim() - .replace(/\s+/g, ' ') - -const selectIsCoordinatorQuery = ` -SELECT $1 = ( - SELECT ${ShardReservation.holder} FROM ${ShardReservation.tableName} - WHERE ${ShardReservation.cluster} = $2 - AND ${ShardReservation.expiresAt} > CURRENT_TIMESTAMP - ORDER BY ${ShardReservation.shard} - LIMIT 1 - ) AS "isCoordinator"; -` - .trim() - .replace(/\s+/g, ' ') - -const deleteExpiredQuery = ` -DELETE FROM ${ShardReservation.tableName} - WHERE ${ShardReservation.cluster} = $1 - AND ${ShardReservation.expiresAt} <= CURRENT_TIMESTAMP; -` - .trim() - .replace(/\s+/g, ' ') +export { default as umzugMigrationOptions } from './umzugMigrationOptions' +export type { Migration } from './umzugMigrationOptions' diff --git a/src/migrations/20190104204203-create-ShardReservations.sql b/src/migrations/20190104204203-create-ShardReservations.sql index 3690d8b..5d632b8 100644 --- a/src/migrations/20190104204203-create-ShardReservations.sql +++ b/src/migrations/20190104204203-create-ShardReservations.sql @@ -1,5 +1,6 @@ CREATE TABLE "ShardReservationClusters" ( "cluster" character varying(32) not null, + "reshardedAt" timestamp with time zone, PRIMARY KEY ("cluster") ); @@ -20,17 +21,48 @@ CREATE INDEX "ShardReservations_cluster_expiresAt_idx" CREATE INDEX "ShardReservations_cluster_shard_holder_idx" ON "ShardReservations" ("cluster", shard, holder); -CREATE FUNCTION "reshard_ShardReservations"(target_cluster text) -RETURNS VOID AS $$ +CREATE FUNCTION "reshard_ShardReservations"( + target_cluster text, + reshard_interval interval +) +RETURNS timestamp with time zone AS $$ DECLARE r RECORD; + reshardAt timestamp with time zone; + now timestamp with time zone := CURRENT_TIMESTAMP; BEGIN + reshardAt := ( + SELECT "reshardedAt" + reshard_interval + FROM "ShardReservationClusters" + WHERE "cluster" = target_cluster + -- aquire an exclusive lock on the cluster + FOR UPDATE + ); + + IF reshardAt IS NOT NULL AND reshardAt > now + INTERVAL '1 seconds' THEN + RETURN reshardAt; + END IF; + + IF NOT EXISTS (SELECT 1 FROM "ShardReservations" + WHERE "cluster" = target_cluster + AND ( + "expiresAt" <= now + OR "shard" IS NULL + ) + ) THEN + RETURN NULL; + END IF; + + DELETE FROM "ShardReservations" + WHERE "cluster" = target_cluster + AND "expiresAt" <= now; + FOR r IN UPDATE "ShardReservations" updated SET shard = src.shard FROM ( SELECT holder, row_number() OVER ( PARTITION BY "cluster" - ORDER BY shard NULLS LAST + ORDER BY shard NULLS LAST, holder ) - 1 AS shard, COUNT(*) OVER (PARTITION BY "cluster") AS "numShards" FROM "ShardReservations" @@ -44,12 +76,19 @@ BEGIN json_build_object('shard', r.shard, 'numShards', r."numShards") #>> '{}' ); END LOOP; + + UPDATE "ShardReservationClusters" + SET "reshardedAt" = now + WHERE "cluster" = target_cluster; + + RETURN NULL; END; $$ LANGUAGE plpgsql; + -- down -DROP FUNCTION "reshard_ShardReservations"(text); +DROP FUNCTION "reshard_ShardReservations"(text, interval); DROP TABLE "ShardReservations"; DROP TABLE "ShardReservationClusters"; diff --git a/src/umzug.js b/src/umzug.js deleted file mode 100644 index b45f641..0000000 --- a/src/umzug.js +++ /dev/null @@ -1,65 +0,0 @@ -/** - * @flow - * @prettier - */ - -import path from 'path' -import Umzug, { type Migration } from 'umzug' -import logger from 'log4jcore' -import { Client } from 'pg' -import UmzugPostgresStorage from './util/UmzugPostgresStorage' - -const log = logger('postgres-shard-coordinator:migrate') - -type Options = { - storageOptions: { - database: { - database: string, - user: string, - password: string, - host: string, - port: number, - }, - relation: string, - column: string, - }, -} - -const migrationsDir = path.join(__dirname, 'migrations') - -export default function createUmzug({ storageOptions }: Options): Umzug { - const query = async (sql: string): Promise => { - const client = new Client(storageOptions.database) - try { - await client.connect() - return await client.query(sql) - } finally { - await client.end() - } - } - - return new Umzug({ - logging: log.info.bind(log), - storage: new UmzugPostgresStorage(storageOptions), - migrations: { - params: [], - path: migrationsDir, - traverseDirectories: true, - pattern: /^\d+[\w-]+\.sql$/, - customResolver(file: string): Migration { - const code = require('fs').readFileSync(file, 'utf8') - const [up, down] = code.split(/^-- down.*$/im).map(s => s.trim()) - if (!up) { - throw new Error(`${path.basename(file)}: up SQL not found`) - } - if (!down) { - throw new Error(`${path.basename(file)}: down SQL not found`) - } - return { - up: () => query(up), - down: () => query(down), - } - }, - }, - }) -} diff --git a/src/umzugMigrationOptions.js b/src/umzugMigrationOptions.js new file mode 100644 index 0000000..d530933 --- /dev/null +++ b/src/umzugMigrationOptions.js @@ -0,0 +1,41 @@ +/** + * @flow + * @prettier + */ + +import path from 'path' +const migrationsDir = path.join(__dirname, 'migrations') + +export type MigrationParams = { + query: (sql: string) => Promise, +} + +export interface Migration { + up(MigrationParams): Promise; + down(MigrationParams): Promise; +} + +export default function umzugMigrationOptions(): { + path: string, + pattern: RegExp, + customResolver(file: string): Migration, +} { + return { + path: migrationsDir, + pattern: /^\d+-.+\.sql$/, + customResolver(file: string): Migration { + const code = require('fs').readFileSync(file, 'utf8') + const [up, down] = code.split(/^-- down.*$/im).map(s => s.trim()) + if (!up) { + throw new Error(`${path.basename(file)}: up SQL not found`) + } + if (!down) { + throw new Error(`${path.basename(file)}: down SQL not found`) + } + return { + up: ({ query }) => query(up), + down: ({ query }) => query(down), + } + }, + } +} diff --git a/test/configure.js b/test/configure.js index 2a58847..cf114a0 100644 --- a/test/configure.js +++ b/test/configure.js @@ -2,11 +2,16 @@ import chai from 'chai' import chaiAsPromised from 'chai-as-promised' -import createUmzug from '../src/umzug' +import Umzug from 'umzug' +import { umzugMigrationOptions } from '../src' import { Client } from 'pg' import { before, after, beforeEach } from 'mocha' -import ShardReservation from '../src/schema/ShardReservation' import { database } from './database' +import ShardReservationCluster from '../src/schema/ShardReservationCluster' +import UmzugPostgresStorage from './util/UmzugPostgresStorage' +import debug from 'debug' + +const migrationDebug = debug('postgres-shard-coordinator:migrate') chai.use(chaiAsPromised) @@ -18,19 +23,37 @@ const client = new Client(database) before(async function(): Promise { await client.connect() - const umzug = createUmzug({ - storageOptions: { + + const query = async (sql: string): Promise => { + const client = new Client(database) + try { + await client.connect() + migrationDebug(sql) + return await client.query(sql) + } finally { + await client.end() + } + } + + const umzug = new Umzug({ + logging: migrationDebug, + storage: new UmzugPostgresStorage({ database, relation: '"SequelizeMeta"', column: 'name', + }), + migrations: { + ...umzugMigrationOptions(), + params: [{ query }], }, }) + await umzug.down() await umzug.up() }) beforeEach(async function(): Promise { - await client.query(`TRUNCATE ${ShardReservation.tableName} CASCADE;`) + await client.query(`TRUNCATE ${ShardReservationCluster.tableName} CASCADE;`) }) after(async function(): Promise { diff --git a/test/index.js b/test/index.js index c7ce9fc..a728a3b 100644 --- a/test/index.js +++ b/test/index.js @@ -12,7 +12,9 @@ describe('ShardRegistrar', function() { this.timeout(30000) let registrars = [] - beforeEach(() => (registrars = [])) + beforeEach(async function(): Promise { + registrars = [] + }) afterEach(async function(): Promise { await Promise.all(registrars.map(registrar => registrar.stop())) }) @@ -26,12 +28,14 @@ describe('ShardRegistrar', function() { it('sequential three node test', async function(): Promise { const cluster = 'a' const heartbeatInterval = 1 - const gracePeriod = 1 + const gracePeriod = 3 + const reshardInterval = 5 const options = { database, cluster, heartbeatInterval, gracePeriod, + reshardInterval, } const registrar1 = createRegistrar(options) const registrar2 = createRegistrar(options) @@ -105,7 +109,8 @@ describe('ShardRegistrar', function() { }) it(`two clusters of registrars operating simultaneously`, async function(): Promise { const heartbeatInterval = 1 - const gracePeriod = 1 + const gracePeriod = 3 + const reshardInterval = 5 const numShards = 10 const clusterA = range(numShards).map(() => createRegistrar({ @@ -113,6 +118,7 @@ describe('ShardRegistrar', function() { cluster: 'a', heartbeatInterval, gracePeriod, + reshardInterval, }) ) const clusterB = range(numShards).map(() => @@ -121,6 +127,7 @@ describe('ShardRegistrar', function() { cluster: 'b', heartbeatInterval, gracePeriod, + reshardInterval, }) ) const aEvents = Promise.all( diff --git a/src/util/UmzugPostgresStorage.js b/test/util/UmzugPostgresStorage.js similarity index 100% rename from src/util/UmzugPostgresStorage.js rename to test/util/UmzugPostgresStorage.js diff --git a/yarn.lock b/yarn.lock index c9de3a4..b05267a 100644 --- a/yarn.lock +++ b/yarn.lock @@ -4915,15 +4915,6 @@ log-update@^2.3.0: cli-cursor "^2.0.0" wrap-ansi "^3.0.1" -log4jcore@^2.0.3: - version "2.0.3" - resolved "https://registry.yarnpkg.com/log4jcore/-/log4jcore-2.0.3.tgz#5b9b1fc4263d231591be438234d0dd3f3900a270" - integrity sha512-cj8wQ+wECBi9IXGnZZXEmIVlegpPM/Y5lm1uhyJmy489cbTl4iBOBrX+yumYhzEiIaFrXomIEAk7D/kaGzJYqQ== - dependencies: - babel-runtime "^6.23.0" - lodash "^4.17.4" - moment "^2.22.2" - loglevel-colored-level-prefix@^1.0.0: version "1.0.0" resolved "https://registry.yarnpkg.com/loglevel-colored-level-prefix/-/loglevel-colored-level-prefix-1.0.0.tgz#6a40218fdc7ae15fc76c3d0f3e676c465388603e" @@ -5313,11 +5304,6 @@ modify-values@^1.0.0: resolved "https://registry.yarnpkg.com/modify-values/-/modify-values-1.0.1.tgz#b3939fa605546474e3e3e3c63d64bd43b4ee6022" integrity sha512-xV2bxeN6F7oYjZWTe/YPAy6MN2M+sL4u/Rlm2AHCIVGfo2p1yGmBHQ6vHehl4bRTZBdHu3TSkWdYgkwpYzAGSw== -moment@^2.22.2: - version "2.23.0" - resolved "https://registry.yarnpkg.com/moment/-/moment-2.23.0.tgz#759ea491ac97d54bac5ad776996e2a58cc1bc225" - integrity sha512-3IE39bHVqFbWWaPOMHZF98Q9c3LDKGTmypMiTM2QygGXXElkFWIH7GxfmlwmY2vwa+wmNsoYZmG2iusf1ZjJoA== - move-concurrently@^1.0.1: version "1.0.1" resolved "https://registry.yarnpkg.com/move-concurrently/-/move-concurrently-1.0.1.tgz#be2c005fda32e0b29af1f05d7c4b33214c701f92"