Skip to content

Commit

Permalink
feat(queue): add getDebounceJobId method (#2717)
Browse files Browse the repository at this point in the history
  • Loading branch information
roggervalf committed Sep 11, 2024
1 parent 836edd8 commit a68ead9
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 0 deletions.
8 changes: 8 additions & 0 deletions docs/gitbook/guide/jobs/debouncing.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,14 @@ This mode is particularly useful for jobs that have a long running time or those
Any manual deletion will disable the debouncing. For example, when calling _job.remove_ method.
{% endhint %}

## Get Debounce Job Id

If you need to know which is the job id that started the debounce state. You can call **getDebounceJobId** method.

```typescript
const jobId = await myQueue.getDebounceJobId('customValue');
```

## Remove Debounce Key

If you need to stop debouncing before ttl finishes or before finishing a job. You can call **removeDebounceKey** method.
Expand Down
11 changes: 11 additions & 0 deletions src/classes/queue-getters.ts
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,17 @@ export class QueueGetters<
return this.scripts.getRateLimitTtl(maxJobs);
}

/**
* Get jobId that starts debounced state.
*
* @param id - debounce identifier
*/
async getDebounceJobId(id: string): Promise<string | null> {
const client = await this.client;

return client.get(`${this.keys.de}:${id}`);
}

/**
* Job counts by type
*
Expand Down
97 changes: 97 additions & 0 deletions tests/test_flow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,103 @@ describe('flows', () => {
}).timeout(8000);
});

describe('when child is debounced when added again with same debounce id', function () {
describe('when ttl is not provided', function () {
it('waits until job is finished before removing debounce key', async function () {
const parentQueueName = `parent-queue-${v4()}`;

const flow = new FlowProducer({ connection, prefix });
const queueEvents = new QueueEvents(queueName, { connection, prefix });
await queueEvents.waitUntilReady();

const worker = new Worker(
queueName,
async job => {
await delay(100);

const jobIdFromDebounceKey = await queue.getDebounceJobId(
'debounce_id',
);
expect(jobIdFromDebounceKey).to.be.equal(job.id);

await flow.add({
name: 'parent',
data: {},
queueName: parentQueueName,
children: [
{
queueName,
name: 'child0',
data: {},
opts: {
debounce: {
id: 'debounce_id',
},
},
},
],
});

await delay(100);
},
{
autorun: false,
connection,
prefix,
},
);
await worker.waitUntilReady();

const { children } = await flow.add({
name: 'parent',
data: {},
queueName: parentQueueName,
children: [
{
queueName,
name: 'child0',
data: {},
opts: {
debounce: {
id: 'debounce_id',
},
},
},
],
});

let debouncedCounter = 0;

const completing = new Promise<void>(resolve => {
queueEvents.once('completed', ({ jobId }) => {
expect(children![0].job.id).to.be.equal(jobId);
resolve();
});

queueEvents.on('debounced', ({ jobId }) => {
debouncedCounter++;
});
});

worker.run();

await completing;

const jobIdFromDebounceKey = await queue.getDebounceJobId(
'debounce_id',
);
expect(jobIdFromDebounceKey).to.be.null;

expect(debouncedCounter).to.be.equal(1);

await worker.close();
await queueEvents.close();
await flow.close();
await removeAllQueueData(new IORedis(redisHost), parentQueueName);
});
});
});

it('should process children before the parent', async () => {
const name = 'child-job';
const values = [
Expand Down

0 comments on commit a68ead9

Please sign in to comment.