Skip to content

Commit

Permalink
feat(queue-scheduler): add better event typing (#963)
Browse files Browse the repository at this point in the history
  • Loading branch information
roggervalf authored Dec 29, 2021
1 parent 2325ff7 commit b23c006
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 12 deletions.
2 changes: 1 addition & 1 deletion src/classes/child-processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ export class ChildProcessor {

public async stop() {}

async waitForCurrentJobAndExit() {
async waitForCurrentJobAndExit(): Promise<void> {
this.status = ChildStatus.Terminating;
try {
await this.currentJobPromise;
Expand Down
57 changes: 46 additions & 11 deletions src/classes/queue-scheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,20 @@ import { array2obj, isRedisInstance } from '../utils';
import { QueueBase } from './queue-base';
import { Scripts } from './scripts';

export interface QueueSchedulerDeclaration {
on(event: 'stalled', listener: (jobId: string, prev: string) => void): this;
on(
event: 'failed',
listener: (jobId: string, failedReason: Error, prev: string) => void,
): this;
on(event: string, listener: Function): this;
export interface QueueSchedulerListener {
/**
* Listen to 'stalled' event.
*
* This event is triggered when a job gets stalled.
*/
stalled: (jobId: string, prev: string) => void;

/**
* Listen to 'failed' event.
*
* This event is triggered when a job has thrown an exception.
*/
failed: (jobId: string, failedReason: Error, prev: string) => void;
}

/**
Expand All @@ -32,10 +39,7 @@ export interface QueueSchedulerDeclaration {
* jobs, etc, will not work correctly or at all.
*
*/
export class QueueScheduler
extends QueueBase
implements QueueSchedulerDeclaration
{
export class QueueScheduler extends QueueBase {
private nextTimestamp = Number.MAX_VALUE;
private isBlocked = false;
private running = false;
Expand Down Expand Up @@ -65,6 +69,37 @@ export class QueueScheduler
}
}

emit<U extends keyof QueueSchedulerListener>(
event: U,
...args: Parameters<QueueSchedulerListener[U]>
): boolean {
return super.emit(event, ...args);
}

off<U extends keyof QueueSchedulerListener>(
eventName: U,
listener: QueueSchedulerListener[U],
): this {
super.off(eventName, listener);
return this;
}

on<U extends keyof QueueSchedulerListener>(
event: U,
listener: QueueSchedulerListener[U],
): this {
super.on(event, listener);
return this;
}

once<U extends keyof QueueSchedulerListener>(
event: U,
listener: QueueSchedulerListener[U],
): this {
super.once(event, listener);
return this;
}

async run(): Promise<void> {
if (!this.running) {
try {
Expand Down

0 comments on commit b23c006

Please sign in to comment.