diff --git a/src/commands/addDelayedJob-6.lua b/src/commands/addDelayedJob-6.lua index dab0fecf86..4a42362534 100644 --- a/src/commands/addDelayedJob-6.lua +++ b/src/commands/addDelayedJob-6.lua @@ -58,7 +58,6 @@ local parentData --- @include "includes/getDelayedScore" --- @include "includes/getOrSetMaxEvents" --- @include "includes/handleDuplicatedJob" ---- @include "includes/isQueuePaused" --- @include "includes/storeJob" if parentKey ~= nil then @@ -99,11 +98,8 @@ rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "delayed", "jobId", jobId, "delay", delayedTimestamp) -- mark that a delayed job is available -local isPaused = isQueuePaused(metaKey) -if not isPaused then - local markerKey = KEYS[1] - addDelayMarkerIfNeeded(markerKey, delayedKey) -end +local markerKey = KEYS[1] +addDelayMarkerIfNeeded(markerKey, delayedKey) -- Check if this job is a child of another job, if so add it to the parents dependencies if parentDependenciesKey ~= nil then diff --git a/src/commands/changeDelay-4.lua b/src/commands/changeDelay-4.lua index 130e962b45..f0bdb6ac5a 100644 --- a/src/commands/changeDelay-4.lua +++ b/src/commands/changeDelay-4.lua @@ -25,7 +25,6 @@ local rcall = redis.call --- @include "includes/addDelayMarkerIfNeeded" --- @include "includes/getDelayedScore" --- @include "includes/getOrSetMaxEvents" ---- @include "includes/isQueuePaused" if rcall("EXISTS", ARGV[4]) == 1 then local jobId = ARGV[3] @@ -48,10 +47,7 @@ if rcall("EXISTS", ARGV[4]) == 1 then "jobId", jobId, "delay", delayedTimestamp) -- mark that a delayed job is available - local isPaused = isQueuePaused(KEYS[2]) - if not isPaused then - addDelayMarkerIfNeeded(KEYS[3], KEYS[1]) - end + addDelayMarkerIfNeeded(KEYS[3], KEYS[1]) return 0 else diff --git a/src/commands/moveToDelayed-8.lua b/src/commands/moveToDelayed-8.lua index 7636531def..e48d3f235d 100644 --- a/src/commands/moveToDelayed-8.lua +++ b/src/commands/moveToDelayed-8.lua @@ -32,7 +32,6 @@ local rcall = redis.call --- @include "includes/addDelayMarkerIfNeeded" --- @include "includes/getDelayedScore" --- @include "includes/getOrSetMaxEvents" ---- @include "includes/isQueuePaused" --- @include "includes/removeLock" local jobKey = KEYS[5] @@ -65,11 +64,8 @@ if rcall("EXISTS", jobKey) == 1 then "jobId", jobId, "delay", delayedTimestamp) -- Check if we need to push a marker job to wake up sleeping workers. - local isPaused = isQueuePaused(metaKey) - if not isPaused then - local markerKey = KEYS[1] - addDelayMarkerIfNeeded(markerKey, delayedKey) - end + local markerKey = KEYS[1] + addDelayMarkerIfNeeded(markerKey, delayedKey) return 0 else diff --git a/tests/test_delay.ts b/tests/test_delay.ts index 32d2444837..e459c2e2c3 100644 --- a/tests/test_delay.ts +++ b/tests/test_delay.ts @@ -175,6 +175,43 @@ describe('Delayed jobs', function () { }); }); + describe('when queue is paused', function () { + it('should keep moving delayed jobs to waiting', async function () { + const delayTime = 2500; + const margin = 1.2; + + const queueEvents = new QueueEvents(queueName, { connection, prefix }); + await queueEvents.waitUntilReady(); + + await queue.pause(); + const worker = new Worker(queueName, async () => {}, { + connection, + prefix, + }); + await worker.waitUntilReady(); + + const timestamp = Date.now(); + const publishHappened = false; + + const waiting = new Promise(resolve => { + queueEvents.on('waiting', () => { + console.log(Date.now() - timestamp); + const currentDelay = Date.now() - timestamp; + expect(currentDelay).to.be.greaterThanOrEqual(delayTime); + expect(currentDelay).to.be.lessThanOrEqual(delayTime * margin); + resolve(); + }); + }); + + await queue.add('test', { delayed: 'foobar' }, { delay: delayTime }); + + await waiting; + + await queueEvents.close(); + await worker.close(); + }); + }); + it('should process a delayed job added after an initial long delayed job', async function () { const oneYearDelay = 1000 * 60 * 60 * 24 * 365; // One year. const delayTime = 1000; diff --git a/tests/test_pause.ts b/tests/test_pause.ts index e5cc6aa284..9ad1f6e5c5 100644 --- a/tests/test_pause.ts +++ b/tests/test_pause.ts @@ -60,8 +60,8 @@ describe('Pause', function () { } const counts2 = await queue.getJobCounts('waiting', 'paused', 'delayed'); expect(counts2).to.have.property('waiting', 0); - expect(counts2).to.have.property('paused', 0); - expect(counts2).to.have.property('delayed', 1); + expect(counts2).to.have.property('paused', 1); + expect(counts2).to.have.property('delayed', 0); await worker.close(); }); diff --git a/tests/test_worker.ts b/tests/test_worker.ts index 266d842799..6d369599ee 100644 --- a/tests/test_worker.ts +++ b/tests/test_worker.ts @@ -1738,7 +1738,7 @@ describe('workers', function () { const processing = new Promise((resolve, reject) => { processor = async (job: Job) => { try { - await delay(10); + await delay(20); expect(job.data.num).to.be.equal(counter); expect(job.data.foo).to.be.equal('bar'); if (counter === maxJobs) {