Skip to content

Commit

Permalink
fix(queue-events): duplicate connection (#733) fixes #726
Browse files Browse the repository at this point in the history
  • Loading branch information
roggervalf authored Sep 7, 2021
1 parent ce0646c commit e2531ed
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 25 deletions.
14 changes: 10 additions & 4 deletions src/classes/queue-events.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import { QueueEventsOptions } from '../interfaces';
import { array2obj, delay } from '../utils';
import { QueueBase } from './queue-base';
import { array2obj, delay, isRedisInstance } from '../utils';
import { StreamReadRaw } from '../interfaces/redis-streams';
import { QueueBase } from './queue-base';
import { RedisClient } from './redis-connection';

export declare interface QueueEvents {
/**
Expand Down Expand Up @@ -175,8 +176,13 @@ export declare interface QueueEvents {
*
*/
export class QueueEvents extends QueueBase {
constructor(name: string, opts?: QueueEventsOptions) {
super(name, opts);
constructor(name: string, { connection, ...opts }: QueueEventsOptions = {}) {
super(name, {
...opts,
connection: isRedisInstance(connection)
? (<RedisClient>connection).duplicate()
: connection,
});

this.opts = Object.assign(
{
Expand Down
4 changes: 2 additions & 2 deletions src/classes/queue-keys.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@ export class QueueKeys {
return keys;
}

toKey(name: string, type: string) {
toKey(name: string, type: string): string {
return `${this.getPrefixedQueueName(name)}:${type}`;
}

getPrefixedQueueName(name: string) {
getPrefixedQueueName(name: string): string {
return `${this.prefix}:${name}`;
}
}
32 changes: 16 additions & 16 deletions src/classes/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ export class Queue<
return this.jobsOpts;
}

get repeat() {
get repeat(): Promise<Repeat> {
return new Promise<Repeat>(async resolve => {
if (!this._repeat) {
this._repeat = new Repeat(this.name, {
Expand All @@ -79,9 +79,9 @@ export class Queue<
/**
* Adds a new job to the queue.
*
* @param name Name of the job to be added to the queue,.
* @param data Arbitrary data to append to the job.
* @param opts Job options that affects how the job is going to be processed.
* @param name - Name of the job to be added to the queue,.
* @param data - Arbitrary data to append to the job.
* @param opts - Job options that affects how the job is going to be processed.
*/
async add(name: N, data: T, opts?: JobsOptions) {
if (opts && opts.repeat) {
Expand Down Expand Up @@ -136,26 +136,26 @@ export class Queue<
Adding jobs requires a LUA script to check first if the paused list exist
and in that case it will add it there instead of the wait list.
*/
async pause() {
async pause(): Promise<void> {
await Scripts.pause(this, true);
this.emit('paused');
}

/**
* Resumes the proocessing of this queue globally.
* Resumes the processing of this queue globally.
*
* Thie method reverses the pause operation by resuming the processing of the
* The method reverses the pause operation by resuming the processing of the
* queue.
*/
async resume() {
async resume(): Promise<void> {
await Scripts.pause(this, false);
this.emit('resumed');
}

/**
* Returns true if the queue is currently paused.
*/
async isPaused() {
async isPaused(): Promise<boolean> {
const client = await this.client;
const pausedKeyExists = await client.hexists(this.keys.meta, 'paused');
return pausedKeyExists === 1;
Expand All @@ -164,9 +164,9 @@ export class Queue<
/**
* Get all repeatable meta jobs.
*
* @param start offset of first job to return.
* @param end offset of last job to return.
* @param asc determine the order in which jobs are returned based on their
* @param start - Offset of first job to return.
* @param end - Offset of last job to return.
* @param asc - Determine the order in which jobs are returned based on their
* next execution time.
*/
async getRepeatableJobs(start?: number, end?: number, asc?: boolean) {
Expand All @@ -185,7 +185,7 @@ export class Queue<
* Removes the given job from the queue as well as all its
* dependencies.
*
* @param jobId The if of the job to remove
* @param jobId - The if of the job to remove
* @returns 1 if it managed to remove the job or -1 if the job or
* any of its dependencies was locked.
*/
Expand All @@ -197,7 +197,7 @@ export class Queue<
* Drains the queue, i.e., removes all jobs that are waiting
* or delayed, but not active, completed or failed.
*
* @param delayed pass true if it should also clean the
* @param delayed - Pass true if it should also clean the
* delayed jobs.
*
*/
Expand Down Expand Up @@ -237,8 +237,8 @@ export class Queue<
* Cleans jobs from a queue. Similar to drain but keeps jobs within a certain
* grace period.
*
* @param {number} grace - The grace period
* @param {number} The max number of jobs to clean
* @param grace - The grace period
* @param The - Max number of jobs to clean
* @param {string} [type=completed] - The type of job to clean
* Possible values are completed, wait, active, paused, delayed, failed. Defaults to completed.
*/
Expand Down
4 changes: 2 additions & 2 deletions src/classes/redis-connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ export class RedisConnection extends EventEmitter {

/**
* Waits for a redis client to be ready.
* @param {Redis} redis client
* @param redis - client
*/
static async waitUntilReady(client: RedisClient) {
if (client.status === 'ready') {
Expand Down Expand Up @@ -142,7 +142,7 @@ export class RedisConnection extends EventEmitter {
return client.connect();
}

async close() {
async close(): Promise<void> {
if (!this.closing) {
this.closing = true;
try {
Expand Down
2 changes: 1 addition & 1 deletion src/classes/worker.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import * as fs from 'fs';
import { Redis } from 'ioredis';
import * as path from 'path';
import { v4 } from 'uuid';
import { Processor, WorkerOptions, GetNextJobOptions } from '../interfaces';
import { QueueBase } from './queue-base';
import { Repeat } from './repeat';
Expand All @@ -9,7 +10,6 @@ import { Job, JobJsonRaw } from './job';
import { RedisConnection, RedisClient } from './redis-connection';
import sandbox from './sandbox';
import { Scripts } from './scripts';
import { v4 } from 'uuid';
import { TimerManager } from './timer-manager';
import { clientCommandMessageReg, delay, isRedisInstance } from '../utils';

Expand Down

0 comments on commit e2531ed

Please sign in to comment.