From 4e63f70aac367d4dd695bbe07c72a08a82a65d97 Mon Sep 17 00:00:00 2001 From: Rogger Valverde Date: Thu, 22 Apr 2021 07:29:01 -0500 Subject: [PATCH] fix(movetodelayed): check if job is in active state --- src/classes/scripts.ts | 9 ++------- src/commands/moveToDelayed-5.lua | 8 +++++++- src/interfaces/jobs-options.ts | 2 +- src/test/test_flow.ts | 4 +++- src/test/test_job.ts | 8 ++++++-- src/test/test_rate_limiter.ts | 13 +++++++------ 6 files changed, 26 insertions(+), 18 deletions(-) diff --git a/src/classes/scripts.ts b/src/classes/scripts.ts index 6b3cbd2ab5..f098682ea7 100644 --- a/src/classes/scripts.ts +++ b/src/classes/scripts.ts @@ -351,13 +351,8 @@ export class Scripts { const args = this.moveToDelayedArgs(queue, jobId, timestamp); const result = await (client).moveToDelayed(args); - switch (result) { - case -1: - throw new Error( - 'Missing Job ' + - jobId + - ' when trying to move from active to delayed', - ); + if (result < 0) { + throw this.finishedErrors(result, jobId, 'moveToDelayed'); } } diff --git a/src/commands/moveToDelayed-5.lua b/src/commands/moveToDelayed-5.lua index 6437c58820..3dd0e398a2 100644 --- a/src/commands/moveToDelayed-5.lua +++ b/src/commands/moveToDelayed-5.lua @@ -15,6 +15,7 @@ Output: 0 - OK -1 - Missing job. + -3 - Job not in active set. Events: - delayed key. @@ -27,8 +28,13 @@ if rcall("EXISTS", KEYS[3]) == 1 then local score = tonumber(ARGV[1]) local delayedTimestamp = (score / 0x1000) + local numRemovedElements = rcall("LREM", KEYS[1], -1, jobId) + + if(numRemovedElements < 1) then + return -3 + end + rcall("ZADD", KEYS[2], score, jobId) - rcall("LREM", KEYS[1], 0, jobId) rcall("XADD", KEYS[4], "*", "event", "delayed", "jobId", jobId, "delay", delayedTimestamp); rcall("XADD", KEYS[5], "*", "nextTimestamp", delayedTimestamp); diff --git a/src/interfaces/jobs-options.ts b/src/interfaces/jobs-options.ts index 715f61a935..cbd35c0fce 100644 --- a/src/interfaces/jobs-options.ts +++ b/src/interfaces/jobs-options.ts @@ -15,7 +15,7 @@ export interface JobsOptions { priority?: number; /** - * An amount of miliseconds to wait until this job can be processed. + * An amount of milliseconds to wait until this job can be processed. * Note that for accurate delays, worker and producers * should have their clocks synchronized. */ diff --git a/src/test/test_flow.ts b/src/test/test_flow.ts index 86a5097f6f..bf5df56689 100644 --- a/src/test/test_flow.ts +++ b/src/test/test_flow.ts @@ -325,6 +325,8 @@ describe('flows', () => { numJobs = await parentQueue.getWaitingCount(); expect(numJobs).to.be.equal(0); + await flow.close(); + await parentWorker.close(); await removeAllQueueData(new IORedis(), parentQueueName); }); @@ -413,7 +415,7 @@ describe('flows', () => { expect(await tree.children[1].job.getState()).to.be.equal('waiting'); await flow.close(); - + await worker.close(); await removeAllQueueData(new IORedis(), parentQueueName); }); diff --git a/src/test/test_job.ts b/src/test/test_job.ts index 0a14f9e507..635f337bce 100644 --- a/src/test/test_job.ts +++ b/src/test/test_job.ts @@ -272,21 +272,25 @@ describe('Job', function() { }); it('moves the job to delayed for retry if attempts are given and backoff is non zero', async function() { - const job = await Job.create( + const worker = new Worker(queueName); + const token = 'my-token'; + await Job.create( queue, 'test', { foo: 'bar' }, { attempts: 3, backoff: 300 }, ); + const job = (await worker.getNextJob(token)) as Job; const isFailed = await job.isFailed(); expect(isFailed).to.be.equal(false); - await job.moveToFailed(new Error('test error'), '0', true); + await job.moveToFailed(new Error('test error'), token, true); const isFailed2 = await job.isFailed(); expect(isFailed2).to.be.equal(false); expect(job.stacktrace).not.be.equal(null); expect(job.stacktrace.length).to.be.equal(1); const isDelayed = await job.isDelayed(); expect(isDelayed).to.be.equal(true); + await worker.close(); }); it('applies stacktrace limit on failure', async function() { diff --git a/src/test/test_rate_limiter.ts b/src/test/test_rate_limiter.ts index f5af4e5b8a..ee75bbddb1 100644 --- a/src/test/test_rate_limiter.ts +++ b/src/test/test_rate_limiter.ts @@ -1,13 +1,13 @@ -import { Queue } from '@src/classes'; -import { QueueEvents } from '@src/classes/queue-events'; -import { QueueScheduler } from '@src/classes/queue-scheduler'; -import { Worker } from '@src/classes/worker'; +import { Queue } from '../classes'; +import { QueueEvents } from '../classes/queue-events'; +import { QueueScheduler } from '../classes/queue-scheduler'; +import { Worker } from '../classes/worker'; import { assert, expect } from 'chai'; import * as IORedis from 'ioredis'; import { after, every, last } from 'lodash'; import { beforeEach, describe, it } from 'mocha'; import { v4 } from 'uuid'; -import { removeAllQueueData } from '@src/utils'; +import { removeAllQueueData } from '../utils'; describe('Rate Limiter', function() { let queue: Queue; @@ -56,6 +56,7 @@ describe('Rate Limiter', function() { const delayedCount = await queue.getDelayedCount(); expect(delayedCount).to.eq(3); + await worker.close(); }); it('should obey the rate limit', async function() { @@ -205,7 +206,7 @@ describe('Rate Limiter', function() { }); queueEvents.on('completed', ({ jobId }) => { - const group = last(jobId.split(':')); + const group: string = last(jobId.split(':')); completed[group] = completed[group] || []; completed[group].push(Date.now());