Skip to content

Commit

Permalink
feat(repeatable): new repeatables structure (#2617) ref #2612 fixes #…
Browse files Browse the repository at this point in the history
  • Loading branch information
roggervalf authored Jul 16, 2024
1 parent 8025f99 commit 8376a9a
Show file tree
Hide file tree
Showing 11 changed files with 414 additions and 139 deletions.
41 changes: 41 additions & 0 deletions docs/gitbook/guide/jobs/repeatable.md
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,47 @@ As you may notice, the repeat strategy setting should be provided in `Queue` and
The repeat strategy function receives an optional `jobName` third parameter.
{% endhint %}

### Custom Repeatable Key

By default, we are generating repeatable keys base on repeat options and job name.

In some cases, it is desired to pass a custom key to be able to differentiate your repeatable jobs even when they have same repeat options:

```typescript
import { Queue } from 'bullmq';

const myQueue = new Queue('Paint', { connection });

// Repeat job every 10 seconds
await myQueue.add(
'bird',
{ color: 'bird' },
{
repeat: {
every: 1000,
},
key: 'colibri',
},
);

// Repeat job every 10 seconds
await myQueue.add(
'bird',
{ color: 'bird' },
{
repeat: {
every: 1000,
},
key: 'eagle',
},
);

```

{% hint style="warning" %}
While adding a new repeatable job with same key but different repeat options, you will override your previous record.
{% endhint %}

### Read more:

* 💡 [Repeat Strategy API Reference](https://api.docs.bullmq.io/types/v5.RepeatStrategy.html)
Expand Down
109 changes: 74 additions & 35 deletions src/classes/repeat.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
import { parseExpression } from 'cron-parser';
import { createHash } from 'crypto';
import { RepeatBaseOptions, RepeatableJob, RepeatOptions } from '../interfaces';
import {
RedisClient,
RepeatBaseOptions,
RepeatableJob,
RepeatOptions,
} from '../interfaces';
import { JobsOptions, RepeatStrategy } from '../types';
import { Job } from './job';
import { QueueBase } from './queue-base';
Expand Down Expand Up @@ -31,7 +36,7 @@ export class Repeat extends QueueBase {
skipCheckExists?: boolean,
): Promise<Job<T, R, N> | undefined> {
// HACK: This is a temporary fix to enable easy migration from bullmq <3.0.0
// to >= 3.0.0. It should be removed when moving to 4.x.
// to >= 3.0.0. TODO: It should be removed when moving to 4.x.
const repeatOpts: RepeatOptions & { cron?: string } = { ...opts.repeat };
repeatOpts.pattern ??= repeatOpts.cron;
delete repeatOpts.cron;
Expand Down Expand Up @@ -70,23 +75,27 @@ export class Repeat extends QueueBase {
repeatOpts.jobId = opts.jobId;
}

const repeatJobKey = getRepeatKey(name, repeatOpts);
const qualifiedName = getRepeatCocatOptions(name, repeatOpts);

let repeatableExists = true;
const repeatJobKey = await this.scripts.addRepeatableJob(
opts.repeat.key ?? this.hash(qualifiedName),
nextMillis,
{
name,
endDate: repeatOpts.endDate
? new Date(repeatOpts.endDate).getTime()
: undefined,
tz: repeatOpts.tz,
pattern: repeatOpts.pattern,
every: repeatOpts.every,
},
qualifiedName,
skipCheckExists,
);

if (!skipCheckExists) {
// Check that the repeatable job hasn't been removed
// TODO: a lua script would be better here
const client = await this.client;
repeatableExists = !!(await client.zscore(
this.keys.repeat,
repeatJobKey,
));
}
const { immediately, ...filteredRepeatOpts } = repeatOpts;

// The job could have been deleted since this check
if (repeatableExists) {
if (repeatJobKey) {
return this.createNextJob<T, R, N>(
name,
nextMillis,
Expand All @@ -109,17 +118,12 @@ export class Repeat extends QueueBase {
currentCount: number,
hasImmediately: boolean,
) {
const client = await this.client;

//
// Generate unique job id for this iteration.
//
const jobId = this.getRepeatJobId({
name,
const jobId = this.getRepeatDelayedJobId({
customKey: repeatJobKey,
nextMillis,
namespace: this.hash(repeatJobKey),
jobId: opts.repeat.jobId,
key: opts.repeat.key,
});
const now = Date.now();
const delay =
Expand All @@ -136,8 +140,6 @@ export class Repeat extends QueueBase {

mergedOpts.repeat = { ...opts.repeat, count: currentCount };

await client.zadd(this.keys.repeat, nextMillis.toString(), repeatJobKey);

return this.Job.create<T, R, N>(this, name, data, mergedOpts);
}

Expand All @@ -146,29 +148,56 @@ export class Repeat extends QueueBase {
repeat: RepeatOptions,
jobId?: string,
): Promise<number> {
const repeatJobKey = getRepeatKey(name, { ...repeat, jobId });
const repeatJobId = this.getRepeatJobId({
const qualifiedName = getRepeatCocatOptions(name, { ...repeat, jobId });
const repeatJobKey = repeat.key ?? this.hash(qualifiedName);
const legacyRepeatJobId = this.getRepeatJobId({
name,
nextMillis: '',
namespace: this.hash(repeatJobKey),
namespace: this.hash(qualifiedName),
jobId: jobId ?? repeat.jobId,
key: repeat.key,
});

return this.scripts.removeRepeatable(repeatJobId, repeatJobKey);
return this.scripts.removeRepeatable(
legacyRepeatJobId,
qualifiedName,
repeatJobKey,
);
}

async removeRepeatableByKey(repeatJobKey: string): Promise<number> {
const data = this.keyToData(repeatJobKey);

const repeatJobId = this.getRepeatJobId({
const legacyRepeatJobId = this.getRepeatJobId({
name: data.name,
nextMillis: '',
namespace: this.hash(repeatJobKey),
jobId: data.id,
});

return this.scripts.removeRepeatable(repeatJobId, repeatJobKey);
return this.scripts.removeRepeatable(legacyRepeatJobId, '', repeatJobKey);
}

private async getRepeatableData(
client: RedisClient,
key: string,
next?: number,
): Promise<RepeatableJob> {
const jobData = await client.hgetall(this.toKey('repeat:' + key));

if (jobData) {
return {
key,
name: jobData.name,
endDate: parseInt(jobData.endDate) || null,
tz: jobData.tz || null,
pattern: jobData.pattern || null,
every: jobData.every || null,
next,
};
}

return this.keyToData(key, next);
}

private keyToData(key: string, next?: number): RepeatableJob {
Expand Down Expand Up @@ -200,9 +229,11 @@ export class Repeat extends QueueBase {

const jobs = [];
for (let i = 0; i < result.length; i += 2) {
jobs.push(this.keyToData(result[i], parseInt(result[i + 1])));
jobs.push(
this.getRepeatableData(client, result[i], parseInt(result[i + 1])),
);
}
return jobs;
return Promise.all(jobs);
}

async getRepeatableCount(): Promise<number> {
Expand All @@ -214,6 +245,16 @@ export class Repeat extends QueueBase {
return createHash(this.repeatKeyHashAlgorithm).update(str).digest('hex');
}

private getRepeatDelayedJobId({
nextMillis,
customKey,
}: {
customKey: string;
nextMillis: number | string;
}) {
return `repeat:${customKey}:${nextMillis}`;
}

private getRepeatJobId({
name,
nextMillis,
Expand All @@ -229,12 +270,10 @@ export class Repeat extends QueueBase {
}) {
const checksum = key ?? this.hash(`${name}${jobId || ''}${namespace}`);
return `repeat:${checksum}:${nextMillis}`;
// return `repeat:${jobId || ''}:${name}:${namespace}:${nextMillis}`;
//return `repeat:${name}:${namespace}:${nextMillis}`;
}
}

function getRepeatKey(name: string, repeat: RepeatOptions) {
function getRepeatCocatOptions(name: string, repeat: RepeatOptions) {
const endDate = repeat.endDate ? new Date(repeat.endDate).getTime() : '';
const tz = repeat.tz || '';
const pattern = repeat.pattern;
Expand Down
62 changes: 58 additions & 4 deletions src/classes/scripts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import {
WorkerOptions,
KeepJobs,
MoveToDelayedOpts,
RepeatableOptions,
} from '../interfaces';
import {
JobState,
Expand Down Expand Up @@ -259,25 +260,78 @@ export class Scripts {
return (<any>client).pause(args);
}

protected addRepeatableJobArgs(
customKey: string,
nextMillis: number,
opts: RepeatableOptions,
legacyCustomKey: string,
skipCheckExists: boolean,
): (string | number | Buffer)[] {
const keys: (string | number | Buffer)[] = [
this.queue.keys.repeat,
customKey,
];

const args = [
nextMillis,
pack(opts),
legacyCustomKey,
skipCheckExists ? '1' : '0',
];

return keys.concat(args);
}

async addRepeatableJob(
customKey: string,
nextMillis: number,
opts: RepeatableOptions,
legacyCustomKey: string,
skipCheckExists: boolean,
): Promise<string> {
const client = await this.queue.client;

const args = this.addRepeatableJobArgs(
customKey,
nextMillis,
opts,
legacyCustomKey,
skipCheckExists,
);

return (<any>client).addRepeatableJob(args);
}

private removeRepeatableArgs(
repeatJobId: string,
legacyRepeatJobId: string,
qualifiedName: string,
repeatJobKey: string,
): string[] {
const queueKeys = this.queue.keys;

const keys = [queueKeys.repeat, queueKeys.delayed];

const args = [repeatJobId, repeatJobKey, queueKeys['']];
const args = [
legacyRepeatJobId,
qualifiedName,
repeatJobKey,
queueKeys[''],
];

return keys.concat(args);
}

async removeRepeatable(
repeatJobId: string,
legacyRepeatJobId: string,
qualifiedName: string,
repeatJobKey: string,
): Promise<number> {
const client = await this.queue.client;
const args = this.removeRepeatableArgs(repeatJobId, repeatJobKey);
const args = this.removeRepeatableArgs(
legacyRepeatJobId,
qualifiedName,
repeatJobKey,
);

return (<any>client).removeRepeatable(args);
}
Expand Down
Loading

0 comments on commit 8376a9a

Please sign in to comment.