From 4914df87e416711835291e81da93b279bd758254 Mon Sep 17 00:00:00 2001 From: Rogger Valverde Date: Sat, 20 Jan 2024 12:00:19 -0500 Subject: [PATCH] fix(stalled): consider adding marker when moving job back to wait (#2384) --- python/bullmq/scripts.py | 4 ++-- src/classes/scripts.ts | 1 + src/commands/addStandardJob-7.lua | 8 ++------ src/commands/includes/addBaseMarkerIfNeeded.lua | 9 +++++++++ src/commands/includes/addJobInTargetList.lua | 11 +++++++++++ src/commands/includes/addJobWithPriority.lua | 7 ++++--- src/commands/includes/moveParentToWaitIfNeeded.lua | 7 ++++--- src/commands/includes/promoteDelayedJobs.lua | 6 ++---- src/commands/moveJobsToWait-7.lua | 5 ++--- ...obsToWait-8.lua => moveStalledJobsToWait-9.lua} | 14 ++++++++++---- src/commands/promote-8.lua | 4 ++-- tests/test_stalled_jobs.ts | 4 ++-- 12 files changed, 51 insertions(+), 29 deletions(-) create mode 100644 src/commands/includes/addBaseMarkerIfNeeded.lua create mode 100644 src/commands/includes/addJobInTargetList.lua rename src/commands/{moveStalledJobsToWait-8.lua => moveStalledJobsToWait-9.lua} (94%) diff --git a/python/bullmq/scripts.py b/python/bullmq/scripts.py index d45cf099bf..57e9ce012d 100644 --- a/python/bullmq/scripts.py +++ b/python/bullmq/scripts.py @@ -43,7 +43,7 @@ def __init__(self, prefix: str, queueName: str, redisConnection: RedisConnection "getState": self.redisClient.register_script(self.getScript("getState-8.lua")), "getStateV2": self.redisClient.register_script(self.getScript("getStateV2-8.lua")), "isJobInList": self.redisClient.register_script(self.getScript("isJobInList-1.lua")), - "moveStalledJobsToWait": self.redisClient.register_script(self.getScript("moveStalledJobsToWait-8.lua")), + "moveStalledJobsToWait": self.redisClient.register_script(self.getScript("moveStalledJobsToWait-9.lua")), "moveToActive": self.redisClient.register_script(self.getScript("moveToActive-11.lua")), "moveToDelayed": self.redisClient.register_script(self.getScript("moveToDelayed-7.lua")), "moveToFinished": self.redisClient.register_script(self.getScript("moveToFinished-14.lua")), @@ -554,7 +554,7 @@ def extendLock(self, jobId: str, token: str, duration: int, client: Redis = None def moveStalledJobsToWait(self, maxStalledCount: int, stalledInterval: int): keys = self.getKeys(['stalled', 'wait', 'active', 'failed', - 'stalled-check', 'meta', 'paused', 'events']) + 'stalled-check', 'meta', 'paused', 'marker', 'events']) args = [maxStalledCount, self.keys[''], round( time.time() * 1000), stalledInterval] return self.commands["moveStalledJobsToWait"](keys, args) diff --git a/src/classes/scripts.ts b/src/classes/scripts.ts index 23e762bb1f..c8378dc775 100644 --- a/src/classes/scripts.ts +++ b/src/classes/scripts.ts @@ -1000,6 +1000,7 @@ export class Scripts { this.queue.keys['stalled-check'], this.queue.keys.meta, this.queue.keys.paused, + this.queue.keys.marker, this.queue.keys.events, ]; const args = [ diff --git a/src/commands/addStandardJob-7.lua b/src/commands/addStandardJob-7.lua index 44fe70b583..1f01c7bc31 100644 --- a/src/commands/addStandardJob-7.lua +++ b/src/commands/addStandardJob-7.lua @@ -58,6 +58,7 @@ local parent = args[8] local parentData -- Includes +--- @include "includes/addJobInTargetList" --- @include "includes/getOrSetMaxEvents" --- @include "includes/getTargetQueueList" --- @include "includes/storeJob" @@ -100,14 +101,9 @@ storeJob(eventsKey, jobIdKey, jobId, args[3], ARGV[2], opts, timestamp, local target, paused = getTargetQueueList(metaKey, KEYS[1], KEYS[2]) -if not paused then - -- mark that a job is available - rcall("ZADD", KEYS[7], 0, "0") -end - -- LIFO or FIFO local pushCmd = opts['lifo'] and 'RPUSH' or 'LPUSH' -rcall(pushCmd, target, jobId) +addJobInTargetList(target, KEYS[7], pushCmd, paused, jobId) -- Emit waiting event rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "waiting", diff --git a/src/commands/includes/addBaseMarkerIfNeeded.lua b/src/commands/includes/addBaseMarkerIfNeeded.lua new file mode 100644 index 0000000000..573d3a6a19 --- /dev/null +++ b/src/commands/includes/addBaseMarkerIfNeeded.lua @@ -0,0 +1,9 @@ +--[[ + Add marker if needed when a job is available. +]] + +local function addBaseMarkerIfNeeded(markerKey, isPaused) + if not isPaused then + rcall("ZADD", markerKey, 0, "0") + end +end diff --git a/src/commands/includes/addJobInTargetList.lua b/src/commands/includes/addJobInTargetList.lua new file mode 100644 index 0000000000..387629b756 --- /dev/null +++ b/src/commands/includes/addJobInTargetList.lua @@ -0,0 +1,11 @@ +--[[ + Function to add job in target list and add marker if needed. +]] + +-- Includes +--- @include "addBaseMarkerIfNeeded" + +local function addJobInTargetList(targetKey, markerKey, pushCmd, isPaused, jobId) + rcall(pushCmd, targetKey, jobId) + addBaseMarkerIfNeeded(markerKey, isPaused) +end diff --git a/src/commands/includes/addJobWithPriority.lua b/src/commands/includes/addJobWithPriority.lua index f038992b0b..639b1efc2d 100644 --- a/src/commands/includes/addJobWithPriority.lua +++ b/src/commands/includes/addJobWithPriority.lua @@ -2,11 +2,12 @@ Function to add job considering priority. ]] +-- Includes +--- @include "addBaseMarkerIfNeeded" + local function addJobWithPriority(markerKey, prioritizedKey, priority, jobId, priorityCounterKey, isPaused) local prioCounter = rcall("INCR", priorityCounterKey) local score = priority * 0x100000000 + bit.band(prioCounter, 0xffffffffffff) rcall("ZADD", prioritizedKey, score, jobId) - if not isPaused then - rcall("ZADD", markerKey, 0, "0") - end + addBaseMarkerIfNeeded(markerKey, isPaused) end diff --git a/src/commands/includes/moveParentToWaitIfNeeded.lua b/src/commands/includes/moveParentToWaitIfNeeded.lua index 3b1967e6af..35673b7a2e 100644 --- a/src/commands/includes/moveParentToWaitIfNeeded.lua +++ b/src/commands/includes/moveParentToWaitIfNeeded.lua @@ -4,9 +4,11 @@ -- Includes --- @include "addDelayMarkerIfNeeded" +--- @include "addJobInTargetList" --- @include "addJobWithPriority" --- @include "isQueuePaused" --- @include "getTargetQueueList" + local function moveParentToWaitIfNeeded(parentQueueKey, parentDependenciesKey, parentKey, parentId, timestamp) local isParentActive = rcall("ZSCORE", @@ -33,11 +35,10 @@ local function moveParentToWaitIfNeeded(parentQueueKey, parentDependenciesKey, addDelayMarkerIfNeeded(parentMarkerKey, parentDelayedKey) else if priority == 0 then - local parentTarget, _paused = + local parentTarget, isParentPaused = getTargetQueueList(parentMetaKey, parentWaitKey, parentPausedKey) - rcall("RPUSH", parentTarget, parentId) - rcall("ZADD", parentMarkerKey, 0, "0") + addJobInTargetList(parentTarget, parentMarkerKey, "RPUSH", isParentPaused, parentId) else local isPaused = isQueuePaused(parentMetaKey) addJobWithPriority(parentMarkerKey, diff --git a/src/commands/includes/promoteDelayedJobs.lua b/src/commands/includes/promoteDelayedJobs.lua index 42d2068f69..be2922dd04 100644 --- a/src/commands/includes/promoteDelayedJobs.lua +++ b/src/commands/includes/promoteDelayedJobs.lua @@ -7,6 +7,7 @@ ]] -- Includes +--- @include "addJobInTargetList" --- @include "addJobWithPriority" -- Try to get as much as 1000 jobs at once @@ -24,10 +25,7 @@ local function promoteDelayedJobs(delayedKey, markerKey, targetKey, prioritizedK if priority == 0 then -- LIFO or FIFO - rcall("LPUSH", targetKey, jobId) - if not isPaused then - rcall("ZADD", markerKey, 0, "0") - end + addJobInTargetList(targetKey, markerKey, "LPUSH", isPaused, jobId) else addJobWithPriority(markerKey, prioritizedKey, priority, jobId, priorityCounterKey, isPaused) diff --git a/src/commands/moveJobsToWait-7.lua b/src/commands/moveJobsToWait-7.lua index 21f09e718a..626cc9b9ad 100644 --- a/src/commands/moveJobsToWait-7.lua +++ b/src/commands/moveJobsToWait-7.lua @@ -26,6 +26,7 @@ local timestamp = tonumber(ARGV[2]) local rcall = redis.call; -- Includes +--- @include "includes/addBaseMarkerIfNeeded" --- @include "includes/batches" --- @include "includes/getOrSetMaxEvents" --- @include "includes/getTargetQueueList" @@ -61,9 +62,7 @@ if (#jobs > 0) then rcall("LPUSH", target, unpack(jobs, from, to)) end - if not paused then - rcall("ZADD", KEYS[7], 0, "0") - end + addBaseMarkerIfNeeded(KEYS[7], paused) end maxCount = maxCount - #jobs diff --git a/src/commands/moveStalledJobsToWait-8.lua b/src/commands/moveStalledJobsToWait-9.lua similarity index 94% rename from src/commands/moveStalledJobsToWait-8.lua rename to src/commands/moveStalledJobsToWait-9.lua index 0d61522f4a..017754a734 100644 --- a/src/commands/moveStalledJobsToWait-8.lua +++ b/src/commands/moveStalledJobsToWait-9.lua @@ -1,5 +1,6 @@ --[[ Move stalled jobs to wait. + Input: KEYS[1] 'stalled' (SET) KEYS[2] 'wait', (LIST) @@ -8,12 +9,14 @@ KEYS[5] 'stalled-check', (KEY) KEYS[6] 'meta', (KEY) KEYS[7] 'paused', (LIST) - KEYS[8] 'event stream' (STREAM) + KEYS[8] 'marker' + KEYS[9] 'event stream' (STREAM) ARGV[1] Max stalled job count ARGV[2] queue.toKey('') ARGV[3] timestamp ARGV[4] max check time + Events: 'stalled' with stalled job id. ]] @@ -21,6 +24,7 @@ local rcall = redis.call -- Includes +--- @include "includes/addJobInTargetList" --- @include "includes/batches" --- @include "includes/getTargetQueueList" --- @include "includes/removeJob" @@ -35,7 +39,8 @@ local failedKey = KEYS[4] local stalledCheckKey = KEYS[5] local metaKey = KEYS[6] local pausedKey = KEYS[7] -local eventStreamKey = KEYS[8] +local markerKey = KEYS[8] +local eventStreamKey = KEYS[9] local maxStalledJobCount = ARGV[1] local queueKeyPrefix = ARGV[2] local timestamp = ARGV[3] @@ -113,11 +118,12 @@ if (#stalling > 0) then table.insert(failed, jobId) else - local target = + local target, isPaused= getTargetQueueList(metaKey, waitKey, pausedKey) -- Move the job back to the wait queue, to immediately be picked up by a waiting worker. - rcall("RPUSH", target, jobId) + addJobInTargetList(target, markerKey, "RPUSH", isPaused, jobId) + rcall("XADD", eventStreamKey, "*", "event", "waiting", "jobId", jobId, 'prev', 'active') diff --git a/src/commands/promote-8.lua b/src/commands/promote-8.lua index d9a6c67073..e8b816383b 100644 --- a/src/commands/promote-8.lua +++ b/src/commands/promote-8.lua @@ -25,6 +25,7 @@ local rcall = redis.call local jobId = ARGV[2] -- Includes +--- @include "includes/addJobInTargetList" --- @include "includes/addJobWithPriority" --- @include "includes/getTargetQueueList" @@ -42,8 +43,7 @@ if rcall("ZREM", KEYS[1], jobId) == 1 then if priority == 0 then -- LIFO or FIFO - rcall("LPUSH", target, jobId) - if not paused then rcall("ZADD", KEYS[8], 0, "0") end + addJobInTargetList(target, KEYS[8], "LPUSH", paused, jobId) else addJobWithPriority(KEYS[8], KEYS[5], priority, jobId, KEYS[6], paused) end diff --git a/tests/test_stalled_jobs.ts b/tests/test_stalled_jobs.ts index 52d641579c..b347025bab 100644 --- a/tests/test_stalled_jobs.ts +++ b/tests/test_stalled_jobs.ts @@ -32,7 +32,7 @@ describe('stalled jobs', function () { }); it('process stalled jobs when starting a queue', async function () { - this.timeout(10000); + this.timeout(5000); const queueEvents = new QueueEvents(queueName, { connection, prefix }); await queueEvents.waitUntilReady(); @@ -298,7 +298,7 @@ describe('stalled jobs', function () { }); it('moves jobs to failed with maxStalledCount > 1', async function () { - this.timeout(60000); + this.timeout(8000); const queueEvents = new QueueEvents(queueName, { connection, prefix }); await queueEvents.waitUntilReady();