Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(core-kernel): dispatch schedule events #3770

Merged
merged 7 commits into from
Jun 7, 2020
Merged
94 changes: 81 additions & 13 deletions __tests__/unit/core-kernel/services/schedule/block-job.test.ts
Original file line number Diff line number Diff line change
@@ -1,22 +1,43 @@
import "jest-extended";

import { BlockEvent } from "@packages/core-kernel/src/enums/events";
import { Container, Identifiers, interfaces } from "@packages/core-kernel/src/ioc";
import { BlockEvent, ScheduleEvent } from "@packages/core-kernel/src/enums/events";
import { Identifiers } from "@packages/core-kernel/src/ioc";
import { MemoryEventDispatcher } from "@packages/core-kernel/src/services/events/drivers/memory";
import { BlockJob } from "@packages/core-kernel/src/services/schedule/block-job";
import { Sandbox } from "@packages/core-test-framework";

let sandbox: Sandbox;
let job: BlockJob;
let eventDispatcher: MemoryEventDispatcher;

const delay = async (timeout) => {
await new Promise((resolve) => {
setTimeout(() => {
resolve();
}, timeout);
});
};

const expectFinishedEventData = () => {
return expect.objectContaining({
executionTime: expect.toBeNumber(),
blockCount: expect.toBeNumber(),
});
};

beforeEach(() => {
const container: interfaces.Container = new Container();
eventDispatcher = container.resolve<MemoryEventDispatcher>(MemoryEventDispatcher);
container.bind(Identifiers.EventDispatcherService).toConstantValue(eventDispatcher);
sandbox = new Sandbox();
eventDispatcher = sandbox.app.resolve<MemoryEventDispatcher>(MemoryEventDispatcher);

sandbox.app.bind(Identifiers.EventDispatcherService).toConstantValue(eventDispatcher);

job = container.resolve(BlockJob);
job = sandbox.app.resolve<BlockJob>(BlockJob);
});

