Skip to content

Commit

Permalink
feat(core-kernel): dispatch queue events (#3769)
Browse files Browse the repository at this point in the history
  • Loading branch information
sebastijankuzner authored Jun 7, 2020
1 parent 9ea9bb4 commit 9ac9b45
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 4 deletions.
75 changes: 73 additions & 2 deletions __tests__/unit/core-kernel/services/queue/drivers/memory.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import "jest-extended";

import { sleep } from "@arkecosystem/utils";
import { Contracts } from "@packages/core-kernel/src";
import { Container, Contracts, Enums } from "@packages/core-kernel/src";
import { MemoryQueue } from "@packages/core-kernel/src/services/queue/drivers/memory";
import { Sandbox } from "@packages/core-test-framework";

class DummyClass implements Contracts.Kernel.QueueJob {
public constructor(private readonly method?) {}
Expand All @@ -10,8 +13,46 @@ class DummyClass implements Contracts.Kernel.QueueJob {
}
}

let sanbox: Sandbox;
let driver: MemoryQueue;
beforeEach(() => (driver = new MemoryQueue()));

const mockEventDispatcher = {
dispatch: jest.fn(),
};

beforeEach(() => {
sanbox = new Sandbox();

sanbox.app.bind(Container.Identifiers.EventDispatcherService).toConstantValue(mockEventDispatcher);
driver = sanbox.app.resolve<MemoryQueue>(MemoryQueue);
});

afterEach(() => {
jest.clearAllMocks();
});

const expectEventData = () => {
return expect.objectContaining({
driver: "memory",
executionTime: expect.toBeNumber(),
});
};

const expectEventErrorData = () => {
return expect.objectContaining({
driver: "memory",
executionTime: expect.toBeNumber(),
error: expect.toBeObject(),
});
};

const delay = async (timeout) => {
await new Promise((resolve) => {
setTimeout(() => {
resolve();
}, timeout);
});
};

describe("MemoryQueue", () => {
it("should start queue and process jobs", async () => {
Expand All @@ -24,6 +65,9 @@ describe("MemoryQueue", () => {
await driver.start();

expect(dummy).toHaveBeenCalled();

expect(mockEventDispatcher.dispatch).toHaveBeenCalledTimes(1);
expect(mockEventDispatcher.dispatch).toHaveBeenCalledWith(Enums.QueueEvent.Finished, expectEventData());
});

it("should stop queue and not process new jobs", async () => {
Expand All @@ -40,6 +84,9 @@ describe("MemoryQueue", () => {
await driver.push(new DummyClass(dummy));

expect(dummy).toHaveBeenCalled();

expect(mockEventDispatcher.dispatch).toHaveBeenCalledTimes(1);
expect(mockEventDispatcher.dispatch).toHaveBeenCalledWith(Enums.QueueEvent.Finished, expectEventData());
});

it("should pause and resume queue", async () => {
Expand All @@ -51,6 +98,8 @@ describe("MemoryQueue", () => {

await driver.start();

await delay(100);

expect(dummy).toHaveBeenCalled();

await driver.pause();
Expand All @@ -59,7 +108,29 @@ describe("MemoryQueue", () => {

await driver.resume();

await delay(100);

expect(dummy).toHaveBeenCalledTimes(3);

expect(mockEventDispatcher.dispatch).toHaveBeenCalledTimes(3);
expect(mockEventDispatcher.dispatch).toHaveBeenCalledWith(Enums.QueueEvent.Finished, expectEventData());
});

it("should dipatch error if error in queue", async () => {
const dummy: jest.Mock = jest.fn().mockImplementation(() => {
throw new Error();
});

await driver.push(new DummyClass(dummy));

driver.start();

// @ts-ignore
await expect(driver.lastQueue).rejects.toThrowError();

expect(dummy).toHaveBeenCalled();
expect(mockEventDispatcher.dispatch).toHaveBeenCalledTimes(1);
expect(mockEventDispatcher.dispatch).toHaveBeenCalledWith(Enums.QueueEvent.Failed, expectEventErrorData());
});

it("should clear queue", async () => {
Expand Down
9 changes: 9 additions & 0 deletions packages/core-kernel/src/enums/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -108,3 +108,12 @@ export enum TransactionEvent {
RemovedFromPool = "transaction.pool.removed",
Reverted = "transaction.reverted",
}

/**
* @export
* @enum {number}
*/
export enum QueueEvent {
Finished = "queue.finished",
Failed = "queue.finished",
}
23 changes: 21 additions & 2 deletions packages/core-kernel/src/services/queue/drivers/memory.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
import { performance } from "perf_hooks";

import { Application } from "../../../contracts/kernel/application";
import { Queue, QueueJob } from "../../../contracts/kernel/queue";
import { injectable } from "../../../ioc";
import { QueueEvent } from "../../../enums";
import { Identifiers, inject, injectable } from "../../../ioc";

/**
* @export
Expand All @@ -8,6 +12,9 @@ import { injectable } from "../../../ioc";
*/
@injectable()
export class MemoryQueue implements Queue {
@inject(Identifiers.Application)
protected readonly app!: Application;

/**
* @private
* @type {(QueueJob[])}
Expand Down Expand Up @@ -93,7 +100,7 @@ export class MemoryQueue implements Queue {
* @memberof MemoryQueue
*/
public async resume(): Promise<void> {
this.lastQueue = this.processFromIndex(this.index, this.lastResults);
this.lastQueue = this.processFromIndex(this.index + 1, this.lastResults);
}

/**
Expand Down Expand Up @@ -174,13 +181,25 @@ export class MemoryQueue implements Queue {
if (isRunning) {
this.isRunning = isRunning;

const start = performance.now();
try {
lastResults.push(await this.jobs[from].handle());

await this.app.events.dispatch(QueueEvent.Finished, {
driver: "memory",
executionTime: performance.now() - start,
});

return this.processFromIndex(from + 1, lastResults, this.isRunning);
} catch (error) {
this.isRunning = false;

await this.app.events.dispatch(QueueEvent.Failed, {
driver: "memory",
executionTime: performance.now() - start,
error: error,
});

throw new Error(
`Queue halted at job #${from + 1} due to error in handler ${this.jobs[this.index]}.`,
);
Expand Down

0 comments on commit 9ac9b45

Please sign in to comment.