Skip to content

Commit

Permalink
perf(delayed): keep moving delayed jobs to waiting when queue is paus…
Browse files Browse the repository at this point in the history
…ed (#2640) (python)
  • Loading branch information
roggervalf committed Jul 11, 2024
1 parent 8bb08a7 commit b89e2e0
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 20 deletions.
8 changes: 2 additions & 6 deletions src/commands/addDelayedJob-6.lua
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ local parentData
--- @include "includes/getDelayedScore"
--- @include "includes/getOrSetMaxEvents"
--- @include "includes/handleDuplicatedJob"
--- @include "includes/isQueuePaused"
--- @include "includes/storeJob"

if parentKey ~= nil then
Expand Down Expand Up @@ -99,11 +98,8 @@ rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "delayed",
"jobId", jobId, "delay", delayedTimestamp)

-- mark that a delayed job is available
local isPaused = isQueuePaused(metaKey)
if not isPaused then
local markerKey = KEYS[1]
addDelayMarkerIfNeeded(markerKey, delayedKey)
end
local markerKey = KEYS[1]
addDelayMarkerIfNeeded(markerKey, delayedKey)

-- Check if this job is a child of another job, if so add it to the parents dependencies
if parentDependenciesKey ~= nil then
Expand Down
6 changes: 1 addition & 5 deletions src/commands/changeDelay-4.lua
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ local rcall = redis.call
--- @include "includes/addDelayMarkerIfNeeded"
--- @include "includes/getDelayedScore"
--- @include "includes/getOrSetMaxEvents"
--- @include "includes/isQueuePaused"

if rcall("EXISTS", ARGV[4]) == 1 then
local jobId = ARGV[3]
Expand All @@ -48,10 +47,7 @@ if rcall("EXISTS", ARGV[4]) == 1 then
"jobId", jobId, "delay", delayedTimestamp)

-- mark that a delayed job is available
local isPaused = isQueuePaused(KEYS[2])
if not isPaused then
addDelayMarkerIfNeeded(KEYS[3], KEYS[1])
end
addDelayMarkerIfNeeded(KEYS[3], KEYS[1])

return 0
else
Expand Down
8 changes: 2 additions & 6 deletions src/commands/moveToDelayed-8.lua
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ local rcall = redis.call
--- @include "includes/addDelayMarkerIfNeeded"
--- @include "includes/getDelayedScore"
--- @include "includes/getOrSetMaxEvents"
--- @include "includes/isQueuePaused"
--- @include "includes/removeLock"

local jobKey = KEYS[5]
Expand Down Expand Up @@ -65,11 +64,8 @@ if rcall("EXISTS", jobKey) == 1 then
"jobId", jobId, "delay", delayedTimestamp)

-- Check if we need to push a marker job to wake up sleeping workers.
local isPaused = isQueuePaused(metaKey)
if not isPaused then
local markerKey = KEYS[1]
addDelayMarkerIfNeeded(markerKey, delayedKey)
end
local markerKey = KEYS[1]
addDelayMarkerIfNeeded(markerKey, delayedKey)

return 0
else
Expand Down
37 changes: 37 additions & 0 deletions tests/test_delay.ts
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,43 @@ describe('Delayed jobs', function () {
});
});

describe('when queue is paused', function () {
it('should keep moving delayed jobs to waiting', async function () {
const delayTime = 2500;
const margin = 1.2;

const queueEvents = new QueueEvents(queueName, { connection, prefix });
await queueEvents.waitUntilReady();

await queue.pause();
const worker = new Worker(queueName, async () => {}, {
connection,
prefix,
});
await worker.waitUntilReady();

const timestamp = Date.now();
const publishHappened = false;

const waiting = new Promise<void>(resolve => {
queueEvents.on('waiting', () => {
console.log(Date.now() - timestamp);
const currentDelay = Date.now() - timestamp;
expect(currentDelay).to.be.greaterThanOrEqual(delayTime);
expect(currentDelay).to.be.lessThanOrEqual(delayTime * margin);
resolve();
});
});

await queue.add('test', { delayed: 'foobar' }, { delay: delayTime });

await waiting;

await queueEvents.close();
await worker.close();
});
});

it('should process a delayed job added after an initial long delayed job', async function () {
const oneYearDelay = 1000 * 60 * 60 * 24 * 365; // One year.
const delayTime = 1000;
Expand Down
4 changes: 2 additions & 2 deletions tests/test_pause.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ describe('Pause', function () {
}
const counts2 = await queue.getJobCounts('waiting', 'paused', 'delayed');
expect(counts2).to.have.property('waiting', 0);
expect(counts2).to.have.property('paused', 0);
expect(counts2).to.have.property('delayed', 1);
expect(counts2).to.have.property('paused', 1);
expect(counts2).to.have.property('delayed', 0);

await worker.close();
});
Expand Down
2 changes: 1 addition & 1 deletion tests/test_worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1738,7 +1738,7 @@ describe('workers', function () {
const processing = new Promise<void>((resolve, reject) => {
processor = async (job: Job) => {
try {
await delay(10);
await delay(20);
expect(job.data.num).to.be.equal(counter);
expect(job.data.foo).to.be.equal('bar');
if (counter === maxJobs) {
Expand Down

0 comments on commit b89e2e0

Please sign in to comment.