Skip to content

Commit

Permalink
fix(remove-job): consider removing parent dependency key in lua scrip…
Browse files Browse the repository at this point in the history
…ts (#990)
  • Loading branch information
roggervalf authored Jan 14, 2022
1 parent aea20b4 commit 661abf0
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 31 deletions.
30 changes: 30 additions & 0 deletions src/commands/includes/removeParentDependencyKey.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
--[[
Check if this job has a parent. If so we will just remove it from
the parent child list, but if it is the last child we should move the parent to "wait/paused"
which requires code from "moveToFinished"
]]

--- @include "destructureJobKey"

local function removeParentDependencyKey(jobKey)
local parentKey = rcall("HGET", jobKey, "parentKey")
if( (type(parentKey) == "string") and parentKey ~= "" and (rcall("EXISTS", parentKey) == 1)) then
local parentDependenciesKey = parentKey .. ":dependencies"
local result = rcall("SREM", parentDependenciesKey, jobKey)
if result > 0 and rcall("SCARD", parentDependenciesKey) == 0 then
local parentId = getJobIdFromKey(parentKey)
local parentPrefix = getJobKeyPrefix(parentKey, parentId)

rcall("ZREM", parentPrefix .. "waiting-children", parentId)

if rcall("HEXISTS", parentPrefix .. "meta", "paused") ~= 1 then
rcall("RPUSH", parentPrefix .. "wait", parentId)
else
rcall("RPUSH", parentPrefix .. "paused", parentId)
end

local parentEventStream = parentPrefix .. "events"
rcall("XADD", parentEventStream, "*", "event", "active", "jobId", parentId, "prev", "waiting-children")
end
end
end
2 changes: 2 additions & 0 deletions src/commands/moveToFinished-8.lua
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ local rcall = redis.call
-- Includes
--- @include "includes/updateParentDepsIfNeeded"
--- @include "includes/destructureJobKey"
--- @include "includes/removeParentDependencyKey"

local jobIdKey = KEYS[3]
if rcall("EXISTS",jobIdKey) == 1 then -- // Make sure job exists
Expand Down Expand Up @@ -114,6 +115,7 @@ if rcall("EXISTS",jobIdKey) == 1 then -- // Make sure job exists
local jobIds = rcall("ZREVRANGE", KEYS[2], start, -1)
for i, jobId in ipairs(jobIds) do
local jobKey = ARGV[9] .. jobId
removeParentDependencyKey(jobKey)
local jobLogKey = jobKey .. ':logs'
local jobProcessedKey = jobKey .. ':processed'
rcall("DEL", jobKey, jobLogKey, jobProcessedKey)
Expand Down
17 changes: 10 additions & 7 deletions src/commands/obliterate-2.lua
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,12 @@ local function getSetItems(keyName, max)
return rcall('SMEMBERS', keyName, 0, max)
end

--- @include "includes/removeParentDependencyKey"

local function removeJobs(keys)
for i, key in ipairs(keys) do
local jobKey = baseKey .. key
local jobKey = baseKey .. key
removeParentDependencyKey(jobKey)
rcall("DEL", jobKey)
rcall("DEL", jobKey .. ':logs')
rcall("DEL", jobKey .. ':dependencies')
Expand Down Expand Up @@ -84,12 +87,6 @@ if(maxCount <= 0) then
return 1
end

local waitKey = baseKey .. 'paused'
removeListJobs(waitKey, maxCount)
if(maxCount <= 0) then
return 1
end

local delayedKey = baseKey .. 'delayed'
removeZSetJobs(delayedKey, maxCount)
if(maxCount <= 0) then
Expand All @@ -108,6 +105,12 @@ if(maxCount <= 0) then
return 1
end

local waitKey = baseKey .. 'paused'
removeListJobs(waitKey, maxCount)
if(maxCount <= 0) then
return 1
end

local waitingChildrenKey = baseKey .. 'waiting-children'
removeZSetJobs(waitingChildrenKey, maxCount)
if(maxCount <= 0) then
Expand Down
25 changes: 2 additions & 23 deletions src/commands/removeJob-1.lua
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ local rcall = redis.call

-- Includes
--- @include "includes/destructureJobKey"
--- @include "includes/removeParentDependencyKey"

-- recursively check if there are no locks on the
-- jobs to be removed.
Expand Down Expand Up @@ -44,29 +45,7 @@ end
local function removeJob( prefix, jobId)
local jobKey = prefix .. jobId;

-- Check if this job has a parent. If so we will just remove it from
-- the parent child list, but if it is the last child we should move the parent to "wait/paused"
-- which requires code from "moveToFinished"
local parentKey = rcall("HGET", jobKey, "parentKey")
if( (type(parentKey) == "string") and parentKey ~= "" and (rcall("EXISTS", parentKey) == 1)) then
local parentDependenciesKey = parentKey .. ":dependencies"
local result = rcall("SREM", parentDependenciesKey, jobKey)
if rcall("SCARD", parentDependenciesKey) == 0 then
local parentId = getJobIdFromKey(parentKey)
local parentPrefix = getJobKeyPrefix(parentKey, parentId)

rcall("ZREM", parentPrefix .. "waiting-children", parentId)

if rcall("HEXISTS", parentPrefix .. "meta", "paused") ~= 1 then
rcall("RPUSH", parentPrefix .. "wait", parentId)
else
rcall("RPUSH", parentPrefix .. "parentPrefixpaused", parentId)
end

local parentEventStream = parentPrefix .. "events"
rcall("XADD", parentEventStream, "*", "event", "active", "jobId", parentId, "prev", "waiting-children")
end
end
removeParentDependencyKey(jobKey)

rcall("LREM", prefix .. "active", 0, jobId)
rcall("LREM", prefix .. "wait", 0, jobId)
Expand Down
2 changes: 1 addition & 1 deletion tests/test_obliterate.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ describe('Obliterate', function () {
const flow = new FlowProducer({ connection });
await flow.add({
name: 'parent-job',
queueName: queueName,
queueName,
data: {},
children: [
{ name, data: { idx: 0, foo: 'bar' }, queueName },
Expand Down

0 comments on commit 661abf0

Please sign in to comment.