Skip to content

Commit

Permalink
feat(core-kernel): initial draft of queues
Browse files Browse the repository at this point in the history
  • Loading branch information
Brian Faust committed Aug 28, 2019
1 parent d7d6e2d commit 5e5eb2a
Show file tree
Hide file tree
Showing 5 changed files with 530 additions and 8 deletions.
209 changes: 209 additions & 0 deletions __tests__/unit/core-kernel/services/queue/drivers/memory.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
import "jest-extended";
import { MemoryQueue } from "../../../../../../packages/core-kernel/src/services/queue/drivers/memory";
import delay from "delay";

const dummyFunction = async () => {};

let driver: MemoryQueue;
beforeEach(() => (driver = new MemoryQueue()));

describe("MemoryQueue", () => {
it("should start the default queue and process jobs", async () => {
let fnValue: number = 0;

expect(driver.size()).toBe(0);
expect(fnValue).toBe(0);

await driver.push(async () => fnValue++);

await driver.start();

expect(fnValue).toBe(1);
});

it("should start the given queue and process jobs", async () => {
let fnValue: number = 0;

expect(driver.size("balance")).toBe(0);
expect(fnValue).toBe(0);

await driver.pushOn("balance", async () => fnValue++);

await driver.start("balance");

expect(fnValue).toBe(1);
});

it("should stop the default queue and not process new jobs", async () => {
let fnValue: number = 0;

expect(driver.size()).toBe(0);
expect(fnValue).toBe(0);

await driver.push(async () => fnValue++);

await driver.start();

await driver.stop();

await driver.push(async () => fnValue++);

expect(fnValue).toBe(1);
});

it("should stop the given queue and not process new jobs", async () => {
let fnValue: number = 0;
const fnIncrement = async () => fnValue++;

expect(driver.size("balance")).toBe(0);
expect(fnValue).toBe(0);

await driver.pushOn("balance", async () => fnValue++);

await driver.start("balance");

await driver.stop("balance");

await driver.bulkOn("balance", [fnIncrement, fnIncrement]);

expect(fnValue).toBe(1);
});

it("should pause and resume the default queue", async () => {
let fnValue: number = 0;
const fnIncrement = async () => fnValue++;

expect(driver.size()).toBe(0);
expect(fnValue).toBe(0);

await driver.push(fnIncrement);

await driver.start();

expect(fnValue).toBe(1);

await driver.pause();

await driver.bulk([fnIncrement, fnIncrement]);

await driver.start();

expect(fnValue).toBe(3);
});

it("should pause and resume the given queue", async () => {
let fnValue: number = 0;
const fnIncrement = async () => fnValue++;

expect(driver.size("balance")).toBe(0);
expect(fnValue).toBe(0);

await driver.pushOn("balance", fnIncrement);

await driver.start("balance");

expect(fnValue).toBe(1);

await driver.pause("balance");

await driver.bulkOn("balance", [fnIncrement, fnIncrement]);

await driver.start("balance");

expect(fnValue).toBe(3);
});

it("should clear the default queue", async () => {
expect(driver.size()).toBe(0);

await driver.push(dummyFunction);

expect(driver.size()).toBe(1);

await driver.clear();

expect(driver.size()).toBe(0);
});

it("should clear the given queue", async () => {
expect(driver.size("balance")).toBe(0);

await driver.pushOn("balance", dummyFunction);

expect(driver.size("balance")).toBe(1);

await driver.clear("balance");

expect(driver.size("balance")).toBe(0);
});

it("should push the job onto the default queue", async () => {
expect(driver.size()).toBe(0);

await driver.push(dummyFunction);

expect(driver.size()).toBe(1);
});

it("should push the job onto the given queue", () => {
expect(driver.size()).toBe(0);
expect(driver.size("balance")).toBe(0);

driver.pushOn("balance", dummyFunction);

expect(driver.size()).toBe(0);
expect(driver.size("balance")).toBe(1);
});

it("should push the job onto the default queue with a 2 second delay", async () => {
expect(driver.size()).toBe(0);

await driver.later(2000, dummyFunction);

await delay(2000);

expect(driver.size()).toBe(1);
});

it("should push the job onto the default queue with a 2 second delay", async () => {
expect(driver.size()).toBe(0);
expect(driver.size("balance")).toBe(0);

await driver.laterOn("balance", 2000, dummyFunction);

await delay(2000);

expect(driver.size()).toBe(0);
expect(driver.size("balance")).toBe(1);
});

it("should push the job onto the default queue", async () => {
expect(driver.size()).toBe(0);

await driver.bulk([dummyFunction, dummyFunction]);

expect(driver.size()).toBe(2);
});

it("should push the job onto the given queue", async () => {
expect(driver.size()).toBe(0);
expect(driver.size("balance")).toBe(0);

await driver.bulkOn("balance", [dummyFunction, dummyFunction]);

expect(driver.size()).toBe(0);
expect(driver.size("balance")).toBe(2);
});

it("should return the name of the default queue", () => {
expect(driver.getDefaultQueue()).toBe("default");
});

it("should set the name of the default queue", () => {
expect(driver.getDefaultQueue()).toBe("default");

driver.setDefaultQueue("new-default");

expect(driver.getDefaultQueue()).toBe("new-default");
});
});
126 changes: 125 additions & 1 deletion packages/core-kernel/src/contracts/kernel/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,129 @@
* @interface Queue
*/
export interface Queue {
//
/**
* Start the queue.
*
* @param {string} [queue]
* @returns {Promise<void>}
* @memberof Queue
*/
start(queue?: string): Promise<void>;

/**
* Stop the queue.
*
* @param {string} [queue]
* @returns {Promise<void>}
* @memberof Queue
*/
stop(queue?: string): Promise<void>;

/**
* Pause the queue.
*
* @param {string} [queue]
* @returns {Promise<void>}
* @memberof Queue
*/
pause(queue?: string): Promise<void>;

/**
* Clear the queue.
*
* @param {string} [queue]
* @returns {Promise<void>}
* @memberof Queue
*/
clear(queue?: string): Promise<void>;

/**
* Push a new job onto the default queue.
*
* @template T
* @param {() => PromiseLike<T>} fn
* @returns {Promise<void>}
* @memberof Queue
*/
push<T = any>(fn: () => PromiseLike<T>): Promise<void>;

/**
* Push a new job onto the given queue.
*
* @template T
* @param {string} queue
* @param {() => PromiseLike<T>} fn
* @returns {Promise<void>}
* @memberof Queue
*/
pushOn<T>(queue: string, fn: () => PromiseLike<T>): Promise<void>;

/**
* Push a new job onto the default queue after a delay.
*
* @template T
* @param {number} delay
* @param {() => PromiseLike<T>} fn
* @returns {Promise<void>}
* @memberof Queue
*/
later<T>(delay: number, fn: () => PromiseLike<T>): Promise<void>;

/**
* Push a new job onto the given queue after a delay.
*
* @template T
* @param {string} queue
* @param {number} delay
* @param {() => PromiseLike<T>} fn
* @returns {Promise<void>}
* @memberof Queue
*/
laterOn<T>(queue: string, delay: number, fn: () => PromiseLike<T>): Promise<void>;

/**
* Push an array of jobs onto the default queue.
*
* @template T
* @param {(() => PromiseLike<T>)[]} functions
* @returns {Promise<void>}
* @memberof Queue
*/
bulk<T>(functions: (() => PromiseLike<T>)[]): Promise<void>;

/**
* Push an array of jobs onto the given queue.
*
* @template T
* @param {string} queue
* @param {(() => PromiseLike<T>)[]} functions
* @returns {Promise<void>}
* @memberof Queue
*/
bulkOn<T>(queue: string, functions: (() => PromiseLike<T>)[]): Promise<void>;

/**
* Get the size of the given queue.
*
* @param {string} [queue]
* @returns {number}
* @memberof Queue
*/
size(queue?: string): number;

/**
* Get the connection name for the queue.
*
* @returns {string}
* @memberof Queue
*/
getDefaultQueue(): string;

/**
* Set the connection name for the queue.
*
* @param {string} name
* @memberof Queue
*/
setDefaultQueue(name: string): void;
}
Loading

0 comments on commit 5e5eb2a

Please sign in to comment.