diff --git a/src/classes/job.ts b/src/classes/job.ts index dfd6207039..ddbeb9c172 100644 --- a/src/classes/job.ts +++ b/src/classes/job.ts @@ -585,7 +585,9 @@ export class Job< ); const result = await this.scripts.moveToFinished(this.id, args); - this.finishedOn = args[14] as number; + this.finishedOn = args[ + this.scripts.moveToFinishedKeys.length + 1 + ] as number; return result; } @@ -667,7 +669,7 @@ export class Job< fetchNext, ); (multi).moveToFinished(args); - finishedOn = args[14]; + finishedOn = args[this.scripts.moveToFinishedKeys.length + 1] as number; command = 'failed'; } diff --git a/src/classes/queue-keys.ts b/src/classes/queue-keys.ts index 1108c66466..29224c2bbd 100644 --- a/src/classes/queue-keys.ts +++ b/src/classes/queue-keys.ts @@ -22,7 +22,8 @@ export class QueueKeys { 'limiter', 'meta', 'events', - 'pc', + 'pc', // priority counter key + 'marker', // marker key ].forEach(key => { keys[key] = this.toKey(name, key); }); diff --git a/src/classes/scripts.ts b/src/classes/scripts.ts index 2e26d604b5..56327ae147 100644 --- a/src/classes/scripts.ts +++ b/src/classes/scripts.ts @@ -57,6 +57,7 @@ export class Scripts { undefined, undefined, undefined, + undefined, ]; } @@ -79,8 +80,7 @@ export class Scripts { ): Promise { const queueKeys = this.queue.keys; const keys: (string | Buffer)[] = [ - queueKeys.wait, - queueKeys.paused, + queueKeys.marker, queueKeys.meta, queueKeys.id, queueKeys.delayed, @@ -101,8 +101,7 @@ export class Scripts { ): Promise { const queueKeys = this.queue.keys; const keys: (string | Buffer)[] = [ - queueKeys.wait, - queueKeys.paused, + queueKeys.marker, queueKeys.meta, queueKeys.id, queueKeys.prioritized, @@ -197,6 +196,7 @@ export class Scripts { queueKeys.id, queueKeys.completed, queueKeys.events, + queueKeys.marker, ]; keys.push(pack(args), job.data, encodedOpts); result = await (client).addStandardJob(keys); @@ -223,7 +223,11 @@ export class Scripts { this.queue.toKey(name), ); - keys.push(this.queue.keys.events); + keys.push( + this.queue.keys.events, + this.queue.keys.delayed, + this.queue.keys.marker, + ); return (client).pause(keys.concat([pause ? 'paused' : 'resumed'])); } @@ -336,6 +340,7 @@ export class Scripts { keys[10] = queueKeys[target]; keys[11] = this.queue.toKey(job.id ?? 
''); keys[12] = metricsKey; + keys[13] = this.queue.keys.marker; const keepJobs = this.getKeepJobs(shouldRemove, workerKeepJobs); @@ -631,6 +636,7 @@ export class Scripts { this.queue.keys.meta, this.queue.keys.prioritized, this.queue.keys.pc, + this.queue.keys.marker, ]; return keys.concat([ @@ -661,20 +667,16 @@ export class Scripts { timestamp = timestamp * 0x1000 + (+jobId & 0xfff); } + const queueKeys = this.queue.keys; const keys: (string | number)[] = [ - 'wait', - 'active', - 'prioritized', - 'delayed', - jobId, - ].map(name => { - return this.queue.toKey(name); - }); - keys.push.apply(keys, [ - this.queue.keys.events, - this.queue.keys.paused, - this.queue.keys.meta, - ]); + queueKeys.marker, + queueKeys.active, + queueKeys.prioritized, + queueKeys.delayed, + this.queue.toKey(jobId), + queueKeys.events, + queueKeys.meta, + ]; return keys.concat([ this.queue.keys[''], @@ -798,21 +800,17 @@ export class Scripts { token: string, ): (string | number)[] { const keys: (string | number)[] = [ - 'active', - 'wait', - 'paused', - jobId, - 'meta', - ].map(name => { - return this.queue.toKey(name); - }); - - keys.push( + this.queue.keys.active, + this.queue.keys.wait, + this.queue.keys.paused, + this.queue.toKey(jobId), + this.queue.keys.meta, this.queue.keys.events, this.queue.keys.delayed, this.queue.keys.prioritized, this.queue.keys.pc, - ); + this.queue.keys.marker, + ]; const pushCmd = (lifo ? 'R' : 'L') + 'PUSH'; @@ -909,7 +907,7 @@ export class Scripts { } } - async moveToActive(client: RedisClient, token: string, jobId?: string) { + async moveToActive(client: RedisClient, token: string) { const opts = this.queue.opts as WorkerOptions; const queueKeys = this.queue.keys; @@ -924,12 +922,12 @@ export class Scripts { queueKeys.paused, queueKeys.meta, queueKeys.pc, + queueKeys.marker, ]; const args: (string | number | boolean | Buffer)[] = [ queueKeys[''], Date.now(), - jobId || '', pack({ token, lockDuration: opts.lockDuration, @@ -955,6 +953,7 @@ export class Scripts { this.queue.keys.prioritized, this.queue.keys.pc, this.queue.keys.events, + this.queue.keys.marker, ]; const args = [this.queue.toKey(''), jobId]; diff --git a/src/classes/worker.ts b/src/classes/worker.ts index f5915ba88b..44750503ce 100644 --- a/src/classes/worker.ts +++ b/src/classes/worker.ts @@ -180,7 +180,7 @@ export class Worker< private limitUntil = 0; private resumeWorker: () => void; private stalledCheckTimer: NodeJS.Timeout; - private waiting: Promise | null = null; + private waiting: Promise | null = null; private _repeat: Repeat; protected paused: Promise; @@ -426,7 +426,7 @@ export class Worker< numTotal = asyncFifoQueue.numTotal(); if (this.waiting && numTotal > 1) { - // We have a job waiting but we have others that we could start processing already + // We are waiting for jobs but we have others that we could start processing already break; } @@ -451,10 +451,7 @@ export class Worker< let job: Job | void; do { job = await asyncFifoQueue.fetch(); - } while ( - !job && - asyncFifoQueue.numQueued() > 0 - ); + } while (!job && asyncFifoQueue.numQueued() > 0); if (job) { const token = job.token; @@ -514,10 +511,13 @@ export class Worker< } if (this.drained && block && !this.limitUntil && !this.waiting) { - this.waiting = this.waitForJob(bclient); + this.waiting = this.waitForJob(bclient, this.blockUntil); try { - const jobId = await this.waiting; - return this.moveToActive(client, token, jobId); + this.blockUntil = await this.waiting; + + if (this.blockUntil <= 0 || this.blockUntil - Date.now() < 0.1) { + return 
this.moveToActive(client, token); + } } catch (err) { // Swallow error if locally paused or closing since we did force a disconnection if ( @@ -558,29 +558,20 @@ export class Worker< protected async moveToActive( client: RedisClient, token: string, - jobId?: string, ): Promise> { - // If we get the special delayed job ID, we pick the delay as the next - // block timeout. - if (jobId && jobId.startsWith('0:')) { - this.blockUntil = parseInt(jobId.split(':')[1]) || 0; - - // Remove marker from active list. - await client.lrem(this.keys.active, 1, jobId); - if (this.blockUntil > 0) { - return; - } - } const [jobData, id, limitUntil, delayUntil] = - await this.scripts.moveToActive(client, token, jobId); + await this.scripts.moveToActive(client, token); this.updateDelays(limitUntil, delayUntil); return this.nextJobFromJobData(jobData, id, token); } - private async waitForJob(bclient: RedisClient): Promise { + private async waitForJob( + bclient: RedisClient, + blockUntil: number, + ): Promise { if (this.paused) { - return; + return Infinity; } try { @@ -588,14 +579,10 @@ export class Worker< if (!this.closing) { let blockTimeout = Math.max( - this.blockUntil - ? (this.blockUntil - Date.now()) / 1000 - : opts.drainDelay, + blockUntil ? (blockUntil - Date.now()) / 1000 : opts.drainDelay, 0, ); - let jobId; - // Blocking for less than 50ms is useless. if (blockTimeout > 0.05) { blockTimeout = this.blockingConnection.capabilities.canDoubleTimeout @@ -607,16 +594,19 @@ export class Worker< // reference: https://github.com/taskforcesh/bullmq/issues/1658 blockTimeout = Math.min(blockTimeout, maximumBlockTimeout); - jobId = await bclient.brpoplpush( - this.keys.wait, - this.keys.active, - blockTimeout, - ); - } else { - jobId = await bclient.rpoplpush(this.keys.wait, this.keys.active); + // Markers should only be used for un-blocking, so we will handle them in this + // function only. + const result = await bclient.bzpopmin(this.keys.marker, blockTimeout); + + if (result) { + const [_key, member, score] = result; + + if (member) { + return parseInt(score); + } + } } - this.blockUntil = 0; - return jobId; + return 0; } } catch (error) { if (isNotConnectionError(error)) { @@ -628,6 +618,7 @@ export class Worker< } finally { this.waiting = null; } + return Infinity; } /** diff --git a/src/commands/addDelayedJob-7.lua b/src/commands/addDelayedJob-6.lua similarity index 82% rename from src/commands/addDelayedJob-7.lua rename to src/commands/addDelayedJob-6.lua index 4772a2698d..bc4f2b2084 100644 --- a/src/commands/addDelayedJob-7.lua +++ b/src/commands/addDelayedJob-6.lua @@ -8,13 +8,12 @@ - Emits a global event 'delayed' if the job is delayed. 
Input: - KEYS[1] 'wait', - KEYS[2] 'paused' - KEYS[3] 'meta' - KEYS[4] 'id' - KEYS[5] 'delayed' - KEYS[6] 'completed' - KEYS[7] events stream key + KEYS[1] 'marker', + KEYS[2] 'meta' + KEYS[3] 'id' + KEYS[4] 'delayed' + KEYS[5] 'completed' + KEYS[6] events stream key ARGV[1] msgpacked arguments array [1] key prefix, @@ -34,15 +33,12 @@ jobId - OK -5 - Missing parent key ]] -local waitKey = KEYS[1] -local pausedKey = KEYS[2] +local metaKey = KEYS[2] +local idKey = KEYS[3] +local delayedKey = KEYS[4] -local metaKey = KEYS[3] -local idKey = KEYS[4] -local delayedKey = KEYS[5] - -local completedKey = KEYS[6] -local eventsKey = KEYS[7] +local completedKey = KEYS[5] +local eventsKey = KEYS[6] local jobId local jobIdKey @@ -61,7 +57,7 @@ local parentData -- Includes --- @include "includes/storeJob" --- @include "includes/addDelayMarkerIfNeeded" ---- @include "includes/getTargetQueueList" +--- @include "includes/isQueuePaused" --- @include "includes/getNextDelayedTimestamp" --- @include "includes/updateExistingJobsParent" @@ -108,10 +104,12 @@ rcall("ZADD", delayedKey, score, jobId) rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "delayed", "jobId", jobId, "delay", delayedTimestamp) --- If wait list is empty, and this delayed job is the next one to be processed, --- then we need to signal the workers by adding a dummy job (jobId 0:delay) to the wait list. -local target = getTargetQueueList(metaKey, KEYS[1], KEYS[2]) -addDelayMarkerIfNeeded(target, delayedKey) +-- mark that a delayed job is available +local isPaused = isQueuePaused(metaKey) +if not isPaused then + local markerKey = KEYS[1] + addDelayMarkerIfNeeded(markerKey, delayedKey) +end -- Check if this job is a child of another job, if so add it to the parents dependencies -- TODO: Should not be possible to add a child job to a parent that is not in the "waiting-children" status. diff --git a/src/commands/addPrioritizedJob-8.lua b/src/commands/addPrioritizedJob-7.lua similarity index 80% rename from src/commands/addPrioritizedJob-8.lua rename to src/commands/addPrioritizedJob-7.lua index 49997ef215..36cdc8bf66 100644 --- a/src/commands/addPrioritizedJob-8.lua +++ b/src/commands/addPrioritizedJob-7.lua @@ -5,14 +5,13 @@ - Adds the job to the "added" list so that workers gets notified. 
Input: - KEYS[1] 'wait', - KEYS[2] 'paused' - KEYS[3] 'meta' - KEYS[4] 'id' - KEYS[5] 'prioritized' - KEYS[6] 'completed' - KEYS[7] events stream key - KEYS[8] 'pc' priority counter + KEYS[1] 'marker', + KEYS[2] 'meta' + KEYS[3] 'id' + KEYS[4] 'prioritized' + KEYS[5] 'completed' + KEYS[6] events stream key + KEYS[7] 'pc' priority counter ARGV[1] msgpacked arguments array [1] key prefix, @@ -31,17 +30,14 @@ Output: jobId - OK -5 - Missing parent key -]] -local waitKey = KEYS[1] -local pausedKey = KEYS[2] +]] +local metaKey = KEYS[2] +local idKey = KEYS[3] +local priorityKey = KEYS[4] -local metaKey = KEYS[3] -local idKey = KEYS[4] -local priorityKey = KEYS[5] - -local completedKey = KEYS[6] -local eventsKey = KEYS[7] -local priorityCounterKey = KEYS[8] +local completedKey = KEYS[5] +local eventsKey = KEYS[6] +local priorityCounterKey = KEYS[7] local jobId local jobIdKey @@ -59,8 +55,8 @@ local parentData -- Includes --- @include "includes/storeJob" +--- @include "includes/isQueuePaused" --- @include "includes/addJobWithPriority" ---- @include "includes/getTargetQueueList" --- @include "includes/updateExistingJobsParent" if parentKey ~= nil then @@ -98,10 +94,10 @@ local delay, priority = storeJob(eventsKey, jobIdKey, jobId, args[3], ARGV[2], opts, timestamp, parentKey, parentData, repeatJobKey) -local target, paused = getTargetQueueList(metaKey, waitKey, pausedKey) +-- Add the job to the prioritized set +local isPause = isQueuePaused(metaKey) +addJobWithPriority( KEYS[1], priorityKey, priority, jobId, priorityCounterKey, isPause) -addJobWithPriority(waitKey, priorityKey, priority, paused, jobId, - priorityCounterKey) -- Emit waiting event rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "waiting", "jobId", jobId) diff --git a/src/commands/addStandardJob-6.lua b/src/commands/addStandardJob-7.lua similarity index 96% rename from src/commands/addStandardJob-6.lua rename to src/commands/addStandardJob-7.lua index 1c067f496e..32f5503424 100644 --- a/src/commands/addStandardJob-6.lua +++ b/src/commands/addStandardJob-7.lua @@ -21,6 +21,7 @@ KEYS[4] 'id' KEYS[5] 'completed' KEYS[6] events stream key + KEYS[7] marker key ARGV[1] msgpacked arguments array [1] key prefix, @@ -97,6 +98,11 @@ storeJob(eventsKey, jobIdKey, jobId, args[3], ARGV[2], opts, timestamp, local target, paused = getTargetQueueList(KEYS[3], KEYS[1], KEYS[2]) +if not paused then + -- mark that a job is available + rcall("ZADD", KEYS[7], 0, "0") +end + -- LIFO or FIFO local pushCmd = opts['lifo'] and 'RPUSH' or 'LPUSH' rcall(pushCmd, target, jobId) diff --git a/src/commands/changePriority-5.lua b/src/commands/changePriority-5.lua deleted file mode 100644 index 57ac5ef6ee..0000000000 --- a/src/commands/changePriority-5.lua +++ /dev/null @@ -1,52 +0,0 @@ ---[[ - Change job priority - Input: - KEYS[1] 'wait', - KEYS[2] 'paused' - KEYS[3] 'meta' - KEYS[4] 'prioritized' - KEYS[5] 'pc' priority counter - - ARGV[1] priority value - ARGV[2] job key - ARGV[3] job id - ARGV[4] lifo - - Output: - 0 - OK - -1 - Missing job -]] -local jobKey = ARGV[2] -local jobId = ARGV[3] -local priority = tonumber(ARGV[1]) -local rcall = redis.call - --- Includes ---- @include "includes/addJobWithPriority" ---- @include "includes/getTargetQueueList" - -if rcall("EXISTS", jobKey) == 1 then - local target, paused = getTargetQueueList(KEYS[3], KEYS[1], KEYS[2]) - - if rcall("ZREM", KEYS[4], jobId) > 0 then - addJobWithPriority(KEYS[1], KEYS[4], priority, paused, jobId, KEYS[5]) - else - local numRemovedElements = rcall("LREM", target, -1, jobId) 
- if numRemovedElements > 0 then - -- Standard or priority add - if priority == 0 then - -- LIFO or FIFO - local pushCmd = ARGV[4] == '1' and 'RPUSH' or 'LPUSH'; - rcall(pushCmd, target, jobId) - else - addJobWithPriority(KEYS[1], KEYS[4], priority, paused, jobId, KEYS[5]) - end - end - end - - rcall("HSET", jobKey, "priority", priority) - - return 0 -else - return -1 -end diff --git a/src/commands/changePriority-6.lua b/src/commands/changePriority-6.lua new file mode 100644 index 0000000000..3802d1b740 --- /dev/null +++ b/src/commands/changePriority-6.lua @@ -0,0 +1,57 @@ +--[[ + Change job priority + Input: + KEYS[1] 'wait', + KEYS[2] 'paused' + KEYS[3] 'meta' + KEYS[4] 'prioritized' + KEYS[5] 'pc' priority counter + KEYS[6] 'marker' + + ARGV[1] priority value + ARGV[2] job key + ARGV[3] job id + ARGV[4] lifo + + Output: + 0 - OK + -1 - Missing job +]] +local jobKey = ARGV[2] +local jobId = ARGV[3] +local priority = tonumber(ARGV[1]) +local rcall = redis.call + +-- Includes +--- @include "includes/isQueuePaused" +--- @include "includes/addJobWithPriority" +--- @include "includes/getTargetQueueList" + +if rcall("EXISTS", jobKey) == 1 then + local metaKey = KEYS[3] + local isPaused = isQueuePaused(metaKey) + local markerKey = KEYS[6] + local prioritizedKey = KEYS[4] + + -- Re-add with the new priority + if rcall("ZREM", KEYS[4], jobId) > 0 then + addJobWithPriority(markerKey, prioritizedKey, priority, jobId, KEYS[5], + isPaused) + -- If the new priority is 0, then just leave the job where it is in the wait list. + elseif priority > 0 then + -- Job is already in the wait list, we need to re-add it with the new priority. + local target = isPaused and KEYS[2] or KEYS[1] + + local numRemovedElements = rcall("LREM", target, -1, jobId) + if numRemovedElements > 0 then + addJobWithPriority(markerKey, prioritizedKey, priority, jobId, + KEYS[5], isPaused) + end + end + + rcall("HSET", jobKey, "priority", priority) + + return 0 +else + return -1 +end diff --git a/src/commands/getCounts-1.lua b/src/commands/getCounts-1.lua index 5b5e949cfb..0b5886cc80 100644 --- a/src/commands/getCounts-1.lua +++ b/src/commands/getCounts-1.lua @@ -13,6 +13,7 @@ local results = {} for i = 1, #ARGV do local stateKey = prefix .. ARGV[i] if ARGV[i] == "wait" or ARGV[i] == "paused" then + -- Markers in waitlist DEPRECATED in v5: Remove in v6. local marker = rcall("LINDEX", stateKey, -1) if marker and string.sub(marker, 1, 2) == "0:" then local count = rcall("LLEN", stateKey) diff --git a/src/commands/getRanges-1.lua b/src/commands/getRanges-1.lua index 4f7b0823b5..b9ead60286 100644 --- a/src/commands/getRanges-1.lua +++ b/src/commands/getRanges-1.lua @@ -43,6 +43,7 @@ end for i = 4, #ARGV do local stateKey = prefix .. ARGV[i] if ARGV[i] == "wait" or ARGV[i] == "paused" then + -- Markers in waitlist DEPRECATED in v5: Remove in v6. local marker = rcall("LINDEX", stateKey, -1) if marker and string.sub(marker, 1, 2) == "0:" then local count = rcall("LLEN", stateKey) diff --git a/src/commands/includes/addDelayMarkerIfNeeded.lua b/src/commands/includes/addDelayMarkerIfNeeded.lua index af4e829521..35d5bac095 100644 --- a/src/commands/includes/addDelayMarkerIfNeeded.lua +++ b/src/commands/includes/addDelayMarkerIfNeeded.lua @@ -1,27 +1,13 @@ --[[ Add delay marker if needed. 
-]] - +]] -- Includes --- @include "getNextDelayedTimestamp" - -local function addDelayMarkerIfNeeded(targetKey, delayedKey) - local waitLen = rcall("LLEN", targetKey) - if waitLen <= 1 then +local function addDelayMarkerIfNeeded(markerKey, delayedKey) local nextTimestamp = getNextDelayedTimestamp(delayedKey) if nextTimestamp ~= nil then - -- Check if there is already a marker with older timestamp - -- if there is, we need to replace it. - if waitLen == 1 then - local marker = rcall("LINDEX", targetKey, 0) - local oldTimestamp = tonumber(marker:sub(3)) - if oldTimestamp and oldTimestamp > nextTimestamp then - rcall("LSET", targetKey, 0, "0:" .. nextTimestamp) - end - else - -- if there is no marker, then we need to add one - rcall("LPUSH", targetKey, "0:" .. nextTimestamp) - end + -- Replace the score of the marker with the newest known + -- next timestamp. + rcall("ZADD", markerKey, nextTimestamp, "0") end - end end diff --git a/src/commands/includes/addJobWithPriority.lua b/src/commands/includes/addJobWithPriority.lua index f8c981304a..c5f780cbd5 100644 --- a/src/commands/includes/addJobWithPriority.lua +++ b/src/commands/includes/addJobWithPriority.lua @@ -1,15 +1,11 @@ --[[ Function to add job considering priority. ]] - --- Includes ---- @include "addPriorityMarkerIfNeeded" - -local function addJobWithPriority(waitKey, prioritizedKey, priority, paused, jobId, priorityCounterKey) +local function addJobWithPriority(markerKey, prioritizedKey, priority, jobId, priorityCounterKey, isPaused) local prioCounter = rcall("INCR", priorityCounterKey) local score = priority * 0x100000000 + bit.band(prioCounter, 0xffffffffffff) rcall("ZADD", prioritizedKey, score, jobId) - if not paused then - addPriorityMarkerIfNeeded(waitKey) + if not isPaused then + rcall("ZADD", markerKey, 0, "0") end end diff --git a/src/commands/includes/addPriorityMarkerIfNeeded.lua b/src/commands/includes/addPriorityMarkerIfNeeded.lua deleted file mode 100644 index ea7c2d7b2b..0000000000 --- a/src/commands/includes/addPriorityMarkerIfNeeded.lua +++ /dev/null @@ -1,12 +0,0 @@ ---[[ - Function priority marker to wait if needed - in order to wake up our workers and to respect priority - order as much as possible -]] -local function addPriorityMarkerIfNeeded(waitKey) - local waitLen = rcall("LLEN", waitKey) - - if waitLen == 0 then - rcall("LPUSH", waitKey, "0:0") - end -end diff --git a/src/commands/includes/checkStalledJobs.lua b/src/commands/includes/checkStalledJobs.lua index f5a67049e0..21021dea0b 100644 --- a/src/commands/includes/checkStalledJobs.lua +++ b/src/commands/includes/checkStalledJobs.lua @@ -53,7 +53,7 @@ local function checkStalledJobs(stalledKey, waitKey, activeKey, failedKey, -- Remove from active list for i, jobId in ipairs(stalling) do - + -- Markers in waitlist DEPRECATED in v5: Remove in v6. if string.sub(jobId, 1, 2) == "0:" then -- If the jobId is a delay marker ID we just remove it. rcall("LREM", activeKey, 1, jobId) diff --git a/src/commands/includes/isQueuePaused.lua b/src/commands/includes/isQueuePaused.lua new file mode 100644 index 0000000000..6885245ea5 --- /dev/null +++ b/src/commands/includes/isQueuePaused.lua @@ -0,0 +1,7 @@ +--[[ + Function to check for the meta.paused key to decide if we are paused or not + (since an empty list and !EXISTS are not really the same). 
+]] +local function isQueuePaused(queueMetaKey) + return rcall("HEXISTS", queueMetaKey, "paused") == 1 +end diff --git a/src/commands/includes/moveParentToWaitIfNeeded.lua b/src/commands/includes/moveParentToWaitIfNeeded.lua index 42054c7a11..049fa86374 100644 --- a/src/commands/includes/moveParentToWaitIfNeeded.lua +++ b/src/commands/includes/moveParentToWaitIfNeeded.lua @@ -1,42 +1,51 @@ --[[ Validate and move parent to active if needed. ]] - -- Includes --- @include "addDelayMarkerIfNeeded" +--- @include "isQueuePaused" --- @include "addJobWithPriority" --- @include "getTargetQueueList" +local function moveParentToWaitIfNeeded(parentQueueKey, parentDependenciesKey, + parentKey, parentId, timestamp) + local isParentActive = rcall("ZSCORE", + parentQueueKey .. ":waiting-children", parentId) + if rcall("SCARD", parentDependenciesKey) == 0 and isParentActive then + rcall("ZREM", parentQueueKey .. ":waiting-children", parentId) + local parentWaitKey = parentQueueKey .. ":wait" + local parentPausedKey = parentQueueKey .. ":paused" + local parentMetaKey = parentQueueKey .. ":meta" + + local parentMarkerKey = parentQueueKey .. ":marker" + local jobAttributes = rcall("HMGET", parentKey, "priority", "delay") + local priority = tonumber(jobAttributes[1]) or 0 + local delay = tonumber(jobAttributes[2]) or 0 -local function moveParentToWaitIfNeeded(parentQueueKey, parentDependenciesKey, parentKey, parentId, timestamp) - local isParentActive = rcall("ZSCORE", parentQueueKey .. ":waiting-children", parentId) - if rcall("SCARD", parentDependenciesKey) == 0 and isParentActive then - rcall("ZREM", parentQueueKey .. ":waiting-children", parentId) - local parentWaitKey = parentQueueKey .. ":wait" - local parentTarget, paused = getTargetQueueList(parentQueueKey .. ":meta", parentWaitKey, - parentQueueKey .. ":paused") - local jobAttributes = rcall("HMGET", parentKey, "priority", "delay") - local priority = tonumber(jobAttributes[1]) or 0 - local delay = tonumber(jobAttributes[2]) or 0 + if delay > 0 then + local delayedTimestamp = tonumber(timestamp) + delay + local score = delayedTimestamp * 0x1000 + local parentDelayedKey = parentQueueKey .. ":delayed" + rcall("ZADD", parentDelayedKey, score, parentId) + rcall("XADD", parentQueueKey .. ":events", "*", "event", "delayed", + "jobId", parentId, "delay", delayedTimestamp) - if delay > 0 then - local delayedTimestamp = tonumber(timestamp) + delay - local score = delayedTimestamp * 0x1000 - local parentDelayedKey = parentQueueKey .. ":delayed" - rcall("ZADD", parentDelayedKey, score, parentId) - rcall("XADD", parentQueueKey .. ":events", "*", "event", "delayed", "jobId", parentId, - "delay", delayedTimestamp) + addDelayMarkerIfNeeded(parentMarkerKey, parentDelayedKey) + else + if priority == 0 then + local parentTarget, _paused = + getTargetQueueList(parentMetaKey, parentWaitKey, + parentPausedKey) + rcall("RPUSH", parentTarget, parentId) + rcall("ZADD", parentMarkerKey, 0, "0") + else + local isPaused = isQueuePaused(parentMetaKey) + addJobWithPriority(parentMarkerKey, + parentQueueKey .. ":prioritized", priority, + parentId, parentQueueKey .. ":pc", isPaused) + end - addDelayMarkerIfNeeded(parentTarget, parentDelayedKey) - else - if priority == 0 then - rcall("RPUSH", parentTarget, parentId) - else - addJobWithPriority(parentWaitKey, parentQueueKey .. ":prioritized", priority, paused, - parentId, parentQueueKey .. ":pc") - end - - rcall("XADD", parentQueueKey .. 
":events", "*", "event", "waiting", "jobId", parentId, - "prev", "waiting-children") + rcall("XADD", parentQueueKey .. ":events", "*", "event", "waiting", + "jobId", parentId, "prev", "waiting-children") + end end - end end diff --git a/src/commands/includes/promoteDelayedJobs.lua b/src/commands/includes/promoteDelayedJobs.lua index ee9166c060..42d2068f69 100644 --- a/src/commands/includes/promoteDelayedJobs.lua +++ b/src/commands/includes/promoteDelayedJobs.lua @@ -10,8 +10,8 @@ --- @include "addJobWithPriority" -- Try to get as much as 1000 jobs at once -local function promoteDelayedJobs(delayedKey, waitKey, targetKey, prioritizedKey, - eventStreamKey, prefix, timestamp, paused, priorityCounterKey) +local function promoteDelayedJobs(delayedKey, markerKey, targetKey, prioritizedKey, + eventStreamKey, prefix, timestamp, priorityCounterKey, isPaused) local jobs = rcall("ZRANGEBYSCORE", delayedKey, 0, (timestamp + 1) * 0x1000, "LIMIT", 0, 1000) if (#jobs > 0) then @@ -25,9 +25,12 @@ local function promoteDelayedJobs(delayedKey, waitKey, targetKey, prioritizedKey if priority == 0 then -- LIFO or FIFO rcall("LPUSH", targetKey, jobId) + if not isPaused then + rcall("ZADD", markerKey, 0, "0") + end else - addJobWithPriority(waitKey, prioritizedKey, priority, paused, - jobId, priorityCounterKey) + addJobWithPriority(markerKey, prioritizedKey, priority, + jobId, priorityCounterKey, isPaused) end -- Emit waiting event diff --git a/src/commands/moveToActive-10.lua b/src/commands/moveToActive-11.lua similarity index 66% rename from src/commands/moveToActive-10.lua rename to src/commands/moveToActive-11.lua index 1c93db0185..c8424377f6 100644 --- a/src/commands/moveToActive-10.lua +++ b/src/commands/moveToActive-11.lua @@ -17,16 +17,18 @@ KEYS[6] rate limiter key KEYS[7] delayed key - -- Promote delayed jobs + -- Delayed jobs KEYS[8] paused key KEYS[9] meta key KEYS[10] pc priority counter + -- Marker + KEYS[11] marker key + -- Arguments ARGV[1] key prefix ARGV[2] timestamp - ARGV[3] optional job ID - ARGV[4] opts + ARGV[3] opts opts - token - lock token opts - lockDuration @@ -37,7 +39,7 @@ local waitKey = KEYS[1] local activeKey = KEYS[2] local rateLimiterKey = KEYS[6] local delayedKey = KEYS[7] -local opts = cmsgpack.unpack(ARGV[4]) +local opts = cmsgpack.unpack(ARGV[3]) -- Includes --- @include "includes/getNextDelayedTimestamp" @@ -50,39 +52,26 @@ local opts = cmsgpack.unpack(ARGV[4]) local target, paused = getTargetQueueList(KEYS[9], waitKey, KEYS[8]) -- Check if there are delayed jobs that we can move to wait. -promoteDelayedJobs(delayedKey, waitKey, target, KEYS[3], KEYS[4], ARGV[1], - ARGV[2], paused, KEYS[10]) +local markerKey = KEYS[11] +promoteDelayedJobs(delayedKey, markerKey, target, KEYS[3], KEYS[4], ARGV[1], + ARGV[2], KEYS[10], paused) local maxJobs = tonumber(opts['limiter'] and opts['limiter']['max']) local expireTime = getRateLimitTTL(maxJobs, rateLimiterKey) -local jobId = nil -if ARGV[3] ~= "" then - jobId = ARGV[3] - - -- clean stalled key - rcall("SREM", KEYS[5], jobId) -end - -if not jobId or (jobId and string.sub(jobId, 1, 2) == "0:") then - -- If jobId is special ID 0:delay, then there is no job to process - if jobId then rcall("LREM", activeKey, 1, jobId) end +-- Check if we are rate limited first. +if expireTime > 0 then return {0, 0, expireTime, 0} end - -- Check if we are rate limited first. 
- if expireTime > 0 then return {0, 0, expireTime, 0} end +-- paused queue +if paused then return {0, 0, 0, 0} end - -- paused queue - if paused then return {0, 0, 0, 0} end +-- no job ID, try non-blocking move from wait to active +local jobId = rcall("RPOPLPUSH", waitKey, activeKey) - -- no job ID, try non-blocking move from wait to active +-- Markers in waitlist DEPRECATED in v5: Will be completely removed in v6. +if jobId and string.sub(jobId, 1, 2) == "0:" then + rcall("LREM", activeKey, 1, jobId) jobId = rcall("RPOPLPUSH", waitKey, activeKey) - - -- Since it is possible that between a call to BRPOPLPUSH and moveToActive - -- another script puts a new maker in wait, we need to check again. - if jobId and string.sub(jobId, 1, 2) == "0:" then - rcall("LREM", activeKey, 1, jobId) - jobId = rcall("RPOPLPUSH", waitKey, activeKey) - end end if jobId then diff --git a/src/commands/moveToDelayed-7.lua b/src/commands/moveToDelayed-7.lua new file mode 100644 index 0000000000..396147fc30 --- /dev/null +++ b/src/commands/moveToDelayed-7.lua @@ -0,0 +1,72 @@ +--[[ + Moves job from active to delayed set. + + Input: + KEYS[1] marker key + KEYS[2] active key + KEYS[3] prioritized key + KEYS[4] delayed key + KEYS[5] job key + KEYS[6] events stream + KEYS[7] meta key + + ARGV[1] key prefix + ARGV[2] timestamp + ARGV[3] delayedTimestamp + ARGV[4] the id of the job + ARGV[5] queue token + ARGV[6] delay value + + Output: + 0 - OK + -1 - Missing job. + -3 - Job not in active set. + + Events: + - delayed key. +]] +local rcall = redis.call + +-- Includes +--- @include "includes/addDelayMarkerIfNeeded" +--- @include "includes/isQueuePaused" + +local jobKey = KEYS[5] +local metaKey = KEYS[7] +if rcall("EXISTS", jobKey) == 1 then + local delayedKey = KEYS[4] + if ARGV[5] ~= "0" then + local lockKey = jobKey .. ':lock' + if rcall("GET", lockKey) == ARGV[5] then + rcall("DEL", lockKey) + else + return -2 + end + end + + local jobId = ARGV[4] + local score = tonumber(ARGV[3]) + local delayedTimestamp = (score / 0x1000) + + local numRemovedElements = rcall("LREM", KEYS[2], -1, jobId) + if numRemovedElements < 1 then return -3 end + + rcall("HSET", jobKey, "delay", ARGV[6]) + + local maxEvents = rcall("HGET", metaKey, "opts.maxLenEvents") or 10000 + + rcall("ZADD", delayedKey, score, jobId) + rcall("XADD", KEYS[6], "MAXLEN", "~", maxEvents, "*", "event", "delayed", + "jobId", jobId, "delay", delayedTimestamp) + + -- Check if we need to push a marker job to wake up sleeping workers. + local isPaused = isQueuePaused(metaKey) + if not isPaused then + local markerKey = KEYS[1] + addDelayMarkerIfNeeded(markerKey, delayedKey) + end + + return 0 +else + return -1 +end diff --git a/src/commands/moveToDelayed-8.lua b/src/commands/moveToDelayed-8.lua deleted file mode 100644 index 2a87b2aa0c..0000000000 --- a/src/commands/moveToDelayed-8.lua +++ /dev/null @@ -1,72 +0,0 @@ ---[[ - Moves job from active to delayed set. - - Input: - KEYS[1] wait key - KEYS[2] active key - KEYS[3] prioritized key - KEYS[4] delayed key - KEYS[5] job key - KEYS[6] events stream - KEYS[7] paused key - KEYS[8] meta key - - ARGV[1] key prefix - ARGV[2] timestamp - ARGV[3] delayedTimestamp - ARGV[4] the id of the job - ARGV[5] queue token - ARGV[6] delay value - - Output: - 0 - OK - -1 - Missing job. - -3 - Job not in active set. - - Events: - - delayed key. 
-]] -local rcall = redis.call - --- Includes ---- @include "includes/addDelayMarkerIfNeeded" ---- @include "includes/getTargetQueueList" ---- @include "includes/promoteDelayedJobs" - -local jobKey = KEYS[5] -if rcall("EXISTS", jobKey) == 1 then - local delayedKey = KEYS[4] - if ARGV[5] ~= "0" then - local lockKey = jobKey .. ':lock' - if rcall("GET", lockKey) == ARGV[5] then - rcall("DEL", lockKey) - else - return -2 - end - end - - local jobId = ARGV[4] - local score = tonumber(ARGV[3]) - local delayedTimestamp = (score / 0x1000) - - local numRemovedElements = rcall("LREM", KEYS[2], -1, jobId) - if numRemovedElements < 1 then - return -3 - end - - rcall("HSET", jobKey, "delay", ARGV[6]) - - local maxEvents = rcall("HGET", KEYS[8], "opts.maxLenEvents") or 10000 - - rcall("ZADD", delayedKey, score, jobId) - rcall("XADD", KEYS[6], "MAXLEN", "~", maxEvents, "*", "event", "delayed", - "jobId", jobId, "delay", delayedTimestamp) - - -- Check if we need to push a marker job to wake up sleeping workers. - local target = getTargetQueueList(KEYS[8], KEYS[1], KEYS[7]) - addDelayMarkerIfNeeded(target, delayedKey) - - return 0 -else - return -1 -end diff --git a/src/commands/moveToFinished-13.lua b/src/commands/moveToFinished-14.lua similarity index 90% rename from src/commands/moveToFinished-13.lua rename to src/commands/moveToFinished-14.lua index 9d924ccb38..716df550b8 100644 --- a/src/commands/moveToFinished-13.lua +++ b/src/commands/moveToFinished-14.lua @@ -22,6 +22,7 @@ KEYS[11] completed/failed key KEYS[12] jobId key KEYS[13] metrics key + KEYS[14] marker key ARGV[1] jobId ARGV[2] timestamp @@ -144,12 +145,13 @@ if rcall("EXISTS", jobIdKey) == 1 then -- // Make sure job exists else if opts['fpof'] then moveParentFromWaitingChildrenToFailed(parentQueueKey, parentKey, - parentId, jobIdKey, timestamp) + parentId, jobIdKey, + timestamp) elseif opts['rdof'] then local dependenciesSet = parentKey .. ":dependencies" if rcall("SREM", dependenciesSet, jobIdKey) == 1 then moveParentToWaitIfNeeded(parentQueueKey, dependenciesSet, - parentKey, parentId, timestamp) + parentKey, parentId, timestamp) end end end @@ -202,8 +204,8 @@ if rcall("EXISTS", jobIdKey) == 1 then -- // Make sure job exists local target, paused = getTargetQueueList(KEYS[9], KEYS[1], KEYS[8]) -- Check if there are delayed jobs that can be promoted - promoteDelayedJobs(KEYS[7], KEYS[1], target, KEYS[3], - KEYS[4], ARGV[8], timestamp, paused, KEYS[10]) + promoteDelayedJobs(KEYS[7], KEYS[14], target, KEYS[3], KEYS[4], ARGV[8], + timestamp, KEYS[10], paused) local maxJobs = tonumber(opts['limiter'] and opts['limiter']['max']) -- Check if we are rate limited first. @@ -217,25 +219,30 @@ if rcall("EXISTS", jobIdKey) == 1 then -- // Make sure job exists jobId = rcall("RPOPLPUSH", KEYS[1], KEYS[2]) if jobId then + -- Markers in waitlist DEPRECATED in v5: Remove in v6. 
if string.sub(jobId, 1, 2) == "0:" then rcall("LREM", KEYS[2], 1, jobId) -- If jobId is special ID 0:delay (delay greater than 0), then there is no job to process -- but if ID is 0:0, then there is at least 1 prioritized job to process if jobId == "0:0" then - jobId = moveJobFromPriorityToActive(KEYS[3], KEYS[2], KEYS[10]) - return prepareJobForProcessing(KEYS, ARGV[8], target, jobId, timestamp, - maxJobs, expireTime, opts) + jobId = moveJobFromPriorityToActive(KEYS[3], KEYS[2], + KEYS[10]) + return prepareJobForProcessing(KEYS, ARGV[8], target, jobId, + timestamp, maxJobs, + expireTime, opts) end else - return prepareJobForProcessing(KEYS, ARGV[8], target, jobId, timestamp, maxJobs, - expireTime, opts) + return prepareJobForProcessing(KEYS, ARGV[8], target, jobId, + timestamp, maxJobs, expireTime, + opts) end else jobId = moveJobFromPriorityToActive(KEYS[3], KEYS[2], KEYS[10]) if jobId then - return prepareJobForProcessing(KEYS, ARGV[8], target, jobId, timestamp, maxJobs, - expireTime, opts) + return prepareJobForProcessing(KEYS, ARGV[8], target, jobId, + timestamp, maxJobs, expireTime, + opts) end end diff --git a/src/commands/pause-5.lua b/src/commands/pause-5.lua deleted file mode 100644 index b3f23025b7..0000000000 --- a/src/commands/pause-5.lua +++ /dev/null @@ -1,36 +0,0 @@ ---[[ - Pauses or resumes a queue globably. - - Input: - KEYS[1] 'wait' or 'paused'' - KEYS[2] 'paused' or 'wait' - KEYS[3] 'meta' - KEYS[4] 'prioritized' - KEYS[5] events stream key - - ARGV[1] 'paused' or 'resumed' - - Event: - publish paused or resumed event. -]] -local rcall = redis.call - --- Includes ---- @include "includes/addPriorityMarkerIfNeeded" - -if rcall("EXISTS", KEYS[1]) == 1 then - rcall("RENAME", KEYS[1], KEYS[2]) -end - -if ARGV[1] == "paused" then - rcall("HSET", KEYS[3], "paused", 1) -else - rcall("HDEL", KEYS[3], "paused") - local priorityCount = rcall("ZCARD", KEYS[4]) - - if priorityCount > 0 then - addPriorityMarkerIfNeeded(KEYS[2]) - end -end - -rcall("XADD", KEYS[5], "*", "event", ARGV[1]); diff --git a/src/commands/pause-7.lua b/src/commands/pause-7.lua new file mode 100644 index 0000000000..0e6421f638 --- /dev/null +++ b/src/commands/pause-7.lua @@ -0,0 +1,41 @@ +--[[ + Pauses or resumes a queue globably. + + Input: + KEYS[1] 'wait' or 'paused'' + KEYS[2] 'paused' or 'wait' + KEYS[3] 'meta' + KEYS[4] 'prioritized' + KEYS[5] events stream key + KEYS[6] 'delayed' + KEYS|7] 'marker' + + ARGV[1] 'paused' or 'resumed' + + Event: + publish paused or resumed event. 
+]] +local rcall = redis.call + +-- Includes +--- @include "includes/addDelayMarkerIfNeeded" + +local markerKey = KEYS[7] +local hasJobs = rcall("EXISTS", KEYS[1]) == 1 +if hasJobs then rcall("RENAME", KEYS[1], KEYS[2]) end + +if ARGV[1] == "paused" then + rcall("HSET", KEYS[3], "paused", 1) + rcall("DEL", markerKey) +else + rcall("HDEL", KEYS[3], "paused") + + if hasJobs or rcall("ZCARD", KEYS[4]) > 0 then + -- Add marker if there are waiting or priority jobs + rcall("ZADD", markerKey, 0, "0") + else + addDelayMarkerIfNeeded(markerKey, KEYS[6]) + end +end + +rcall("XADD", KEYS[5], "*", "event", ARGV[1]); diff --git a/src/commands/promote-7.lua b/src/commands/promote-7.lua deleted file mode 100644 index cd966e0011..0000000000 --- a/src/commands/promote-7.lua +++ /dev/null @@ -1,57 +0,0 @@ ---[[ - Promotes a job that is currently "delayed" to the "waiting" state - - Input: - KEYS[1] 'delayed' - KEYS[2] 'wait' - KEYS[3] 'paused' - KEYS[4] 'meta' - KEYS[5] 'prioritized' - KEYS[6] 'pc' priority counter - KEYS[7] 'event stream' - - ARGV[1] queue.toKey('') - ARGV[2] jobId - - Output: - 0 - OK - -3 - Job not in delayed zset. - - Events: - 'waiting' -]] -local rcall = redis.call -local jobId = ARGV[2] - --- Includes ---- @include "includes/addJobWithPriority" ---- @include "includes/getTargetQueueList" - -if rcall("ZREM", KEYS[1], jobId) == 1 then - local jobKey = ARGV[1] .. jobId - local priority = tonumber(rcall("HGET", jobKey, "priority")) or 0 - local target, paused = getTargetQueueList(KEYS[4], KEYS[2], KEYS[3]) - - -- Remove delayed "marker" from the wait list if there is any. - -- Since we are adding a job we do not need the marker anymore. - local marker = rcall("LINDEX", target, 0) - if marker and string.sub(marker, 1, 2) == "0:" then - rcall("LPOP", target) - end - - if priority == 0 then - -- LIFO or FIFO - rcall("LPUSH", target, jobId) - else - addJobWithPriority(KEYS[2], KEYS[5], priority, paused, jobId, KEYS[6]) - end - - -- Emit waiting event (wait..ing@token) - rcall("XADD", KEYS[7], "*", "event", "waiting", "jobId", jobId, "prev", "delayed"); - - rcall("HSET", jobKey, "delay", 0) - - return 0 -else - return -3 -end \ No newline at end of file diff --git a/src/commands/promote-8.lua b/src/commands/promote-8.lua new file mode 100644 index 0000000000..d9a6c67073 --- /dev/null +++ b/src/commands/promote-8.lua @@ -0,0 +1,60 @@ +--[[ + Promotes a job that is currently "delayed" to the "waiting" state + + Input: + KEYS[1] 'delayed' + KEYS[2] 'wait' + KEYS[3] 'paused' + KEYS[4] 'meta' + KEYS[5] 'prioritized' + KEYS[6] 'pc' priority counter + KEYS[7] 'event stream' + KEYS[8] 'marker' + + ARGV[1] queue.toKey('') + ARGV[2] jobId + + Output: + 0 - OK + -3 - Job not in delayed zset. + + Events: + 'waiting' +]] +local rcall = redis.call +local jobId = ARGV[2] + +-- Includes +--- @include "includes/addJobWithPriority" +--- @include "includes/getTargetQueueList" + +if rcall("ZREM", KEYS[1], jobId) == 1 then + local jobKey = ARGV[1] .. jobId + local priority = tonumber(rcall("HGET", jobKey, "priority")) or 0 + local metaKey = KEYS[4] + + -- Remove delayed "marker" from the wait list if there is any. + -- Since we are adding a job we do not need the marker anymore. + -- Markers in waitlist DEPRECATED in v5: Remove in v6. 
+ local target, paused = getTargetQueueList(metaKey, KEYS[2], KEYS[3]) + local marker = rcall("LINDEX", target, 0) + if marker and string.sub(marker, 1, 2) == "0:" then rcall("LPOP", target) end + + if priority == 0 then + -- LIFO or FIFO + rcall("LPUSH", target, jobId) + if not paused then rcall("ZADD", KEYS[8], 0, "0") end + else + addJobWithPriority(KEYS[8], KEYS[5], priority, jobId, KEYS[6], paused) + end + + -- Emit waiting event (wait..ing@token) + rcall("XADD", KEYS[7], "*", "event", "waiting", "jobId", jobId, "prev", + "delayed"); + + rcall("HSET", jobKey, "delay", 0) + + return 0 +else + return -3 +end diff --git a/src/commands/retryJob-9.lua b/src/commands/retryJob-10.lua similarity index 87% rename from src/commands/retryJob-9.lua rename to src/commands/retryJob-10.lua index 5711f17b9a..3508690dfb 100644 --- a/src/commands/retryJob-9.lua +++ b/src/commands/retryJob-10.lua @@ -11,6 +11,7 @@ KEYS[7] delayed key KEYS[8] prioritized key KEYS[9] 'pc' priority counter + KEYS[10] 'marker' ARGV[1] key prefix ARGV[2] timestamp @@ -34,9 +35,11 @@ local rcall = redis.call --- @include "includes/promoteDelayedJobs" local target, paused = getTargetQueueList(KEYS[5], KEYS[2], KEYS[3]) +local markerKey = KEYS[10] + -- Check if there are delayed jobs that we can move to wait. -- test example: when there are delayed jobs between retries -promoteDelayedJobs(KEYS[7], KEYS[2], target, KEYS[8], KEYS[6], ARGV[1], ARGV[2], paused, KEYS[9]) +promoteDelayedJobs(KEYS[7], markerKey, target, KEYS[8], KEYS[6], ARGV[1], ARGV[2], KEYS[9], paused) if rcall("EXISTS", KEYS[4]) == 1 then @@ -57,7 +60,7 @@ if rcall("EXISTS", KEYS[4]) == 1 then if priority == 0 then rcall(ARGV[3], target, ARGV[4]) else - addJobWithPriority(KEYS[2], KEYS[8], priority, paused, ARGV[4], KEYS[9]) + addJobWithPriority(markerKey, KEYS[8], priority, ARGV[4], KEYS[9], paused) end local maxEvents = rcall("HGET", KEYS[5], "opts.maxLenEvents") or 10000 diff --git a/tests/test_clean.ts b/tests/test_clean.ts index 90274c557d..15050437f7 100644 --- a/tests/test_clean.ts +++ b/tests/test_clean.ts @@ -77,7 +77,7 @@ describe('Cleaner', () => { ); }); - await queue.addBulk([ + const addedJobs = await queue.addBulk([ { name: 'test', data: { some: 'data' } }, { name: 'test', data: { some: 'data' } }, ]); @@ -269,7 +269,11 @@ describe('Cleaner', () => { const client = await queue.client; const keys = await client.keys(`${prefix}:${queue.name}:*`); - expect(keys.length).to.be.eql(3); + expect(keys.length).to.be.eql(4); + for (const key of keys) { + const type = key.split(':')[2]; + expect(['meta', 'events', 'marker', 'id']).to.include(type); + } const countAfterEmpty = await queue.count(); expect(countAfterEmpty).to.be.eql(0); @@ -309,7 +313,6 @@ describe('Cleaner', () => { { name, data: { idx: 2, foo: 'qux' }, queueName }, ], }); - await completing; await delay(100); await queue.clean(0, 0, 'completed'); @@ -319,6 +322,12 @@ describe('Cleaner', () => { // Expected keys: meta, id, stalled-check and events expect(keys.length).to.be.eql(4); + for (const key of keys) { + const type = key.split(':')[2]; + expect(['meta', 'id', 'stalled-check', 'events']).to.include( + type, + ); + } const jobs = await queue.getJobCountByTypes('completed'); expect(jobs).to.be.equal(0); @@ -365,8 +374,21 @@ describe('Cleaner', () => { const client = await queue.client; const keys = await client.keys(`${prefix}:${queue.name}:*`); + // Expected keys: meta, id, stalled-check, events, failed and job - expect(keys.length).to.be.eql(7); + expect(keys.length).to.be.eql(8); + for 
(const key of keys) { + const type = key.split(':')[2]; + expect([ + 'meta', + 'id', + 'stalled-check', + 'events', + 'failed', + 'marker', + tree.job.id!, + ]).to.include(type); + } const parentState = await tree.job.getState(); expect(parentState).to.be.equal('failed'); @@ -615,7 +637,11 @@ describe('Cleaner', () => { const client = await queue.client; const keys = await client.keys(`${prefix}:${queueName}:*`); - expect(keys.length).to.be.eql(3); + expect(keys.length).to.be.eql(4); + for (const key of keys) { + const type = key.split(':')[2]; + expect(['meta', 'events', 'marker', 'id']).to.include(type); + } const eventsCount = await client.xlen( `${prefix}:${parentQueueName}:events`, diff --git a/tests/test_events.ts b/tests/test_events.ts index 122221e14d..f0c6377c92 100644 --- a/tests/test_events.ts +++ b/tests/test_events.ts @@ -440,22 +440,30 @@ describe('events', function () { prefix, }, ); - const waitingChildren = new Promise(resolve => { + const waitingChildren = new Promise((resolve, reject) => { queueEvents.once('waiting-children', async ({ jobId }) => { - const job = await queue.getJob(jobId); - const state = await job.getState(); - expect(state).to.be.equal('waiting-children'); - expect(job.name).to.be.equal(name); - resolve(); + try { + const job = await queue.getJob(jobId); + const state = await job.getState(); + expect(state).to.be.equal('waiting-children'); + expect(job.name).to.be.equal(name); + resolve(); + } catch (err) { + reject(err); + } }); }); - const waiting = new Promise(resolve => { + const waiting = new Promise((resolve, reject) => { queueEvents.on('waiting', async ({ jobId, prev }) => { - const job = await queue.getJob(jobId); - expect(prev).to.be.equal('waiting-children'); - if (job.name === name) { - resolve(); + try { + const job = await queue.getJob(jobId); + expect(prev).to.be.equal('waiting-children'); + if (job.name === name) { + resolve(); + } + } catch (err) { + reject(err); } }); }); diff --git a/tests/test_job.ts b/tests/test_job.ts index 232de72f21..3dd653af28 100644 --- a/tests/test_job.ts +++ b/tests/test_job.ts @@ -1096,12 +1096,16 @@ describe('Job', function () { ); await worker.waitUntilReady(); - const completing = new Promise(resolve => { + const completing = new Promise((resolve, reject) => { worker.on( 'completed', after(4, () => { - expect(completed).to.be.eql(['a', 'b', 'c', 'd']); - resolve(); + try { + expect(completed).to.be.eql(['a', 'b', 'c', 'd']); + resolve(); + } catch (err) { + reject(err); + } }), ); }); diff --git a/tests/test_queue.ts b/tests/test_queue.ts index 7ce3c1752d..d765f57a8f 100644 --- a/tests/test_queue.ts +++ b/tests/test_queue.ts @@ -80,8 +80,12 @@ describe('queues', function () { const client = await queue.client; const keys = await client.keys(`${prefix}:${queue.name}:*`); + expect(keys.length).to.be.eql(5); - expect(keys.length).to.be.eql(4); + for (const key of keys) { + const type = key.split(':')[2]; + expect(['marker', 'events', 'meta', 'pc', 'id']).to.include(type); + } }).timeout(10000); describe('when having a flow', async () => { @@ -111,7 +115,11 @@ describe('queues', function () { const client = await queue.client; const keys = await client.keys(`${prefix}:${queue.name}:*`); - expect(keys.length).to.be.eql(3); + expect(keys.length).to.be.eql(4); + for (const key of keys) { + const type = key.split(':')[2]; + expect(['events', 'meta', 'id', 'marker']).to.include(type); + } const countAfterEmpty = await queue.count(); expect(countAfterEmpty).to.be.eql(0); @@ -141,7 +149,11 @@ describe('queues', 
function () { const client = await queue.client; const keys = await client.keys(`${prefix}:${queue.name}:*`); - expect(keys.length).to.be.eql(3); + expect(keys.length).to.be.eql(4); + for (const key of keys) { + const type = key.split(':')[2]; + expect(['id', 'meta', 'marker', 'events']).to.include(type); + } const countAfterEmpty = await queue.count(); expect(countAfterEmpty).to.be.eql(0); @@ -225,7 +237,11 @@ describe('queues', function () { const client = await queue.client; const keys = await client.keys(`${prefix}:${queue.name}:*`); - expect(keys.length).to.be.eql(3); + expect(keys.length).to.be.eql(4); + for (const key of keys) { + const type = key.split(':')[2]; + expect(['id', 'meta', 'events', 'marker']).to.include(type); + } const countAfterEmpty = await queue.count(); expect(countAfterEmpty).to.be.eql(0); @@ -272,7 +288,11 @@ describe('queues', function () { const client = await queue.client; const keys = await client.keys(`${prefix}:${queue.name}:*`); - expect(keys.length).to.be.eql(3); + expect(keys.length).to.be.eql(4); + for (const key of keys) { + const type = key.split(':')[2]; + expect(['id', 'meta', 'events', 'marker']).to.include(type); + } const countAfterEmpty = await queue.count(); expect(countAfterEmpty).to.be.eql(0); diff --git a/tests/test_rate_limiter.ts b/tests/test_rate_limiter.ts index 7396bd1ebb..d6aa60dfd9 100644 --- a/tests/test_rate_limiter.ts +++ b/tests/test_rate_limiter.ts @@ -150,10 +150,13 @@ describe('Rate Limiter', function () { duration: 2000, }, }; - const worker1 = new Worker(queueName, async () => {}, commontOpts); - const worker2 = new Worker(queueName, async () => {}, commontOpts); - const worker3 = new Worker(queueName, async () => {}, commontOpts); - const worker4 = new Worker(queueName, async () => {}, commontOpts); + + const processor = async () => {}; + + const worker1 = new Worker(queueName, processor, commontOpts); + const worker2 = new Worker(queueName, processor, commontOpts); + const worker3 = new Worker(queueName, processor, commontOpts); + const worker4 = new Worker(queueName, processor, commontOpts); const result = new Promise((resolve, reject) => { queueEvents.once('completed', async () => { @@ -165,7 +168,7 @@ describe('Rate Limiter', function () { }); }); - await delay(500); + await delay(100); const jobs = Array.from(Array(numJobs).keys()).map(() => ({ name: 'rate test', @@ -173,6 +176,8 @@ describe('Rate Limiter', function () { })); await queue.addBulk(jobs); + await delay(100); + await queue.pause(); await result; diff --git a/tests/test_worker.ts b/tests/test_worker.ts index 44e9bbfa2c..73275d8162 100644 --- a/tests/test_worker.ts +++ b/tests/test_worker.ts @@ -465,9 +465,9 @@ describe('workers', function () { await worker.close(); }).timeout(8000); - it('do not call moveToActive more than concurrency factor', async () => { - const numJobs = 50; - const concurrency = 10; + it('do not call moveToActive more than concurrency factor + 1', async () => { + const numJobs = 57; + const concurrency = 13; let completedJobs = 0; const worker = new Worker( queueName, @@ -481,6 +481,10 @@ describe('workers', function () { // Add spy to worker.moveToActive const spy = sinon.spy(worker, 'moveToActive'); + const bclientSpy = sinon.spy( + await worker.blockingConnection.client, + 'bzpopmin', + ); for (let i = 0; i < numJobs; i++) { const job = await queue.add('test', { foo: 'bar' }); @@ -488,8 +492,10 @@ describe('workers', function () { expect(job.data.foo).to.be.eql('bar'); } + expect(bclientSpy.callCount).to.be.equal(1); + await new 
Promise((resolve, reject) => { - worker.on('completed', (job: Job, result: any) => { + worker.on('completed', (_job: Job, _result: any) => { completedJobs++; if (completedJobs == numJobs) { resolve(); @@ -498,7 +504,8 @@ describe('workers', function () { }); // Check moveToActive was called only concurrency times - expect(spy.callCount).to.be.equal(concurrency); + expect(spy.callCount).to.be.equal(concurrency + 1); + expect(bclientSpy.callCount).to.be.equal(3); await worker.close(); }); @@ -518,6 +525,10 @@ describe('workers', function () { // Add spy to worker.moveToActive const spy = sinon.spy(worker, 'moveToActive'); + const bclientSpy = sinon.spy( + await worker.blockingConnection.client, + 'bzpopmin', + ); for (let i = 0; i < numJobs; i++) { const job = await queue.add('test', { foo: 'bar' }); @@ -525,18 +536,20 @@ describe('workers', function () { expect(job.data.foo).to.be.eql('bar'); } + expect(bclientSpy.callCount).to.be.equal(1); + await new Promise((resolve, reject) => { worker.on('completed', (job: Job, result: any) => { completedJobs++; - console.log(completedJobs); if (completedJobs == numJobs) { resolve(); } }); }); - // Check moveToActive was called numJobs + 1 times - expect(spy.callCount).to.be.equal(numJobs + 1); + // Check moveToActive was called numJobs + 2 times + expect(spy.callCount).to.be.equal(numJobs + 2); + expect(bclientSpy.callCount).to.be.equal(3); await worker.close(); }); @@ -601,13 +614,17 @@ describe('workers', function () { expect(job.id).to.be.ok; expect(job.data.foo).to.be.eql('bar'); - const failing = new Promise(resolve => { + const failing = new Promise((resolve, reject) => { worker.once('failed', async (job, err) => { - expect(job).to.be.ok; - expect(job.finishedOn).to.be.a('number'); - expect(job.data.foo).to.be.eql('bar'); - expect(err).to.be.eql(jobError); - resolve(); + try { + expect(job).to.be.ok; + expect(job!.finishedOn).to.be.a('number'); + expect(job!.data.foo).to.be.eql('bar'); + expect(err).to.be.eql(jobError); + resolve(); + } catch (err) { + reject(err); + } }); }); @@ -828,12 +845,16 @@ describe('workers', function () { /* Try to gracefully close while having a job that will be completed running */ worker.close(); - await new Promise(resolve => { + await new Promise((resolve, reject) => { worker.once('completed', async job => { - expect(job).to.be.ok; - expect(job.finishedOn).to.be.a('number'); - expect(job.data.foo).to.be.eql('bar'); - resolve(); + try { + expect(job).to.be.ok; + expect(job.finishedOn).to.be.a('number'); + expect(job.data.foo).to.be.eql('bar'); + resolve(); + } catch (err) { + reject(err); + } }); });
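The diff above replaces the old "0:<timestamp>" marker jobs that used to be pushed into the wait list with a dedicated marker sorted set: producer-side scripts ZADD a single member "0" (score 0 when a waiting or prioritized job is ready, or the timestamp of the next delayed job, see addDelayMarkerIfNeeded.lua), and the worker now blocks on BZPOPMIN of that key instead of BRPOPLPUSH on the wait list, using the returned score as its next blockUntil. Below is a minimal, self-contained sketch of that signalling pattern written directly against ioredis. It is illustrative only and not part of the changeset: the key name, function names, and connection setup are assumptions, and the real logic lives in the Lua scripts and Worker.waitForJob shown in the diff.

import Redis from 'ioredis';

const client = new Redis();

// Hypothetical key; BullMQ builds it via QueueKeys as `${prefix}:${queueName}:marker`.
const markerKey = 'bull:my-queue:marker';

// Producer side: score 0 signals an immediately runnable (waiting/prioritized) job.
async function signalImmediateJob(): Promise<void> {
  await client.zadd(markerKey, 0, '0');
}

// Producer side for delayed jobs: the score records when the next delayed job
// becomes runnable (mirrors what addDelayMarkerIfNeeded.lua does with ZADD).
async function signalDelayedJob(nextTimestamp: number): Promise<void> {
  await client.zadd(markerKey, nextTimestamp, '0');
}

// Worker side: block on the marker instead of BRPOPLPUSH on the wait list.
// BZPOPMIN returns [key, member, score] or null on timeout; the score becomes
// the worker's next blockUntil (0 means a job is ready right now).
async function waitForSignal(blockTimeoutSeconds: number): Promise<number> {
  // Fractional timeouts need Redis >= 6 (cf. capabilities.canDoubleTimeout in the diff).
  const result = await client.bzpopmin(markerKey, blockTimeoutSeconds);
  if (result) {
    const [, , score] = result;
    return parseInt(score, 10);
  }
  return 0; // timed out: fall back to a non-blocking moveToActive attempt
}

// Usage sketch: a delayed job is announced, then the "worker" wakes up and
// learns the timestamp at which it should next try to move jobs to active.
async function demo(): Promise<void> {
  await signalDelayedJob(Date.now() + 5000);
  const blockUntil = await waitForSignal(10);
  console.log('next wake-up at', blockUntil);
  client.disconnect();
}

demo().catch(console.error);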