Skip to content

Commit

Permalink
fix(repeatable): keep legacy repeatables if it exists instead of crea…
Browse files Browse the repository at this point in the history
…ting one with new structure (#2665)
  • Loading branch information
roggervalf authored Jul 19, 2024
1 parent a5ac1ae commit 93fad41
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 8 deletions.
28 changes: 24 additions & 4 deletions src/classes/repeat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -121,10 +121,8 @@ export class Repeat extends QueueBase {
//
// Generate unique job id for this iteration.
//
const jobId = this.getRepeatDelayedJobId({
customKey: repeatJobKey,
nextMillis,
});
const jobId = this.getRepeatJobKey(name, nextMillis, repeatJobKey, data);

const now = Date.now();
const delay =
nextMillis + (opts.repeat.offset ? opts.repeat.offset : 0) - now;
Expand All @@ -143,6 +141,28 @@ export class Repeat extends QueueBase {
return this.Job.create<T, R, N>(this, name, data, mergedOpts);
}

// TODO: remove legacy code in next breaking change
getRepeatJobKey<T = any, N extends string = string>(
name: N,
nextMillis: number,
repeatJobKey: string,
data: T,
) {
if (repeatJobKey.split(':').length > 2) {
return this.getRepeatJobId({
name: name,
nextMillis: nextMillis,
namespace: this.hash(repeatJobKey),
jobId: (data as any)?.id,
});
}

return this.getRepeatDelayedJobId({
customKey: repeatJobKey,
nextMillis,
});
}

async removeRepeatable(
name: string,
repeat: RepeatOptions,
Expand Down
6 changes: 4 additions & 2 deletions src/commands/addRepeatableJob-1.lua
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,10 @@ local function storeRepeatableJob(repeatKey, customKey, nextMilli, rawOpts)
return customKey
end

if ARGV[5] == '0' then
if rcall("ZSCORE", repeatKey, legacyCustomKey) ~= false then
local legacyRepeatableJobExists = rcall("ZSCORE", repeatKey, legacyCustomKey)

if ARGV[5] == '0' or legacyRepeatableJobExists ~= false then
if legacyRepeatableJobExists ~= false then
rcall("ZADD", repeatKey, nextMilli, legacyCustomKey)
return legacyCustomKey
elseif rcall("ZSCORE", repeatKey, customKey) ~= false then
Expand Down
2 changes: 0 additions & 2 deletions tests/test_delay.ts
Original file line number Diff line number Diff line change
Expand Up @@ -191,11 +191,9 @@ describe('Delayed jobs', function () {
await worker.waitUntilReady();

const timestamp = Date.now();
const publishHappened = false;

const waiting = new Promise<void>(resolve => {
queueEvents.on('waiting', () => {
console.log(Date.now() - timestamp);
const currentDelay = Date.now() - timestamp;
expect(currentDelay).to.be.greaterThanOrEqual(delayTime);
expect(currentDelay).to.be.lessThanOrEqual(delayTime * margin);
Expand Down
64 changes: 64 additions & 0 deletions tests/test_repeat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1312,6 +1312,70 @@ describe('repeat', function () {
const repeatableJobsAfterRemove = await queue.getRepeatableJobs();
expect(repeatableJobsAfterRemove).to.have.length(0);
});

describe('when re-adding repeatable job now with new format', function () {
it('should keep legacy repeatable job and be able to remove it', async function () {
this.clock.setSystemTime(1721187138606);
const client = await queue.client;
await client.hmset(
`${prefix}:${queue.name}:repeat:839d4be40c8b2f30fca6f860d0cf76f7:1735711200000`,
'priority',
0,
'delay',
14524061394,
'data',
'{}',
'timestamp',
1721187138606,
'rjk',
'remove::::* 1 * 1 *',
'name',
'remove',
);
await client.zadd(
`${prefix}:${queue.name}:repeat`,
1735711200000,
'remove::::* 1 * 1 *',
);
await client.zadd(
`${prefix}:${queue.name}:delayed`,
1735711200000,
'repeat:839d4be40c8b2f30fca6f860d0cf76f7:1735711200000',
);

const repeat = { pattern: '* 1 * 1 *' };

const repeatableJobs = await queue.getRepeatableJobs();
expect(repeatableJobs).to.have.length(1);
expect(repeatableJobs[0].key).to.be.equal('remove::::* 1 * 1 *');
const removed = await queue.removeRepeatable('remove', repeat);

const delayedCount = await queue.getJobCountByTypes('delayed');
expect(delayedCount).to.be.equal(0);
expect(removed).to.be.true;
const repeatableJobsAfterRemove = await queue.getRepeatableJobs();
expect(repeatableJobsAfterRemove).to.have.length(0);
});

it('should keep legacy repeatable job and delayed referece', async function () {
this.clock.setSystemTime(1721187138606);

const client = await queue.client;
await client.zadd(
`${prefix}:${queue.name}:repeat`,
1735693200000,
'remove::::* 1 * 1 *',
);

await queue.add('remove', {}, { repeat: { pattern: '* 1 * 1 *' } });
const repeatableJobs = await queue.getRepeatableJobs();
expect(repeatableJobs).to.have.length(1);
expect(repeatableJobs[0].key).to.be.equal('remove::::* 1 * 1 *');

const delayedCount = await queue.getJobCountByTypes('delayed');
expect(delayedCount).to.be.equal(1);
});
});
});

describe('when repeatable job does not exist', function () {
Expand Down

0 comments on commit 93fad41

Please sign in to comment.