From e66b97be4577d5ab373fff0f3f45d73de7842a37 Mon Sep 17 00:00:00 2001 From: Manuel Astudillo Date: Sat, 27 Jul 2019 19:23:05 +0200 Subject: [PATCH] fix: fix a couple of job tests --- src/classes/scripts.ts | 4 ++ ...oveToDelayed-3.lua => moveToDelayed-5.lua} | 18 ++++----- src/commands/retryJob-3.lua | 38 ------------------- src/commands/retryJob-4.lua | 34 +++++++++++++++++ src/test/test_job.ts | 14 +++++++ 5 files changed, 59 insertions(+), 49 deletions(-) rename src/commands/{moveToDelayed-3.lua => moveToDelayed-5.lua} (64%) delete mode 100644 src/commands/retryJob-3.lua create mode 100644 src/commands/retryJob-4.lua diff --git a/src/classes/scripts.ts b/src/classes/scripts.ts index 1d6aced963..1c96d55e91 100644 --- a/src/classes/scripts.ts +++ b/src/classes/scripts.ts @@ -253,6 +253,8 @@ export class Scripts { const keys = ['active', 'delayed', jobId].map(function(name) { return queue.toKey(name); }); + keys.push.apply(keys, [queue.eventStreamKey(), queue.delayStreamKey()]); + return keys.concat([JSON.stringify(timestamp), jobId]); } @@ -276,6 +278,8 @@ export class Scripts { return queue.toKey(name); }); + keys.push(queue.eventStreamKey()); + const pushCmd = (job.opts.lifo ? 'R' : 'L') + 'PUSH'; return keys.concat([pushCmd, jobId]); diff --git a/src/commands/moveToDelayed-3.lua b/src/commands/moveToDelayed-5.lua similarity index 64% rename from src/commands/moveToDelayed-3.lua rename to src/commands/moveToDelayed-5.lua index 143a31197b..d852ed332d 100644 --- a/src/commands/moveToDelayed-3.lua +++ b/src/commands/moveToDelayed-5.lua @@ -5,6 +5,8 @@ KEYS[1] active key KEYS[2] delayed key KEYS[3] job key + KEYS[4] events stream + KEYS[5] delayed stream ARGV[1] delayedTimestamp ARGV[2] the id of the job @@ -13,7 +15,6 @@ Output: 0 - OK -1 - Missing job. - -2 - Job is locked. Events: - delayed key. @@ -22,20 +23,15 @@ local rcall = redis.call if rcall("EXISTS", KEYS[3]) == 1 then - -- Check for job lock - if ARGV[3] ~= "0" then - local lockKey = KEYS[3] .. ':lock' - local lock = rcall("GET", lockKey) - if rcall("GET", lockKey) ~= ARGV[3] then - return -2 - end - end - local score = tonumber(ARGV[1]) + local delayedTimestamp = (score / 0x1000) + rcall("ZADD", KEYS[2], score, ARGV[2]) - rcall("PUBLISH", KEYS[2], (score / 0x1000)) rcall("LREM", KEYS[1], 0, ARGV[2]) + rcall("XADD", KEYS[4], "*", "event", "delayed", "jobId", jobId, "delay", delayedTimestamp); + rcall("XADD", KEYS[5], "*", "nextTimestamp", delayedTimestamp); + return 0 else return -1 diff --git a/src/commands/retryJob-3.lua b/src/commands/retryJob-3.lua deleted file mode 100644 index 0a1d135284..0000000000 --- a/src/commands/retryJob-3.lua +++ /dev/null @@ -1,38 +0,0 @@ ---[[ - Retries a failed job by moving it back to the wait queue. - - Input: - KEYS[1] 'active', - KEYS[2] 'wait' - KEYS[3] jobId - - ARGV[1] pushCmd - ARGV[2] jobId - ARGV[3] token - - Events: - 'prefix:added' - - Output: - 0 - OK - -1 - Missing key - -2 - Job Not locked -]] -if redis.call("EXISTS", KEYS[3]) == 1 then - - -- Check for job lock - if ARGV[3] ~= "0" then - local lockKey = KEYS[3] .. ':lock' - local lock = redis.call("GET", lockKey) - if redis.call("GET", lockKey) ~= ARGV[3] then - return -2 - end - end - - redis.call("LREM", KEYS[1], 0, ARGV[2]) - redis.call(ARGV[1], KEYS[2], ARGV[2]) - - return 0 -else - return -1 -end diff --git a/src/commands/retryJob-4.lua b/src/commands/retryJob-4.lua new file mode 100644 index 0000000000..f0d40d61f6 --- /dev/null +++ b/src/commands/retryJob-4.lua @@ -0,0 +1,34 @@ +--[[ + Retries a failed job by moving it back to the wait queue. + + Input: + KEYS[1] 'active', + KEYS[2] 'wait' + KEYS[3] jobId + KEYS[4] events stream + + ARGV[1] pushCmd + ARGV[2] jobId + ARGV[3] token + + Events: + 'prefix:added' + + Output: + 0 - OK + -1 - Missing key +]] +local rcall = redis.call + +if rcall("EXISTS", KEYS[3]) == 1 then + + rcall("LREM", KEYS[1], 0, ARGV[2]) + rcall(ARGV[1], KEYS[2], ARGV[2]) + + -- Emit waiting event + rcall("XADD", KEYS[4], "*", "event", "waiting", "jobId", ARGV[2], "prev", "failed"); + + return 0 +else + return -1 +end diff --git a/src/test/test_job.ts b/src/test/test_job.ts index b5a90135d6..63d87b7963 100644 --- a/src/test/test_job.ts +++ b/src/test/test_job.ts @@ -130,6 +130,9 @@ describe('Job', function() { }); it('moves the job to wait for retry if attempts are given', async function() { + const queueEvents = new QueueEvents(queueName); + await queueEvents.init(); + const job = await Job.create( queue, 'test', @@ -138,13 +141,24 @@ describe('Job', function() { ); const isFailed = await job.isFailed(); expect(isFailed).to.be.equal(false); + + const waiting = new Promise( resolve => { + queueEvents.on('waiting', resolve); + }); + + await job.moveToFailed(new Error('test error'), true); + + await waiting; + const isFailed2 = await job.isFailed(); expect(isFailed2).to.be.equal(false); expect(job.stacktrace).not.be.equal(null); expect(job.stacktrace.length).to.be.equal(1); const isWaiting = await job.isWaiting(); expect(isWaiting).to.be.equal(true); + + await queueEvents.close(); }); it('marks the job as failed when attempts made equal to attempts given', async function() {