diff --git a/__tests__/unit/core-kernel/services/queue/drivers/memory.test.ts b/__tests__/unit/core-kernel/services/queue/drivers/memory.test.ts new file mode 100644 index 0000000000..7d49501aa3 --- /dev/null +++ b/__tests__/unit/core-kernel/services/queue/drivers/memory.test.ts @@ -0,0 +1,209 @@ +import "jest-extended"; +import { MemoryQueue } from "../../../../../../packages/core-kernel/src/services/queue/drivers/memory"; +import delay from "delay"; + +const dummyFunction = async () => {}; + +let driver: MemoryQueue; +beforeEach(() => (driver = new MemoryQueue())); + +describe("MemoryQueue", () => { + it("should start the default queue and process jobs", async () => { + let fnValue: number = 0; + + expect(driver.size()).toBe(0); + expect(fnValue).toBe(0); + + await driver.push(async () => fnValue++); + + await driver.start(); + + expect(fnValue).toBe(1); + }); + + it("should start the given queue and process jobs", async () => { + let fnValue: number = 0; + + expect(driver.size("balance")).toBe(0); + expect(fnValue).toBe(0); + + await driver.pushOn("balance", async () => fnValue++); + + await driver.start("balance"); + + expect(fnValue).toBe(1); + }); + + it("should stop the default queue and not process new jobs", async () => { + let fnValue: number = 0; + + expect(driver.size()).toBe(0); + expect(fnValue).toBe(0); + + await driver.push(async () => fnValue++); + + await driver.start(); + + await driver.stop(); + + await driver.push(async () => fnValue++); + + expect(fnValue).toBe(1); + }); + + it("should stop the given queue and not process new jobs", async () => { + let fnValue: number = 0; + const fnIncrement = async () => fnValue++; + + expect(driver.size("balance")).toBe(0); + expect(fnValue).toBe(0); + + await driver.pushOn("balance", async () => fnValue++); + + await driver.start("balance"); + + await driver.stop("balance"); + + await driver.bulkOn("balance", [fnIncrement, fnIncrement]); + + expect(fnValue).toBe(1); + }); + + it("should pause and resume the default queue", async () => { + let fnValue: number = 0; + const fnIncrement = async () => fnValue++; + + expect(driver.size()).toBe(0); + expect(fnValue).toBe(0); + + await driver.push(fnIncrement); + + await driver.start(); + + expect(fnValue).toBe(1); + + await driver.pause(); + + await driver.bulk([fnIncrement, fnIncrement]); + + await driver.start(); + + expect(fnValue).toBe(3); + }); + + it("should pause and resume the given queue", async () => { + let fnValue: number = 0; + const fnIncrement = async () => fnValue++; + + expect(driver.size("balance")).toBe(0); + expect(fnValue).toBe(0); + + await driver.pushOn("balance", fnIncrement); + + await driver.start("balance"); + + expect(fnValue).toBe(1); + + await driver.pause("balance"); + + await driver.bulkOn("balance", [fnIncrement, fnIncrement]); + + await driver.start("balance"); + + expect(fnValue).toBe(3); + }); + + it("should clear the default queue", async () => { + expect(driver.size()).toBe(0); + + await driver.push(dummyFunction); + + expect(driver.size()).toBe(1); + + await driver.clear(); + + expect(driver.size()).toBe(0); + }); + + it("should clear the given queue", async () => { + expect(driver.size("balance")).toBe(0); + + await driver.pushOn("balance", dummyFunction); + + expect(driver.size("balance")).toBe(1); + + await driver.clear("balance"); + + expect(driver.size("balance")).toBe(0); + }); + + it("should push the job onto the default queue", async () => { + expect(driver.size()).toBe(0); + + await driver.push(dummyFunction); + + expect(driver.size()).toBe(1); + }); + + it("should push the job onto the given queue", () => { + expect(driver.size()).toBe(0); + expect(driver.size("balance")).toBe(0); + + driver.pushOn("balance", dummyFunction); + + expect(driver.size()).toBe(0); + expect(driver.size("balance")).toBe(1); + }); + + it("should push the job onto the default queue with a 2 second delay", async () => { + expect(driver.size()).toBe(0); + + await driver.later(2000, dummyFunction); + + await delay(2000); + + expect(driver.size()).toBe(1); + }); + + it("should push the job onto the default queue with a 2 second delay", async () => { + expect(driver.size()).toBe(0); + expect(driver.size("balance")).toBe(0); + + await driver.laterOn("balance", 2000, dummyFunction); + + await delay(2000); + + expect(driver.size()).toBe(0); + expect(driver.size("balance")).toBe(1); + }); + + it("should push the job onto the default queue", async () => { + expect(driver.size()).toBe(0); + + await driver.bulk([dummyFunction, dummyFunction]); + + expect(driver.size()).toBe(2); + }); + + it("should push the job onto the given queue", async () => { + expect(driver.size()).toBe(0); + expect(driver.size("balance")).toBe(0); + + await driver.bulkOn("balance", [dummyFunction, dummyFunction]); + + expect(driver.size()).toBe(0); + expect(driver.size("balance")).toBe(2); + }); + + it("should return the name of the default queue", () => { + expect(driver.getDefaultQueue()).toBe("default"); + }); + + it("should set the name of the default queue", () => { + expect(driver.getDefaultQueue()).toBe("default"); + + driver.setDefaultQueue("new-default"); + + expect(driver.getDefaultQueue()).toBe("new-default"); + }); +}); diff --git a/packages/core-kernel/src/contracts/kernel/queue.ts b/packages/core-kernel/src/contracts/kernel/queue.ts index ba59d24f7c..7ae44d447c 100644 --- a/packages/core-kernel/src/contracts/kernel/queue.ts +++ b/packages/core-kernel/src/contracts/kernel/queue.ts @@ -3,5 +3,129 @@ * @interface Queue */ export interface Queue { - // + /** + * Start the queue. + * + * @param {string} [queue] + * @returns {Promise} + * @memberof Queue + */ + start(queue?: string): Promise; + + /** + * Stop the queue. + * + * @param {string} [queue] + * @returns {Promise} + * @memberof Queue + */ + stop(queue?: string): Promise; + + /** + * Pause the queue. + * + * @param {string} [queue] + * @returns {Promise} + * @memberof Queue + */ + pause(queue?: string): Promise; + + /** + * Clear the queue. + * + * @param {string} [queue] + * @returns {Promise} + * @memberof Queue + */ + clear(queue?: string): Promise; + + /** + * Push a new job onto the default queue. + * + * @template T + * @param {() => PromiseLike} fn + * @returns {Promise} + * @memberof Queue + */ + push(fn: () => PromiseLike): Promise; + + /** + * Push a new job onto the given queue. + * + * @template T + * @param {string} queue + * @param {() => PromiseLike} fn + * @returns {Promise} + * @memberof Queue + */ + pushOn(queue: string, fn: () => PromiseLike): Promise; + + /** + * Push a new job onto the default queue after a delay. + * + * @template T + * @param {number} delay + * @param {() => PromiseLike} fn + * @returns {Promise} + * @memberof Queue + */ + later(delay: number, fn: () => PromiseLike): Promise; + + /** + * Push a new job onto the given queue after a delay. + * + * @template T + * @param {string} queue + * @param {number} delay + * @param {() => PromiseLike} fn + * @returns {Promise} + * @memberof Queue + */ + laterOn(queue: string, delay: number, fn: () => PromiseLike): Promise; + + /** + * Push an array of jobs onto the default queue. + * + * @template T + * @param {(() => PromiseLike)[]} functions + * @returns {Promise} + * @memberof Queue + */ + bulk(functions: (() => PromiseLike)[]): Promise; + + /** + * Push an array of jobs onto the given queue. + * + * @template T + * @param {string} queue + * @param {(() => PromiseLike)[]} functions + * @returns {Promise} + * @memberof Queue + */ + bulkOn(queue: string, functions: (() => PromiseLike)[]): Promise; + + /** + * Get the size of the given queue. + * + * @param {string} [queue] + * @returns {number} + * @memberof Queue + */ + size(queue?: string): number; + + /** + * Get the connection name for the queue. + * + * @returns {string} + * @memberof Queue + */ + getDefaultQueue(): string; + + /** + * Set the connection name for the queue. + * + * @param {string} name + * @memberof Queue + */ + setDefaultQueue(name: string): void; } diff --git a/packages/core-kernel/src/services/queue/drivers/memory.ts b/packages/core-kernel/src/services/queue/drivers/memory.ts index f5e6fe4092..1433b9c9c9 100644 --- a/packages/core-kernel/src/services/queue/drivers/memory.ts +++ b/packages/core-kernel/src/services/queue/drivers/memory.ts @@ -1,10 +1,193 @@ import { Queue } from "../../../contracts/kernel/queue"; +import PQueue from "p-queue"; +import { injectable } from "../../../container"; /** * @export - * @class Memory + * @class MemoryQueue * @implements {Queue} */ -export class Memory implements Queue { - // +@injectable() +export class MemoryQueue implements Queue { + /** + * @private + * @type {Map} + * @memberof MemoryQueue + */ + private readonly queues: Map = new Map(); + + /** + * @private + * @type {string} + * @memberof MemoryQueue + */ + private defaultQueue: string = "default"; + + /** + * Start the queue. + * + * @param {string} [queue] + * @returns {Promise} + * @memberof MemoryQueue + */ + public async start(queue?: string): Promise { + await this.firstOrCreate(queue).start(); + } + + /** + * Stop the queue. + * + * @param {string} [queue] + * @returns {Promise} + * @memberof MemoryQueue + */ + public async stop(queue?: string): Promise { + await this.queues.delete(queue || this.defaultQueue); + } + + /** + * Pause the queue. + * + * @param {string} [queue] + * @returns {Promise} + * @memberof MemoryQueue + */ + public async pause(queue?: string): Promise { + await this.firstOrCreate(queue).pause(); + } + + /** + * Clear the queue. + * + * @param {string} [queue] + * @returns {Promise} + * @memberof MemoryQueue + */ + public async clear(queue?: string): Promise { + await this.firstOrCreate(queue).clear(); + } + + /** + * Push a new job onto the default queue. + * + * @template T + * @param {() => PromiseLike} fn + * @returns {Promise} + * @memberof MemoryQueue + */ + public async push(fn: () => PromiseLike): Promise { + this.firstOrCreate(this.defaultQueue).add(fn); + } + + /** + * Push a new job onto the given queue. + * + * @template T + * @param {string} queue + * @param {() => PromiseLike} fn + * @returns {Promise} + * @memberof MemoryQueue + */ + public async pushOn(queue: string, fn: () => PromiseLike): Promise { + this.firstOrCreate(queue).add(fn); + } + + /** + * Push a new job onto the default queue after a delay. + * + * @template T + * @param {number} delay + * @param {() => PromiseLike} fn + * @returns {Promise} + * @memberof MemoryQueue + */ + public async later(delay: number, fn: () => PromiseLike): Promise { + setTimeout(() => this.push(fn), delay); + } + + /** + * Push a new job onto the given queue after a delay. + * + * @template T + * @param {string} queue + * @param {number} delay + * @param {() => PromiseLike} fn + * @returns {Promise} + * @memberof MemoryQueue + */ + public async laterOn(queue: string, delay: number, fn: () => PromiseLike): Promise { + setTimeout(() => this.pushOn(queue, fn), delay); + } + + /** + * Push an array of jobs onto the default queue. + * + * @template T + * @param {(() => PromiseLike)[]} functions + * @returns {Promise} + * @memberof MemoryQueue + */ + public async bulk(functions: (() => PromiseLike)[]): Promise { + this.firstOrCreate(this.defaultQueue).addAll(functions); + } + + /** + * Push an array of jobs onto the given queue. + * + * @template T + * @param {string} queue + * @param {(() => PromiseLike)[]} functions + * @returns {Promise} + * @memberof MemoryQueue + */ + public async bulkOn(queue: string, functions: (() => PromiseLike)[]): Promise { + this.firstOrCreate(queue).addAll(functions); + } + + /** + * Get the size of the given queue. + * + * @param {string} queue + * @returns {number} + * @memberof MemoryQueue + */ + public size(queue?: string): number { + return this.firstOrCreate(queue).size; + } + + /** + * Get the connection name for the queue. + * + * @returns {string} + * @memberof MemoryQueue + */ + public getDefaultQueue(): string { + return this.defaultQueue; + } + + /** + * Set the connection name for the queue. + * + * @param {string} name + * @memberof MemoryQueue + */ + public setDefaultQueue(name: string): void { + this.defaultQueue = name; + } + + /** + * @private + * @param {string} name + * @returns {PQueue} + * @memberof MemoryQueue + */ + private firstOrCreate(name?: string): PQueue { + name = name || this.defaultQueue; + + if (!this.queues.has(name)) { + this.queues.set(name, new PQueue({ autoStart: false })); + } + + return this.queues.get(name); + } } diff --git a/packages/core-kernel/src/services/queue/manager.ts b/packages/core-kernel/src/services/queue/manager.ts index 318000aa3b..4200dc939d 100644 --- a/packages/core-kernel/src/services/queue/manager.ts +++ b/packages/core-kernel/src/services/queue/manager.ts @@ -1,6 +1,6 @@ import { Queue } from "../../contracts/kernel/queue"; import { Manager } from "../../support/manager"; -import { Memory } from "./drivers/memory"; +import { MemoryQueue } from "./drivers/memory"; /** * @export @@ -15,7 +15,7 @@ export class QueueManager extends Manager { * @memberof QueueManager */ public async createMemoryDriver(): Promise { - return this.app.resolve(Memory); + return this.app.resolve(MemoryQueue); } /** diff --git a/packages/core-kernel/src/services/queue/service-provider.ts b/packages/core-kernel/src/services/queue/service-provider.ts index c64e39abda..2a743bffd2 100644 --- a/packages/core-kernel/src/services/queue/service-provider.ts +++ b/packages/core-kernel/src/services/queue/service-provider.ts @@ -1,6 +1,6 @@ import { ServiceProvider as BaseServiceProvider } from "../../providers"; import { QueueManager } from "./manager"; -import { Identifiers } from "../../container"; +import { Identifiers, interfaces } from "../../container"; export class ServiceProvider extends BaseServiceProvider { /** @@ -13,6 +13,12 @@ export class ServiceProvider extends BaseServiceProvider { .to(QueueManager) .inSingletonScope(); - // await this.app.get(Identifiers.QueueManager).boot(); + await this.app.get(Identifiers.QueueManager).boot(); + + this.app + .bind(Identifiers.QueueService) + .toDynamicValue((context: interfaces.Context) => + context.container.get(Identifiers.QueueManager).driver(), + ); } }