describe("BlockJob", () => {
it("should execute on cron", () => {
it("should execute on cron", async () => {
const spyOnDispatch = jest.spyOn(eventDispatcher, "dispatch");

const fn: Function = jest.fn();

job.cron(3).execute(fn);
Expand All @@ -31,10 +52,17 @@ describe("BlockJob", () => {
eventDispatcher.dispatchSync(BlockEvent.Received, { height: 9 });
eventDispatcher.dispatchSync(BlockEvent.Received, { height: 10 });

await delay(100);

expect(fn).toHaveBeenCalledTimes(3);

expect(spyOnDispatch).toHaveBeenCalledTimes(3);
expect(spyOnDispatch).toHaveBeenCalledWith(ScheduleEvent.BlockJobFinished, expectFinishedEventData());
});

it("should execute every block", () => {
it("should execute every block", async () => {
const spyOnDispatch = jest.spyOn(eventDispatcher, "dispatch");

const fn: Function = jest.fn();

job.everyBlock().execute(fn);
Expand All @@ -45,10 +73,17 @@ describe("BlockJob", () => {
eventDispatcher.dispatchSync(BlockEvent.Received, { height: 1 });
eventDispatcher.dispatchSync(BlockEvent.Received, { height: 1 });

await delay(100);

expect(fn).toHaveBeenCalledTimes(3);

expect(spyOnDispatch).toHaveBeenCalledTimes(3);
expect(spyOnDispatch).toHaveBeenCalledWith(ScheduleEvent.BlockJobFinished, expectFinishedEventData());
});

it("should execute every five blocks", () => {
it("should execute every five blocks", async () => {
const spyOnDispatch = jest.spyOn(eventDispatcher, "dispatch");

const fn: Function = jest.fn();

job.everyFiveBlocks().execute(fn);
Expand All @@ -63,10 +98,17 @@ describe("BlockJob", () => {
eventDispatcher.dispatchSync(BlockEvent.Received, { height: 15 });
eventDispatcher.dispatchSync(BlockEvent.Received, { height: 16 });

await delay(100);

expect(fn).toHaveBeenCalledTimes(3);

expect(spyOnDispatch).toHaveBeenCalledTimes(3);
expect(spyOnDispatch).toHaveBeenCalledWith(ScheduleEvent.BlockJobFinished, expectFinishedEventData());
});

it("should execute every ten blocks", () => {
it("should execute every ten blocks", async () => {
const spyOnDispatch = jest.spyOn(eventDispatcher, "dispatch");

const fn: Function = jest.fn();

job.everyTenBlocks().execute(fn);
Expand All @@ -81,10 +123,17 @@ describe("BlockJob", () => {
eventDispatcher.dispatchSync(BlockEvent.Received, { height: 30 });
eventDispatcher.dispatchSync(BlockEvent.Received, { height: 31 });

await delay(100);

expect(fn).toHaveBeenCalledTimes(3);

expect(spyOnDispatch).toHaveBeenCalledTimes(3);
expect(spyOnDispatch).toHaveBeenCalledWith(ScheduleEvent.BlockJobFinished, expectFinishedEventData());
});

it("should execute every fifteen blocks", () => {
it("should execute every fifteen blocks", async () => {
const spyOnDispatch = jest.spyOn(eventDispatcher, "dispatch");

const fn: Function = jest.fn();

job.everyFifteenBlocks().execute(fn);
Expand All @@ -99,10 +148,17 @@ describe("BlockJob", () => {
eventDispatcher.dispatchSync(BlockEvent.Received, { height: 45 });
eventDispatcher.dispatchSync(BlockEvent.Received, { height: 46 });

await delay(100);

expect(fn).toHaveBeenCalledTimes(3);

expect(spyOnDispatch).toHaveBeenCalledTimes(3);
expect(spyOnDispatch).toHaveBeenCalledWith(ScheduleEvent.BlockJobFinished, expectFinishedEventData());
});

it("should execute every thirty blocks", () => {
it("should execute every thirty blocks", async () => {
const spyOnDispatch = jest.spyOn(eventDispatcher, "dispatch");

const fn: Function = jest.fn();

job.everyThirtyBlocks().execute(fn);
Expand All @@ -117,10 +173,17 @@ describe("BlockJob", () => {
eventDispatcher.dispatchSync(BlockEvent.Received, { height: 90 });
eventDispatcher.dispatchSync(BlockEvent.Received, { height: 91 });

await delay(100);

expect(fn).toHaveBeenCalledTimes(3);

expect(spyOnDispatch).toHaveBeenCalledTimes(3);
expect(spyOnDispatch).toHaveBeenCalledWith(ScheduleEvent.BlockJobFinished, expectFinishedEventData());
});

it("should execute every round", () => {
it("should execute every round", async () => {
const spyOnDispatch = jest.spyOn(eventDispatcher, "dispatch");

const fn: Function = jest.fn();

job.everyRound().execute(fn);
Expand All @@ -135,6 +198,11 @@ describe("BlockJob", () => {
eventDispatcher.dispatchSync(BlockEvent.Received, { height: 153 });
eventDispatcher.dispatchSync(BlockEvent.Received, { height: 154 });

await delay(100);

expect(fn).toHaveBeenCalledTimes(3);

expect(spyOnDispatch).toHaveBeenCalledTimes(3);
expect(spyOnDispatch).toHaveBeenCalledWith(ScheduleEvent.BlockJobFinished, expectFinishedEventData());
});
});
35 changes: 32 additions & 3 deletions __tests__/unit/core-kernel/services/schedule/cron-job.test.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,18 @@
import "jest-extended";

import { Container, Enums } from "@packages/core-kernel";
import { CronJob } from "@packages/core-kernel/src/services/schedule/cron-job";
import { Sandbox } from "@packages/core-test-framework";
import moment from "moment-timezone";
import { useFakeTimers } from "sinon";

let sandbox: Sandbox;
let job: CronJob;
let clock;
const mockEventDispatcher = {
dispatch: jest.fn(),
};

const days: Record<string, string> = {
monday: "2019-08-19 00:00:00",
tuesday: "2019-08-20 00:00:00",
Expand Down Expand Up @@ -31,6 +40,14 @@ const expectExecutionAfterDelay = (callback: CronJob, minutes: number): void =>
}

expect(fn).toHaveBeenCalledTimes(3);

expect(mockEventDispatcher.dispatch).toHaveBeenCalledWith(
Enums.ScheduleEvent.CronJobFinished,
expect.objectContaining({
executionTime: expect.toBeNumber(),
expression: expect.toBeString(),
}),
);
};

const expectExecutionOnDate = (callback: CronJob, day: string): void => {
Expand All @@ -49,11 +66,23 @@ const expectExecutionOnDate = (callback: CronJob, day: string): void => {
}

expect(fn).toHaveBeenCalledTimes(1);

expect(mockEventDispatcher.dispatch).toHaveBeenCalledWith(
Enums.ScheduleEvent.CronJobFinished,
expect.objectContaining({
executionTime: expect.toBeNumber(),
expression: expect.toBeString(),
}),
);
};

let job: CronJob;
let clock;
beforeEach(() => (job = new CronJob()));
beforeEach(() => {
sandbox = new Sandbox();

sandbox.app.bind(Container.Identifiers.EventDispatcherService).toConstantValue(mockEventDispatcher);

job = sandbox.app.resolve<CronJob>(CronJob);
});

afterEach(() => clock.restore());

Expand Down
9 changes: 9 additions & 0 deletions packages/core-kernel/src/enums/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,15 @@ export enum TransactionEvent {
Reverted = "transaction.reverted",
}

/**
* @export
* @enum {number}
*/
export enum ScheduleEvent {
BlockJobFinished = "schedule.blockJob.finished",
CronJobFinished = "schedule.cronJob.finished",
}

/**
* @export
* @enum {number}
Expand Down
20 changes: 18 additions & 2 deletions packages/core-kernel/src/services/schedule/block-job.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import { Managers } from "@arkecosystem/crypto";
import { performance } from "perf_hooks";

import { Application } from "../../contracts/kernel";
import { EventDispatcher } from "../../contracts/kernel/events";
import { BlockEvent } from "../../enums";
import { BlockEvent, ScheduleEvent } from "../../enums";
import { Identifiers, inject, injectable } from "../../ioc";
import { Job } from "./interfaces";
import { ExecuteCallbackWhenReady } from "./listeners";
Expand All @@ -20,6 +22,9 @@ export class BlockJob implements Job {
*/
protected blockCount: number = 1;

@inject(Identifiers.Application)
protected readonly app!: Application;

/**
* @private
* @type {EventDispatcher}
Expand All @@ -33,7 +38,18 @@ export class BlockJob implements Job {
* @memberof BlockJob
*/
public execute(callback: Function): void {
this.events.listen(BlockEvent.Received, new ExecuteCallbackWhenReady(callback, this.blockCount));
const onCallback = async () => {
const start = performance.now();

await callback();

await this.app.events.dispatch(ScheduleEvent.BlockJobFinished, {
executionTime: performance.now() - start,
blockCount: this.blockCount,
});
};

this.events.listen(BlockEvent.Received, new ExecuteCallbackWhenReady(onCallback, this.blockCount));
}

/**
Expand Down
21 changes: 19 additions & 2 deletions packages/core-kernel/src/services/schedule/cron-job.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import { CronCommand, CronJob as Cron } from "cron";
import { performance } from "perf_hooks";

import { injectable } from "../../ioc";
import { Application } from "../../contracts/kernel";
import { ScheduleEvent } from "../../enums";
import { Identifiers, inject, injectable } from "../../ioc";
import { Job } from "./interfaces";

/**
Expand All @@ -14,6 +17,9 @@ import { Job } from "./interfaces";
*/
@injectable()
export class CronJob implements Job {
@inject(Identifiers.Application)
protected readonly app!: Application;

/**
* @private
* @type {string}
Expand All @@ -26,7 +32,18 @@ export class CronJob implements Job {
* @memberof CronJob
*/
public execute(callback: CronCommand): void {
new Cron(this.expression, callback).start();
const onCallback: CronCommand = () => {
const start = performance.now();
// @ts-ignore
callback();

this.app.events.dispatch(ScheduleEvent.CronJobFinished, {
executionTime: performance.now() - start,
expression: this.expression,
});
};

new Cron(this.expression, onCallback).start();
}

/**
Expand Down