Skip to content

Commit

Permalink
fix(job-queue-plugin): Correct behaviour of job list query with BullMQ
Browse files Browse the repository at this point in the history
Fixes #2120, fixes #1327. In order to get the correct behaviour I needed to create a custom
Lua script which is sent to Redis and implements the desired filtering and pagination
functionality.
  • Loading branch information
michaelbromley committed Sep 25, 2023
1 parent 4cf1826 commit c148a92
Show file tree
Hide file tree
Showing 3 changed files with 189 additions and 18 deletions.
80 changes: 62 additions & 18 deletions packages/job-queue-plugin/src/bullmq/bullmq-job-queue-strategy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,22 @@ import {
Logger,
PaginatedList,
} from '@vendure/core';
import Bull, { ConnectionOptions, JobType, Processor, Queue, Worker, WorkerOptions } from 'bullmq';
import Bull, {
ConnectionOptions,
JobType,
Processor,
Queue,
Worker,
WorkerOptions,
Job as BullJob,
} from 'bullmq';
import { EventEmitter } from 'events';
import { Cluster, Redis, RedisOptions } from 'ioredis';

import { ALL_JOB_TYPES, BULLMQ_PLUGIN_OPTIONS, loggerCtx } from './constants';
import { RedisHealthIndicator } from './redis-health-indicator';
import { BullMQPluginOptions } from './types';
import { getJobsByType } from './scripts/get-jobs-by-type';
import { BullMQPluginOptions, CustomScriptDefinition } from './types';

const QUEUE_NAME = 'vendure-job-queue';
const DEFAULT_CONCURRENCY = 3;
Expand Down Expand Up @@ -53,6 +62,8 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
? this.connectionOptions
: new Redis(this.connectionOptions);

this.defineCustomLuaScripts();

const redisHealthIndicator = injector.get(RedisHealthIndicator);
Logger.info('Checking Redis connection...', loggerCtx);
const health = await redisHealthIndicator.isHealthy('redis');
Expand Down Expand Up @@ -137,8 +148,8 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
}

async findMany(options?: JobListOptions): Promise<PaginatedList<Job>> {
const start = options?.skip ?? 0;
const end = start + (options?.take ?? 10);
const skip = options?.skip ?? 0;
const take = options?.take ?? 10;
let jobTypes: JobType[] = ALL_JOB_TYPES;
const stateFilter = options?.filter?.state;
if (stateFilter?.eq) {
Expand Down Expand Up @@ -170,26 +181,31 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
? ['completed', 'failed']
: ['wait', 'waiting-children', 'active', 'repeat', 'delayed', 'paused'];
}

let items: Bull.Job[] = [];
let jobCounts: { [index: string]: number } = {};
try {
items = await this.queue.getJobs(jobTypes, start, end);
} catch (e: any) {
Logger.error(e.message, loggerCtx, e.stack);
}
let totalItems = 0;

try {
jobCounts = await this.queue.getJobCounts(...jobTypes);
const [total, jobIds] = await this.callCustomScript(getJobsByType, [
skip,
take,
options?.filter?.queueName?.eq ?? '',
...jobTypes,
]);
items = (
await Promise.all(
jobIds.map(id => {
return BullJob.fromId(this.queue, id);
}),
)
).filter(notNullOrUndefined);
totalItems = total;
} catch (e: any) {
Logger.error(e.message, loggerCtx, e.stack);
throw new InternalServerError(e.message);
}
const totalItems = Object.values(jobCounts).reduce((sum, count) => sum + count, 0);

return {
items: await Promise.all(
items
.sort((a, b) => b.timestamp - a.timestamp)
.map(bullJob => this.createVendureJob(bullJob)),
),
items: await Promise.all(items.map(bullJob => this.createVendureJob(bullJob))),
totalItems,
};
}
Expand Down Expand Up @@ -252,6 +268,7 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
}

private stopped = false;

async stop<Data extends JobData<Data> = object>(
queueName: string,
process: (job: Job<Data>) => Promise<any>,
Expand Down Expand Up @@ -308,4 +325,31 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
throw new InternalServerError('Could not determine job state');
// TODO: how to handle "cancelled" state? Currently when we cancel a job, we simply remove all record of it.
}

private callCustomScript<T, Args extends any[]>(
scriptDef: CustomScriptDefinition<T, Args>,
args: Args,
): Promise<T> {
return new Promise<T>((resolve, reject) => {
(this.redisConnection as any)[scriptDef.name](
`bull:${this.queue.name}:`,
...args,
(err: any, result: any) => {
if (err) {
reject(err);
} else {
resolve(result);
}
},
);
});
}

