Skip to content

Commit

Permalink
perf(clean-jobs-in-set): use ZRANGEBYSCORE when limit > 0 (taskforces…
Browse files Browse the repository at this point in the history
  • Loading branch information
roggervalf authored Jul 29, 2022
1 parent 9c14458 commit f0d9985
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 9 deletions.
14 changes: 14 additions & 0 deletions docs/gitbook/bullmq-pro/changelog.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,17 @@
## [2.3.7](https://github.com/taskforcesh/bullmq-pro/compare/v2.3.6...v2.3.7) (2022-07-28)


### Bug Fixes

* **deps:** upgrade bullmq to 1.86.9 ([#73](https://github.com/taskforcesh/bullmq-pro/issues/73)) ([bbc0784](https://github.com/taskforcesh/bullmq-pro/commit/bbc07845f6cce0cc003681255b892330c729b30e))

## [2.3.6](https://github.com/taskforcesh/bullmq-pro/compare/v2.3.5...v2.3.6) (2022-07-26)


### Performance Improvements

* **retry-jobs:** add jobs in batches when groupId is present ([#72](https://github.com/taskforcesh/bullmq-pro/issues/72)) ([3961da0](https://github.com/taskforcesh/bullmq-pro/commit/3961da022843048597033e8f13034f245198bca3))

## [2.3.5](https://github.com/taskforcesh/bullmq-pro/compare/v2.3.4...v2.3.5) (2022-07-20)


Expand Down
4 changes: 2 additions & 2 deletions src/commands/cleanJobsInSet-2.lua
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@ local result
if ARGV[4] == "active" then
result = cleanList(KEYS[1], ARGV[1], rangeStart, rangeEnd, ARGV[2], false)
elseif ARGV[4] == "delayed" then
result = cleanSet(KEYS[1], ARGV[1], rangeStart, rangeEnd, ARGV[2], {"processedOn", "timestamp"})
result = cleanSet(KEYS[1], ARGV[1], rangeStart, rangeEnd, ARGV[2], limit, {"processedOn", "timestamp"})
elseif ARGV[4] == "wait" or ARGV[4] == "paused" then
result = cleanList(KEYS[1], ARGV[1], rangeStart, rangeEnd, ARGV[2], true)
else
result = cleanSet(KEYS[1], ARGV[1], rangeStart, rangeEnd, ARGV[2], {"finishedOn"} )
result = cleanSet(KEYS[1], ARGV[1], rangeStart, rangeEnd, ARGV[2], limit, {"finishedOn"} )
end

rcall("XADD", KEYS[2], "*", "event", "cleaned", "count", result[2])
Expand Down
16 changes: 14 additions & 2 deletions src/commands/includes/cleanSet.lua
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,20 @@
--- @include "getTimestamp"
--- @include "removeJob"

local function cleanSet(setKey, jobKeyPrefix, rangeStart, rangeEnd, timestamp, attributes)
local jobs = rcall("ZRANGE", setKey, rangeStart, rangeEnd)
-- We use ZRANGEBYSCORE to make the case where we're deleting a limited number
-- of items in a sorted set only run a single iteration. If we simply used
-- ZRANGE, we may take a long time traversing through jobs that are within the
-- grace period.
local function getJobs(setKey, rangeStart, rangeEnd, maxTimestamp, limit)
if limit > 0 then
return rcall("ZRANGEBYSCORE", setKey, 0, maxTimestamp, "LIMIT", 0, limit)
else
return rcall("ZRANGE", setKey, rangeStart, rangeEnd)
end
end

local function cleanSet(setKey, jobKeyPrefix, rangeStart, rangeEnd, timestamp, limit, attributes)
local jobs = getJobs(setKey, rangeStart, rangeEnd, timestamp, limit)
local deleted = {}
local deletedCount = 0
local jobTS
Expand Down
10 changes: 5 additions & 5 deletions src/commands/includes/updateParentDepsIfNeeded.lua
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,18 @@
Validate and move or add dependencies to parent.
]]

-- Includes
--- @include "getTargetQueueList"

local function updateParentDepsIfNeeded(parentKey, parentQueueKey, parentDependenciesKey,
parentId, jobIdKey, returnvalue )
local processedSet = parentKey .. ":processed"
rcall("HSET", processedSet, jobIdKey, returnvalue)
local activeParent = rcall("ZSCORE", parentQueueKey .. ":waiting-children", parentId)
if rcall("SCARD", parentDependenciesKey) == 0 and activeParent then
rcall("ZREM", parentQueueKey .. ":waiting-children", parentId)
if rcall("HEXISTS", parentQueueKey .. ":meta", "paused") ~= 1 then
rcall("RPUSH", parentQueueKey .. ":wait", parentId)
else
rcall("RPUSH", parentQueueKey .. ":paused", parentId)
end
local parentTarget = getTargetQueueList(parentQueueKey .. ":meta", parentQueueKey .. ":wait", parentQueueKey .. ":paused")
rcall("RPUSH", parentTarget, parentId)

rcall("XADD", parentQueueKey .. ":events", "*", "event", "waiting", "jobId", parentId, "prev", "waiting-children")
end
Expand Down

0 comments on commit f0d9985

Please sign in to comment.