Skip to content

Commit

Permalink
fix(queue-getters): consider passing maxJobs when calling getRateLimi…
Browse files Browse the repository at this point in the history
…tTtl (#2631) fixes #2628
  • Loading branch information
roggervalf committed Jul 5, 2024
1 parent b267fb5 commit 9f6609a
Show file tree
Hide file tree
Showing 4 changed files with 160 additions and 3 deletions.
8 changes: 5 additions & 3 deletions src/classes/queue-getters.ts
Original file line number Diff line number Diff line change
Expand Up @@ -103,13 +103,14 @@ export class QueueGetters<

/**
* Returns the time to live for a rate limited key in milliseconds.
* @param maxJobs - max jobs to be considered in rate limit state. If not passed
* it will return the remaining ttl without considering if max jobs is excedeed.
* @returns -2 if the key does not exist.
* -1 if the key exists but has no associated expire.
* @see {@link https://redis.io/commands/pttl/}
*/
async getRateLimitTtl(): Promise<number> {
const client = await this.client;
return client.pttl(this.keys.limiter);
async getRateLimitTtl(maxJobs?: number): Promise<number> {
return this.scripts.getRateLimitTtl(maxJobs);
}

/**
Expand Down Expand Up @@ -148,6 +149,7 @@ export class QueueGetters<
/**
* Get current job state.
*
* @param jobId - job identifier.
* @returns Returns one of these values:
* 'completed', 'failed', 'delayed', 'active', 'waiting', 'waiting-children', 'unknown'.
*/
Expand Down
13 changes: 13 additions & 0 deletions src/classes/scripts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -933,6 +933,19 @@ export class Scripts {
}
}

getRateLimitTtlArgs(maxJobs?: number): (string | number)[] {
const keys: (string | number)[] = [this.queue.keys.limiter];

return keys.concat([maxJobs ?? '0']);
}

async getRateLimitTtl(maxJobs?: number): Promise<number> {
const client = await this.queue.client;

const args = this.getRateLimitTtlArgs(maxJobs);
return (<any>client).getRateLimitTtl(args);
}

/**
* Remove jobs in a specific state.
*
Expand Down
20 changes: 20 additions & 0 deletions src/commands/getRateLimitTtl-1.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
--[[
Get rate limit ttl
Input:
KEYS[1] 'limiter'
ARGV[1] maxJobs
]]

local rcall = redis.call

-- Includes
--- @include "includes/getRateLimitTTL"

local rateLimiterKey = KEYS[1]
if ARGV[1] ~= "0" then
return getRateLimitTTL(tonumber(ARGV[1]), rateLimiterKey)
else
return rcall("PTTL", rateLimiterKey)
end
122 changes: 122 additions & 0 deletions tests/test_rate_limiter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,128 @@ describe('Rate Limiter', function () {
});
});

describe('when passing maxJobs when getting rate limit ttl', () => {
describe('when rate limit counter is lower than maxJobs', () => {
it('should returns 0', async function () {
this.timeout(4000);

const numJobs = 1;
const duration = 100;

const ttl = await queue.getRateLimitTtl();
expect(ttl).to.be.equal(-2);

const worker = new Worker(
queueName,
async job => {
if (job.attemptsStarted === 1) {
delay(50);
const currentTtl = await queue.getRateLimitTtl(2);
expect(currentTtl).to.be.equal(0);
}
},
{
connection,
prefix,
maxStalledCount: 0,
limiter: {
max: 2,
duration,
},
},
);

const result = new Promise<void>((resolve, reject) => {
queueEvents.on(
'completed',
// after every job has been completed
after(numJobs, async () => {
try {
resolve();
} catch (err) {
reject(err);
}
}),
);

queueEvents.on('failed', async err => {
await worker.close();
reject(err);
});
});

const jobs = Array.from(Array(numJobs).keys()).map(() => ({
name: 'rate test',
data: {},
}));
await queue.addBulk(jobs);

await result;
await worker.close();
});
});

describe('when rate limit counter is greater than maxJobs', () => {
it('should returns at least rate limit duration', async function () {
this.timeout(4000);

const numJobs = 10;
const duration = 100;

const ttl = await queue.getRateLimitTtl();
expect(ttl).to.be.equal(-2);

const worker = new Worker(
queueName,
async job => {
if (job.attemptsStarted === 1) {
delay(50);
const currentTtl = await queue.getRateLimitTtl(1);
expect(currentTtl).to.be.lessThanOrEqual(duration);
}
},
{
connection,
prefix,
maxStalledCount: 0,
limiter: {
max: 1,
duration,
},
},
);

const result = new Promise<void>((resolve, reject) => {
queueEvents.on(
'completed',
// after every job has been completed
after(numJobs, async () => {
try {
resolve();
} catch (err) {
reject(err);
}
}),
);

queueEvents.on('failed', async err => {
await worker.close();
reject(err);
});
});

const jobs = Array.from(Array(numJobs).keys()).map(() => ({
name: 'rate test',
data: {},
}));
await queue.addBulk(jobs);

await result;
await worker.close();
});
});
});

describe('when reaching max attempts and we want to move the job to failed', () => {
it('should throw Unrecoverable error', async function () {
const dynamicLimit = 550;
Expand Down

0 comments on commit 9f6609a

Please sign in to comment.