Skip to content

Commit

Permalink
fix(retry-jobs): add marker when needed (#2374)
Browse files Browse the repository at this point in the history
* fix(retry-jobs): add marker when needed

* refactor: update scripts in python
  • Loading branch information
roggervalf authored Jan 16, 2024
1 parent 368b5a1 commit 1813d5f
Show file tree
Hide file tree
Showing 5 changed files with 17 additions and 9 deletions.
4 changes: 2 additions & 2 deletions python/bullmq/scripts.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ def __init__(self, prefix: str, queueName: str, redisConnection: RedisConnection
"removeJob": self.redisClient.register_script(self.getScript("removeJob-1.lua")),
"reprocessJob": self.redisClient.register_script(self.getScript("reprocessJob-6.lua")),
"retryJob": self.redisClient.register_script(self.getScript("retryJob-10.lua")),
"moveJobsToWait": self.redisClient.register_script(self.getScript("moveJobsToWait-6.lua")),
"moveJobsToWait": self.redisClient.register_script(self.getScript("moveJobsToWait-7.lua")),
"saveStacktrace": self.redisClient.register_script(self.getScript("saveStacktrace-1.lua")),
"updateData": self.redisClient.register_script(self.getScript("updateData-1.lua")),
"updateProgress": self.redisClient.register_script(self.getScript("updateProgress-3.lua")),
Expand Down Expand Up @@ -419,7 +419,7 @@ async def obliterate(self, count: int, force: bool = False):

def moveJobsToWaitArgs(self, state: str, count: int, timestamp: int) -> int:
keys = self.getKeys(
['', 'events', state, 'wait', 'paused', 'meta'])
['', 'events', state, 'wait', 'paused', 'meta', 'marker'])

args = [count or 1000, timestamp or round(time.time()*1000), state]
return (keys, args)
Expand Down
3 changes: 2 additions & 1 deletion src/classes/scripts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -848,7 +848,8 @@ export class Scripts {
this.queue.toKey(state),
this.queue.toKey('wait'),
this.queue.toKey('paused'),
this.queue.toKey('meta'),
this.queue.keys.meta,
this.queue.keys.marker,
];

const args = [count, timestamp, state];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
KEYS[4] 'wait'
KEYS[5] 'paused'
KEYS[6] 'meta'
KEYS[7] 'marker'
ARGV[1] count
ARGV[2] timestamp
Expand All @@ -30,7 +31,7 @@ local rcall = redis.call;
--- @include "includes/getTargetQueueList"

local metaKey = KEYS[6]
local target = getTargetQueueList(metaKey, KEYS[4], KEYS[5])
local target, paused = getTargetQueueList(metaKey, KEYS[4], KEYS[5])

local jobs = rcall('ZRANGEBYSCORE', KEYS[3], 0, timestamp, 'LIMIT', 0, maxCount)
if (#jobs > 0) then
Expand Down Expand Up @@ -59,6 +60,10 @@ if (#jobs > 0) then
rcall("ZREM", KEYS[3], unpack(jobs, from, to))
rcall("LPUSH", target, unpack(jobs, from, to))
end

if not paused then
rcall("ZADD", KEYS[7], 0, "0")
end
end

maxCount = maxCount - #jobs
Expand Down
2 changes: 1 addition & 1 deletion tests/test_events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ describe('events', function () {
{ name: 'test', data: { foo: 'baz' } },
]);

await delay(1000);
await delay(2000);

const jobs = await queue.getJobCountByTypes('completed');
expect(jobs).to.be.equal(4);
Expand Down
10 changes: 6 additions & 4 deletions tests/test_queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -468,7 +468,7 @@ describe('queues', function () {
});

describe('when completed state is provided', () => {
it('retries all completed jobs', async () => {
it('retries all completed jobs', async function () {
await queue.waitUntilReady();
const jobCount = 8;

Expand All @@ -485,9 +485,11 @@ describe('queues', function () {
worker.on('completed', after(jobCount, resolve));
});

for (const index of Array.from(Array(jobCount).keys())) {
await queue.add('test', { idx: index });
}
const jobs = Array.from(Array(jobCount).keys()).map(index => ({
name: 'test',
data: { idx: index },
}));
await queue.addBulk(jobs);

await completing1;

Expand Down

0 comments on commit 1813d5f

Please sign in to comment.