From e2531ed0c1dc195f210f8cf996e9ffe04c9e4b7d Mon Sep 17 00:00:00 2001 From: Rogger Valverde Date: Mon, 6 Sep 2021 19:08:03 -0500 Subject: [PATCH] fix(queue-events): duplicate connection (#733) fixes #726 --- src/classes/queue-events.ts | 14 ++++++++++---- src/classes/queue-keys.ts | 4 ++-- src/classes/queue.ts | 32 ++++++++++++++++---------------- src/classes/redis-connection.ts | 4 ++-- src/classes/worker.ts | 2 +- 5 files changed, 31 insertions(+), 25 deletions(-) diff --git a/src/classes/queue-events.ts b/src/classes/queue-events.ts index 30660ce469..52acc50bc4 100644 --- a/src/classes/queue-events.ts +++ b/src/classes/queue-events.ts @@ -1,7 +1,8 @@ import { QueueEventsOptions } from '../interfaces'; -import { array2obj, delay } from '../utils'; -import { QueueBase } from './queue-base'; +import { array2obj, delay, isRedisInstance } from '../utils'; import { StreamReadRaw } from '../interfaces/redis-streams'; +import { QueueBase } from './queue-base'; +import { RedisClient } from './redis-connection'; export declare interface QueueEvents { /** @@ -175,8 +176,13 @@ export declare interface QueueEvents { * */ export class QueueEvents extends QueueBase { - constructor(name: string, opts?: QueueEventsOptions) { - super(name, opts); + constructor(name: string, { connection, ...opts }: QueueEventsOptions = {}) { + super(name, { + ...opts, + connection: isRedisInstance(connection) + ? (connection).duplicate() + : connection, + }); this.opts = Object.assign( { diff --git a/src/classes/queue-keys.ts b/src/classes/queue-keys.ts index 06fae0ddfd..33406a1292 100644 --- a/src/classes/queue-keys.ts +++ b/src/classes/queue-keys.ts @@ -33,11 +33,11 @@ export class QueueKeys { return keys; } - toKey(name: string, type: string) { + toKey(name: string, type: string): string { return `${this.getPrefixedQueueName(name)}:${type}`; } - getPrefixedQueueName(name: string) { + getPrefixedQueueName(name: string): string { return `${this.prefix}:${name}`; } } diff --git a/src/classes/queue.ts b/src/classes/queue.ts index 8448711327..ed717643f1 100644 --- a/src/classes/queue.ts +++ b/src/classes/queue.ts @@ -63,7 +63,7 @@ export class Queue< return this.jobsOpts; } - get repeat() { + get repeat(): Promise { return new Promise(async resolve => { if (!this._repeat) { this._repeat = new Repeat(this.name, { @@ -79,9 +79,9 @@ export class Queue< /** * Adds a new job to the queue. * - * @param name Name of the job to be added to the queue,. - * @param data Arbitrary data to append to the job. - * @param opts Job options that affects how the job is going to be processed. + * @param name - Name of the job to be added to the queue,. + * @param data - Arbitrary data to append to the job. + * @param opts - Job options that affects how the job is going to be processed. */ async add(name: N, data: T, opts?: JobsOptions) { if (opts && opts.repeat) { @@ -136,18 +136,18 @@ export class Queue< Adding jobs requires a LUA script to check first if the paused list exist and in that case it will add it there instead of the wait list. */ - async pause() { + async pause(): Promise { await Scripts.pause(this, true); this.emit('paused'); } /** - * Resumes the proocessing of this queue globally. + * Resumes the processing of this queue globally. * - * Thie method reverses the pause operation by resuming the processing of the + * The method reverses the pause operation by resuming the processing of the * queue. */ - async resume() { + async resume(): Promise { await Scripts.pause(this, false); this.emit('resumed'); } @@ -155,7 +155,7 @@ export class Queue< /** * Returns true if the queue is currently paused. */ - async isPaused() { + async isPaused(): Promise { const client = await this.client; const pausedKeyExists = await client.hexists(this.keys.meta, 'paused'); return pausedKeyExists === 1; @@ -164,9 +164,9 @@ export class Queue< /** * Get all repeatable meta jobs. * - * @param start offset of first job to return. - * @param end offset of last job to return. - * @param asc determine the order in which jobs are returned based on their + * @param start - Offset of first job to return. + * @param end - Offset of last job to return. + * @param asc - Determine the order in which jobs are returned based on their * next execution time. */ async getRepeatableJobs(start?: number, end?: number, asc?: boolean) { @@ -185,7 +185,7 @@ export class Queue< * Removes the given job from the queue as well as all its * dependencies. * - * @param jobId The if of the job to remove + * @param jobId - The if of the job to remove * @returns 1 if it managed to remove the job or -1 if the job or * any of its dependencies was locked. */ @@ -197,7 +197,7 @@ export class Queue< * Drains the queue, i.e., removes all jobs that are waiting * or delayed, but not active, completed or failed. * - * @param delayed pass true if it should also clean the + * @param delayed - Pass true if it should also clean the * delayed jobs. * */ @@ -237,8 +237,8 @@ export class Queue< * Cleans jobs from a queue. Similar to drain but keeps jobs within a certain * grace period. * - * @param {number} grace - The grace period - * @param {number} The max number of jobs to clean + * @param grace - The grace period + * @param The - Max number of jobs to clean * @param {string} [type=completed] - The type of job to clean * Possible values are completed, wait, active, paused, delayed, failed. Defaults to completed. */ diff --git a/src/classes/redis-connection.ts b/src/classes/redis-connection.ts index 7dbadbb910..4a4690caa3 100644 --- a/src/classes/redis-connection.ts +++ b/src/classes/redis-connection.ts @@ -48,7 +48,7 @@ export class RedisConnection extends EventEmitter { /** * Waits for a redis client to be ready. - * @param {Redis} redis client + * @param redis - client */ static async waitUntilReady(client: RedisClient) { if (client.status === 'ready') { @@ -142,7 +142,7 @@ export class RedisConnection extends EventEmitter { return client.connect(); } - async close() { + async close(): Promise { if (!this.closing) { this.closing = true; try { diff --git a/src/classes/worker.ts b/src/classes/worker.ts index e37594ae39..2e2975ba26 100644 --- a/src/classes/worker.ts +++ b/src/classes/worker.ts @@ -1,6 +1,7 @@ import * as fs from 'fs'; import { Redis } from 'ioredis'; import * as path from 'path'; +import { v4 } from 'uuid'; import { Processor, WorkerOptions, GetNextJobOptions } from '../interfaces'; import { QueueBase } from './queue-base'; import { Repeat } from './repeat'; @@ -9,7 +10,6 @@ import { Job, JobJsonRaw } from './job'; import { RedisConnection, RedisClient } from './redis-connection'; import sandbox from './sandbox'; import { Scripts } from './scripts'; -import { v4 } from 'uuid'; import { TimerManager } from './timer-manager'; import { clientCommandMessageReg, delay, isRedisInstance } from '../utils';