Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 20 additions & 15 deletions lib/commands/moveToActive-8.lua
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
KEYS[1] wait key
KEYS[2] active key
KEYS[3] priority key
KEYS[4] active event key
KEYS[4] active key
KEYS[5] stalled key

-- Rate limiting
Expand All @@ -34,12 +34,11 @@

local rcall = redis.call

local rateLimit = function(jobId, maxJobs)
local rateLimiterKey = KEYS[6];
local rateLimit = function(activeKey, rateLimiterKey, delayedKey, jobId, timestamp, maxJobs, timeUnit, bounceBack, groupLimit)
local limiterIndexTable = rateLimiterKey .. ":index"

-- Rate limit by group?
if(ARGV[9]) then
if(groupLimit) then
local group = string.match(jobId, "[^:]+$")
if group ~= nil then
rateLimiterKey = rateLimiterKey .. ":" .. group
Expand Down Expand Up @@ -68,7 +67,7 @@ local rateLimit = function(jobId, maxJobs)

if numLimitedJobs > 0 then
-- Note, add some slack to compensate for drift.
delay = ((numLimitedJobs * ARGV[7] * 1.1) / maxJobs) + tonumber(rcall("PTTL", rateLimiterKey))
delay = ((numLimitedJobs * timeUnit * 1.1) / maxJobs) + tonumber(rcall("PTTL", rateLimiterKey))
end
end

Expand All @@ -80,31 +79,29 @@ local rateLimit = function(jobId, maxJobs)
if (delay == 0) and (jobCounter >= maxJobs) then
-- Seems like there are no current rated limited jobs, but the jobCounter has exceeded the number of jobs for this unit of time so we need to rate limit this job.
local exceedingJobs = jobCounter - maxJobs
delay = tonumber(rcall("PTTL", rateLimiterKey)) + ((exceedingJobs) * ARGV[7]) / maxJobs
delay = tonumber(rcall("PTTL", rateLimiterKey)) + ((exceedingJobs) * timeUnit) / maxJobs
end

if delay > 0 then
local bounceBack = ARGV[8]
if bounceBack == 'false' then
local timestamp = delay + tonumber(ARGV[4])
local timestamp = delay + tonumber(timestamp)
-- put job into delayed queue
rcall("ZADD", KEYS[7], timestamp * 0x1000 + bit.band(jobCounter, 0xfff), jobId)
rcall("PUBLISH", KEYS[7], timestamp)
rcall("ZADD", delayedKey, timestamp * 0x1000 + bit.band(jobCounter, 0xfff), jobId)
rcall("PUBLISH", delayedKey, timestamp)
rcall("SADD", limitedSetKey, jobId)

-- store index so that we can delete rate limited data
rcall("HSET", limiterIndexTable, jobId, limitedSetKey)

end

-- remove from active queue
rcall("LREM", KEYS[2], 1, jobId)
rcall("LREM", activeKey, 1, jobId)
return true
else
-- false indicates not rate limited
-- increment jobCounter only when a job is not rate limited
if (jobCounter == 0) then
rcall("PSETEX", rateLimiterKey, ARGV[7], 1)
rcall("PSETEX", rateLimiterKey, timeUnit, 1)
else
rcall("INCR", rateLimiterKey)
end
Expand All @@ -127,8 +124,16 @@ if jobId then
local maxJobs = tonumber(ARGV[6])

if maxJobs then
if rateLimit(jobId, maxJobs) then
return
if rateLimit(KEYS[2], KEYS[6], KEYS[7], jobId, ARGV[4], maxJobs, ARGV[7], ARGV[8], ARGV[9]) then
-- Check if we can return a delayed job instead
jobId = rcall("ZRANGEBYSCORE", KEYS[7], 0, tonumber(ARGV[4]) * 0x1000, "LIMIT", 0, 1)[1]
if jobId then
if rateLimit(KEYS[2], KEYS[6], KEYS[7], jobId, ARGV[4], maxJobs, ARGV[7], ARGV[8], ARGV[9]) then
return
end
else
return
end
end
end

Expand Down
118 changes: 0 additions & 118 deletions lib/commands/moveToFinished-7.lua

This file was deleted.

Loading