Skip to content

Commit

Permalink
perf(job): set processedBy using hmset (#2592) (python)
Browse files Browse the repository at this point in the history
  • Loading branch information
roggervalf authored Jun 4, 2024
1 parent c19c839 commit 238680b
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 8 deletions.
12 changes: 6 additions & 6 deletions src/commands/includes/addDelayMarkerIfNeeded.lua
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@
--- @include "getNextDelayedTimestamp"

local function addDelayMarkerIfNeeded(markerKey, delayedKey)
local nextTimestamp = getNextDelayedTimestamp(delayedKey)
if nextTimestamp ~= nil then
-- Replace the score of the marker with the newest known
-- next timestamp.
rcall("ZADD", markerKey, nextTimestamp, "1")
end
local nextTimestamp = getNextDelayedTimestamp(delayedKey)
if nextTimestamp ~= nil then
-- Replace the score of the marker with the newest known
-- next timestamp.
rcall("ZADD", markerKey, nextTimestamp, "1")
end
end
7 changes: 5 additions & 2 deletions src/commands/includes/prepareJobForProcessing.lua
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,16 @@ local function prepareJobForProcessing(keyPrefix, rateLimiterKey, eventStreamKey
rcall("SET", lockKey, opts['token'], "PX", opts['lockDuration'])
end

local optionalValues = {}

if opts['name'] then
-- Set "processedBy" field to the worker name
rcall("HSET", jobKey, "pb", opts['name'])
table.insert(optionalValues, "pb")
table.insert(optionalValues, opts['name'])
end

rcall("XADD", eventStreamKey, "*", "event", "active", "jobId", jobId, "prev", "waiting")
rcall("HSET", jobKey, "processedOn", processedOn)
rcall("HMSET", jobKey, "processedOn", processedOn, unpack(optionalValues))
rcall("HINCRBY", jobKey, "ats", 1)

return {rcall("HGETALL", jobKey), jobId, 0, 0} -- get job data
Expand Down

0 comments on commit 238680b

Please sign in to comment.