diff --git a/src/classes/queue-getters.ts b/src/classes/queue-getters.ts index 4b40395c8f..ba3690e89a 100644 --- a/src/classes/queue-getters.ts +++ b/src/classes/queue-getters.ts @@ -190,6 +190,23 @@ export class QueueGetters< return this.getJobCountByTypes('prioritized'); } + /** + * Returns the number of jobs per priority. + */ + async getCountsPerPriority(priorities: number[]): Promise<{ + [index: string]: number; + }> { + const uniquePriorities = [...new Set(priorities)]; + const responses = await this.scripts.getCountsPerPriority(uniquePriorities); + + const counts: { [index: string]: number } = {}; + responses.forEach((res, index) => { + counts[`${uniquePriorities[index]}`] = res || 0; + }); + + return counts; + } + /** * Returns the number of jobs in waiting or paused statuses. */ diff --git a/src/classes/scripts.ts b/src/classes/scripts.ts index f8a029d744..5a86b30bf6 100644 --- a/src/classes/scripts.ts +++ b/src/classes/scripts.ts @@ -614,6 +614,24 @@ export class Scripts { return (client).getCounts(args); } + private getCountsPerPriorityArgs(priorities: number[]): (string | number)[] { + const keys: (string | number)[] = [ + this.queue.keys.wait, + this.queue.keys.prioritized, + ]; + + const args = priorities; + + return keys.concat(args); + } + + async getCountsPerPriority(priorities: number[]): Promise { + const client = await this.queue.client; + const args = this.getCountsPerPriorityArgs(priorities); + + return (client).getCountsPerPriority(args); + } + moveToCompletedArgs( job: MinimalJob, returnvalue: R, diff --git a/src/commands/getCountsPerPriority-2.lua b/src/commands/getCountsPerPriority-2.lua new file mode 100644 index 0000000000..a6e4347593 --- /dev/null +++ b/src/commands/getCountsPerPriority-2.lua @@ -0,0 +1,25 @@ +--[[ + Get counts per provided states + + Input: + KEYS[1] wait key + KEYS[2] prioritized key + + ARGV[1...] priorities +]] +local rcall = redis.call +local results = {} +local waitKey = KEYS[1] +local prioritizedKey = KEYS[2] + +for i = 1, #ARGV do + local priority = tonumber(ARGV[i]) + if priority == 0 then + results[#results+1] = rcall("LLEN", waitKey) + else + results[#results+1] = rcall("ZCOUNT", prioritizedKey, + priority * 0x100000000, (priority + 1) * 0x100000000 - 1) + end +end + +return results diff --git a/tests/test_getters.ts b/tests/test_getters.ts index 4c22cfab96..ecc853c6f1 100644 --- a/tests/test_getters.ts +++ b/tests/test_getters.ts @@ -844,6 +844,30 @@ describe('Jobs getters', function () { }); }); + describe('.getCountsPerPriority', () => { + it('returns job counts per priority', async () => { + await queue.waitUntilReady(); + + const jobs = Array.from(Array(42).keys()).map(index => ({ + name: 'test', + data: {}, + opts: { + priority: index % 4, + }, + })); + await queue.addBulk(jobs); + + const counts = await queue.getCountsPerPriority([0, 1, 2, 3]); + + expect(counts).to.be.eql({ + '0': 11, + '1': 11, + '2': 10, + '3': 10, + }); + }); + }); + describe('.getDependencies', () => { it('return unprocessed jobs that are dependencies of a given parent job', async () => { const flowProducer = new FlowProducer({ connection, prefix });