Skip to content

Commit

Permalink
perf(move-to-active): check rate limited once (#2391)
Browse files Browse the repository at this point in the history
  • Loading branch information
roggervalf authored Jan 23, 2024
1 parent 7db6756 commit ca6c17a
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 55 deletions.
42 changes: 3 additions & 39 deletions src/commands/includes/prepareJobForProcessing.lua
Original file line number Diff line number Diff line change
Expand Up @@ -2,53 +2,17 @@
--[[
Function to move job from wait state to active.
Input:
keys[1] wait key
keys[2] active key
keys[3] prioritized key
keys[4] stream events key
keys[5] stalled key
-- Rate limiting
keys[6] rate limiter key
keys[7] delayed key
keys[8] paused key
keys[9] meta key
keys[10] pc priority counter
opts - token - lock token
opts - lockDuration
opts - limiter
]]

-- Includes
--- @include "pushBackJobWithPriority"

local function prepareJobForProcessing(keys, keyPrefix, targetKey, jobId, processedOn,
maxJobs, expireTime, opts)
local function prepareJobForProcessing(keyPrefix, rateLimiterKey, eventStreamKey,
jobId, processedOn, maxJobs, opts)
local jobKey = keyPrefix .. jobId

-- Check if we need to perform rate limiting.
if maxJobs then
local rateLimiterKey = keys[6];

-- check if we exceeded rate limit, we need to remove the job and return expireTime
if expireTime > 0 then
-- remove from active queue and add back to the wait list
rcall("LREM", keys[2], 1, jobId)

local priority = tonumber(rcall("HGET", jobKey, "priority")) or 0

if priority == 0 then
rcall("RPUSH", targetKey, jobId)
else
pushBackJobWithPriority(keys[3], priority, jobId)
end

-- Return when we can process more jobs
return {0, 0, expireTime, 0}
end

local jobCounter = tonumber(rcall("INCR", rateLimiterKey))

if jobCounter == 1 then
Expand All @@ -65,7 +29,7 @@ local function prepareJobForProcessing(keys, keyPrefix, targetKey, jobId, proces
rcall("SET", lockKey, opts['token'], "PX", opts['lockDuration'])
end

rcall("XADD", keys[4], "*", "event", "active", "jobId", jobId, "prev", "waiting")
rcall("XADD", eventStreamKey, "*", "event", "active", "jobId", jobId, "prev", "waiting")
rcall("HSET", jobKey, "processedOn", processedOn)
rcall("HINCRBY", jobKey, "ats", 1)

Expand Down
11 changes: 6 additions & 5 deletions src/commands/moveToActive-11.lua
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
local rcall = redis.call
local waitKey = KEYS[1]
local activeKey = KEYS[2]
local eventStreamKey = KEYS[4]
local rateLimiterKey = KEYS[6]
local delayedKey = KEYS[7]
local opts = cmsgpack.unpack(ARGV[3])
Expand All @@ -53,7 +54,7 @@ local target, paused = getTargetQueueList(KEYS[9], waitKey, KEYS[8])

-- Check if there are delayed jobs that we can move to wait.
local markerKey = KEYS[11]
promoteDelayedJobs(delayedKey, markerKey, target, KEYS[3], KEYS[4], ARGV[1],
promoteDelayedJobs(delayedKey, markerKey, target, KEYS[3], eventStreamKey, ARGV[1],
ARGV[2], KEYS[10], paused)

local maxJobs = tonumber(opts['limiter'] and opts['limiter']['max'])
Expand All @@ -75,13 +76,13 @@ if jobId and string.sub(jobId, 1, 2) == "0:" then
end

if jobId then
return prepareJobForProcessing(KEYS, ARGV[1], target, jobId, ARGV[2],
maxJobs, expireTime, opts)
return prepareJobForProcessing(ARGV[1], rateLimiterKey, eventStreamKey, jobId, ARGV[2],
maxJobs, opts)
else
jobId = moveJobFromPriorityToActive(KEYS[3], activeKey, KEYS[10])
if jobId then
return prepareJobForProcessing(KEYS, ARGV[1], target, jobId, ARGV[2],
maxJobs, expireTime, opts)
return prepareJobForProcessing(ARGV[1], rateLimiterKey, eventStreamKey, jobId, ARGV[2],
maxJobs, opts)
end
end

Expand Down
23 changes: 12 additions & 11 deletions src/commands/moveToFinished-14.lua
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,10 @@ if rcall("EXISTS", jobIdKey) == 1 then -- // Make sure job exists

if (numRemovedElements < 1) then return -3 end

local eventStreamKey = KEYS[4]
local metaKey = KEYS[9]
-- Trim events before emiting them to avoid trimming events emitted in this script
trimEvents(metaKey, KEYS[4])
trimEvents(metaKey, eventStreamKey)

-- If job has a parent we need to
-- 1) remove this job id from parents dependencies
Expand Down Expand Up @@ -183,12 +184,12 @@ if rcall("EXISTS", jobIdKey) == 1 then -- // Make sure job exists
end
end

rcall("XADD", KEYS[4], "*", "event", ARGV[5], "jobId", jobId, ARGV[3],
rcall("XADD", eventStreamKey, "*", "event", ARGV[5], "jobId", jobId, ARGV[3],
ARGV[4])

if ARGV[5] == "failed" then
if tonumber(attemptsMade) >= tonumber(attempts) then
rcall("XADD", KEYS[4], "*", "event", "retries-exhausted", "jobId",
rcall("XADD", eventStreamKey, "*", "event", "retries-exhausted", "jobId",
jobId, "attemptsMade", attemptsMade)
end
end
Expand All @@ -205,7 +206,7 @@ if rcall("EXISTS", jobIdKey) == 1 then -- // Make sure job exists
local target, paused = getTargetQueueList(metaKey, KEYS[1], KEYS[8])

-- Check if there are delayed jobs that can be promoted
promoteDelayedJobs(KEYS[7], KEYS[14], target, KEYS[3], KEYS[4], ARGV[7],
promoteDelayedJobs(KEYS[7], KEYS[14], target, KEYS[3], eventStreamKey, ARGV[7],
timestamp, KEYS[10], paused)

local maxJobs = tonumber(opts['limiter'] and opts['limiter']['max'])
Expand All @@ -229,20 +230,20 @@ if rcall("EXISTS", jobIdKey) == 1 then -- // Make sure job exists
if jobId == "0:0" then
jobId = moveJobFromPriorityToActive(KEYS[3], KEYS[2],
KEYS[10])
return prepareJobForProcessing(KEYS, ARGV[7], target, jobId,
return prepareJobForProcessing(ARGV[7], KEYS[6], eventStreamKey, jobId,
timestamp, maxJobs,
expireTime, opts)
opts)
end
else
return prepareJobForProcessing(KEYS, ARGV[7], target, jobId,
timestamp, maxJobs, expireTime,
return prepareJobForProcessing(ARGV[7], KEYS[6], eventStreamKey, jobId,
timestamp, maxJobs,
opts)
end
else
jobId = moveJobFromPriorityToActive(KEYS[3], KEYS[2], KEYS[10])
if jobId then
return prepareJobForProcessing(KEYS, ARGV[7], target, jobId,
timestamp, maxJobs, expireTime,
return prepareJobForProcessing(ARGV[7], KEYS[6], eventStreamKey, jobId,
timestamp, maxJobs,
opts)
end
end
Expand All @@ -264,7 +265,7 @@ if rcall("EXISTS", jobIdKey) == 1 then -- // Make sure job exists
local prioritizedLen = rcall("ZCARD", KEYS[3])

if prioritizedLen == 0 then
rcall("XADD", KEYS[4], "*", "event", "drained")
rcall("XADD", eventStreamKey, "*", "event", "drained")
end
end
end
Expand Down

0 comments on commit ca6c17a

Please sign in to comment.