Skip to content

Commit

Permalink
fix(priority): consider paused state when calling getCountsPerPriorit…
Browse files Browse the repository at this point in the history
…y (python) (#2609)
  • Loading branch information
roggervalf authored Jun 15, 2024
1 parent 16fb267 commit 6e99250
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 4 deletions.
4 changes: 3 additions & 1 deletion python/bullmq/scripts.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def __init__(self, prefix: str, queueName: str, redisConnection: RedisConnection
"cleanJobsInSet": self.redisClient.register_script(self.getScript("cleanJobsInSet-2.lua")),
"extendLock": self.redisClient.register_script(self.getScript("extendLock-2.lua")),
"getCounts": self.redisClient.register_script(self.getScript("getCounts-1.lua")),
"getCountsPerPriority": self.redisClient.register_script(self.getScript("getCountsPerPriority-2.lua")),
"getCountsPerPriority": self.redisClient.register_script(self.getScript("getCountsPerPriority-4.lua")),
"getRanges": self.redisClient.register_script(self.getScript("getRanges-1.lua")),
"getState": self.redisClient.register_script(self.getScript("getState-8.lua")),
"getStateV2": self.redisClient.register_script(self.getScript("getStateV2-8.lua")),
Expand Down Expand Up @@ -325,6 +325,8 @@ def getCounts(self, types):

def getCountsPerPriorityArgs(self, priorities):
keys = [self.keys['wait'],
self.keys['paused'],
self.keys['meta'],
self.keys['prioritized']]

args = priorities
Expand Down
2 changes: 2 additions & 0 deletions src/classes/scripts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -617,6 +617,8 @@ export class Scripts {
private getCountsPerPriorityArgs(priorities: number[]): (string | number)[] {
const keys: (string | number)[] = [
this.queue.keys.wait,
this.queue.keys.paused,
this.queue.keys.meta,
this.queue.keys.prioritized,
];

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,26 @@
Input:
KEYS[1] wait key
KEYS[2] prioritized key
KEYS[2] paused key
KEYS[3] meta key
KEYS[4] prioritized key
ARGV[1...] priorities
]]
local rcall = redis.call
local results = {}
local waitKey = KEYS[1]
local prioritizedKey = KEYS[2]
local pausedKey = KEYS[2]
local prioritizedKey = KEYS[4]

-- Includes
--- @include "includes/getTargetQueueList"

for i = 1, #ARGV do
local priority = tonumber(ARGV[i])
if priority == 0 then
results[#results+1] = rcall("LLEN", waitKey)
local target = getTargetQueueList(KEYS[3], waitKey, pausedKey)
results[#results+1] = rcall("LLEN", target)
else
results[#results+1] = rcall("ZCOUNT", prioritizedKey,
priority * 0x100000000, (priority + 1) * 0x100000000 - 1)
Expand Down
25 changes: 25 additions & 0 deletions tests/test_getters.ts
Original file line number Diff line number Diff line change
Expand Up @@ -866,6 +866,31 @@ describe('Jobs getters', function () {
'3': 10,
});
});

describe('when queue is paused', () => {
it('returns job counts per priority', async () => {
await queue.waitUntilReady();

await queue.pause();
const jobs = Array.from(Array(42).keys()).map(index => ({
name: 'test',
data: {},
opts: {
priority: index % 4,
},
}));
await queue.addBulk(jobs);

const counts = await queue.getCountsPerPriority([0, 1, 2, 3]);

expect(counts).to.be.eql({
'0': 11,
'1': 11,
'2': 10,
'3': 10,
});
});
});
});

describe('.getDependencies', () => {
Expand Down

0 comments on commit 6e99250

Please sign in to comment.