private defineCustomLuaScripts() {
const redis = this.redisConnection;
redis.defineCommand(getJobsByType.name, {
numberOfKeys: getJobsByType.numberOfKeys,
lua: getJobsByType.script,
});
}
}
117 changes: 117 additions & 0 deletions packages/job-queue-plugin/src/bullmq/scripts/get-jobs-by-type.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
// language=Lua
import { CustomScriptDefinition } from '../types';

const script = `--[[
Get job ids per provided states and filter by name
Input:
KEYS[1] 'prefix'
ARGV[1] start
ARGV[2] end
ARGV[3] filterName
ARGV[4...] types
]]
local rcall = redis.call
local prefix = KEYS[1]
local rangeStart = tonumber(ARGV[1])
local rangeEnd = tonumber(ARGV[2])
local filterName = ARGV[3]
local results = {}
local targetSets = {}
-- Initialize an empty array to hold the sets to unionize. The "completed" and "failed" lists
-- are sorted sets
local setsToUnionize = {}
local typesInUnion = {}
-- Initialize an empty array to hold lists to include. The "active" and "wait" lists are
-- regular lists
local listsToInclude = {}
-- Iterate through ARGV starting from the first element (ARGV[1]) up to the end
for i = 4, #ARGV do
local setKey = prefix .. ARGV[i]
-- Check if the setKey is valid (e.g., it exists and is a sorted set)
local targetExists = redis.call('EXISTS', setKey)
local listType = redis.call('TYPE', setKey).ok
if targetExists == 1 and listType == 'zset' then
-- Add the valid set to the array
table.insert(setsToUnionize, setKey)
table.insert(typesInUnion, ARGV[i])
end
if targetExists == 1 and listType == 'list' then
-- Add the valid set to the array
table.insert(listsToInclude, setKey)
table.insert(typesInUnion, ARGV[i])
end
end
-- Define the destination key for the concatenated sorted set
local tempSortedSetUnionKey = prefix .. 'union:' .. table.concat(typesInUnion, ':');
if #listsToInclude == 0 and #setsToUnionize == 0 then
return {0, {}}
end
-- Check if there are valid sets to unionize
if #setsToUnionize > 0 then
-- Use ZUNIONSTORE to concatenate the valid sorted sets into the destination key
local numSets = #setsToUnionize
redis.call('ZUNIONSTORE', tempSortedSetUnionKey, numSets, unpack(setsToUnionize))
end
local originalResults = rcall("ZREVRANGE", tempSortedSetUnionKey, 0, -1)
if #listsToInclude > 0 then
for _, listKey in ipairs(listsToInclude) do
local list = rcall("LRANGE", listKey, 0, -1)
for _, jobId in ipairs(list) do
table.insert(originalResults, jobId)
end
end
end
-- Define a custom comparison function for sorting in descending order
local function compareDescending(a, b)
return tonumber(a) > tonumber(b)
end
-- Sort the table in descending order
table.sort(originalResults, compareDescending)
local filteredResults = {}
local totalResults = 0
for _, job in ipairs(originalResults) do
local jobName = rcall("HGET", prefix .. job, "name");
if filterName ~= "" and jobName == filterName then
if rangeStart <= totalResults and #filteredResults < rangeEnd then
table.insert(filteredResults, job)
end
totalResults = totalResults + 1
elseif filterName == "" then
if rangeStart <= totalResults and #filteredResults < rangeEnd then
table.insert(filteredResults, job)
end
totalResults = totalResults + 1
end
end
rcall("DEL", tempSortedSetUnionKey)
return {totalResults, filteredResults}
`;

export const getJobsByType: CustomScriptDefinition<
[totalItems: number, jobIds: string[]],
[rangeStart: number, rangeEnd: number, queueName: string | undefined, ...states: string[]]
> = {
script,
numberOfKeys: 1,
name: 'getJobsByType',
};
10 changes: 10 additions & 0 deletions packages/job-queue-plugin/src/bullmq/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,3 +92,13 @@ export interface BackoffOptions {
type: 'exponential' | 'fixed';
delay: number;
}

/**
* @description
* A definition for a Lua script used to define custom behavior in Redis
*/
export interface CustomScriptDefinition<T, Args extends any[]> {
name: string;
script: string;
numberOfKeys: number;
}

0 comments on commit c148a92

Please sign in to comment.