Skip to content

Commit

Permalink
feat(worker): improved markers handling
Browse files Browse the repository at this point in the history
BREAKING CHANGE:
Markers use now a dedicated key in redis instead of using a special Job ID.
  • Loading branch information
manast authored and roggervalf committed Dec 4, 2023
1 parent b4c3001 commit 73cf5fc
Show file tree
Hide file tree
Showing 33 changed files with 574 additions and 494 deletions.
6 changes: 4 additions & 2 deletions src/classes/job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -585,7 +585,9 @@ export class Job<
);

const result = await this.scripts.moveToFinished(this.id, args);
this.finishedOn = args[14] as number;
this.finishedOn = args[
this.scripts.moveToFinishedKeys.length + 1
] as number;

return result;
}
Expand Down Expand Up @@ -667,7 +669,7 @@ export class Job<
fetchNext,
);
(<any>multi).moveToFinished(args);
finishedOn = args[14];
finishedOn = args[this.scripts.moveToFinishedKeys.length + 1] as number;
command = 'failed';
}

Expand Down
3 changes: 2 additions & 1 deletion src/classes/queue-keys.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ export class QueueKeys {
'limiter',
'meta',
'events',
'pc',
'pc', // priority counter key
'marker', // marker key
].forEach(key => {
keys[key] = this.toKey(name, key);
});
Expand Down
61 changes: 30 additions & 31 deletions src/classes/scripts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ export class Scripts {
undefined,
undefined,
undefined,
undefined,
];
}

Expand All @@ -79,8 +80,7 @@ export class Scripts {
): Promise<string> {
const queueKeys = this.queue.keys;
const keys: (string | Buffer)[] = [
queueKeys.wait,
queueKeys.paused,
queueKeys.marker,
queueKeys.meta,
queueKeys.id,
queueKeys.delayed,
Expand All @@ -101,8 +101,7 @@ export class Scripts {
): Promise<string> {
const queueKeys = this.queue.keys;
const keys: (string | Buffer)[] = [
queueKeys.wait,
queueKeys.paused,
queueKeys.marker,
queueKeys.meta,
queueKeys.id,
queueKeys.prioritized,
Expand Down Expand Up @@ -197,6 +196,7 @@ export class Scripts {
queueKeys.id,
queueKeys.completed,
queueKeys.events,
queueKeys.marker,
];
keys.push(pack(args), job.data, encodedOpts);
result = await (<any>client).addStandardJob(keys);
Expand All @@ -223,7 +223,11 @@ export class Scripts {
this.queue.toKey(name),
);

keys.push(this.queue.keys.events);
keys.push(
this.queue.keys.events,
this.queue.keys.delayed,
this.queue.keys.marker,
);

return (<any>client).pause(keys.concat([pause ? 'paused' : 'resumed']));
}
Expand Down Expand Up @@ -336,6 +340,7 @@ export class Scripts {
keys[10] = queueKeys[target];
keys[11] = this.queue.toKey(job.id ?? '');
keys[12] = metricsKey;
keys[13] = this.queue.keys.marker;

const keepJobs = this.getKeepJobs(shouldRemove, workerKeepJobs);

Expand Down Expand Up @@ -631,6 +636,7 @@ export class Scripts {
this.queue.keys.meta,
this.queue.keys.prioritized,
this.queue.keys.pc,
this.queue.keys.marker,
];

return keys.concat([
Expand Down Expand Up @@ -661,20 +667,16 @@ export class Scripts {
timestamp = timestamp * 0x1000 + (+jobId & 0xfff);
}

const queueKeys = this.queue.keys;
const keys: (string | number)[] = [
'wait',
'active',
'prioritized',
'delayed',
jobId,
].map(name => {
return this.queue.toKey(name);
});
keys.push.apply(keys, [
this.queue.keys.events,
this.queue.keys.paused,
this.queue.keys.meta,
]);
queueKeys.marker,
queueKeys.active,
queueKeys.prioritized,
queueKeys.delayed,
this.queue.toKey(jobId),
queueKeys.events,
queueKeys.meta,
];

return keys.concat([
this.queue.keys[''],
Expand Down Expand Up @@ -798,21 +800,17 @@ export class Scripts {
token: string,
): (string | number)[] {
const keys: (string | number)[] = [
'active',
'wait',
'paused',
jobId,
'meta',
].map(name => {
return this.queue.toKey(name);
});

keys.push(
this.queue.keys.active,
this.queue.keys.wait,
this.queue.keys.paused,
this.queue.toKey(jobId),
this.queue.keys.meta,
this.queue.keys.events,
this.queue.keys.delayed,
this.queue.keys.prioritized,
this.queue.keys.pc,
);
this.queue.keys.marker,
];

const pushCmd = (lifo ? 'R' : 'L') + 'PUSH';

Expand Down Expand Up @@ -909,7 +907,7 @@ export class Scripts {
}
}

async moveToActive(client: RedisClient, token: string, jobId?: string) {
async moveToActive(client: RedisClient, token: string) {
const opts = this.queue.opts as WorkerOptions;

const queueKeys = this.queue.keys;
Expand All @@ -924,12 +922,12 @@ export class Scripts {
queueKeys.paused,
queueKeys.meta,
queueKeys.pc,
queueKeys.marker,
];

const args: (string | number | boolean | Buffer)[] = [
queueKeys[''],
Date.now(),
jobId || '',
pack({
token,
lockDuration: opts.lockDuration,
Expand All @@ -955,6 +953,7 @@ export class Scripts {
this.queue.keys.prioritized,
this.queue.keys.pc,
this.queue.keys.events,
this.queue.keys.marker,
];

const args = [this.queue.toKey(''), jobId];
Expand Down
67 changes: 29 additions & 38 deletions src/classes/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ export class Worker<
private limitUntil = 0;
private resumeWorker: () => void;
private stalledCheckTimer: NodeJS.Timeout;
private waiting: Promise<string> | null = null;
private waiting: Promise<number> | null = null;
private _repeat: Repeat;

protected paused: Promise<void>;
Expand Down Expand Up @@ -426,7 +426,7 @@ export class Worker<
numTotal = asyncFifoQueue.numTotal();

if (this.waiting && numTotal > 1) {
// We have a job waiting but we have others that we could start processing already
// We are waiting for jobs but we have others that we could start processing already
break;
}

Expand All @@ -451,10 +451,7 @@ export class Worker<
let job: Job<DataType, ResultType, NameType> | void;
do {
job = await asyncFifoQueue.fetch();
} while (
!job &&
asyncFifoQueue.numQueued() > 0
);
} while (!job && asyncFifoQueue.numQueued() > 0);

if (job) {
const token = job.token;
Expand Down Expand Up @@ -514,10 +511,13 @@ export class Worker<
}

if (this.drained && block && !this.limitUntil && !this.waiting) {
this.waiting = this.waitForJob(bclient);
this.waiting = this.waitForJob(bclient, this.blockUntil);
try {
const jobId = await this.waiting;
return this.moveToActive(client, token, jobId);
this.blockUntil = await this.waiting;

if (this.blockUntil <= 0 || this.blockUntil - Date.now() < 0.1) {
return this.moveToActive(client, token);
}
} catch (err) {
// Swallow error if locally paused or closing since we did force a disconnection
if (
Expand Down Expand Up @@ -558,44 +558,31 @@ export class Worker<
protected async moveToActive(
client: RedisClient,
token: string,
jobId?: string,
): Promise<Job<DataType, ResultType, NameType>> {
// If we get the special delayed job ID, we pick the delay as the next
// block timeout.
if (jobId && jobId.startsWith('0:')) {
this.blockUntil = parseInt(jobId.split(':')[1]) || 0;

// Remove marker from active list.
await client.lrem(this.keys.active, 1, jobId);
if (this.blockUntil > 0) {
return;
}
}
const [jobData, id, limitUntil, delayUntil] =
await this.scripts.moveToActive(client, token, jobId);
await this.scripts.moveToActive(client, token);
this.updateDelays(limitUntil, delayUntil);

return this.nextJobFromJobData(jobData, id, token);
}

private async waitForJob(bclient: RedisClient): Promise<string> {
private async waitForJob(
bclient: RedisClient,
blockUntil: number,
): Promise<number> {
if (this.paused) {
return;
return Infinity;
}

try {
const opts: WorkerOptions = <WorkerOptions>this.opts;

if (!this.closing) {
let blockTimeout = Math.max(
this.blockUntil
? (this.blockUntil - Date.now()) / 1000
: opts.drainDelay,
blockUntil ? (blockUntil - Date.now()) / 1000 : opts.drainDelay,
0,
);

let jobId;

// Blocking for less than 50ms is useless.
if (blockTimeout > 0.05) {
blockTimeout = this.blockingConnection.capabilities.canDoubleTimeout
Expand All @@ -607,16 +594,19 @@ export class Worker<
// reference: https://github.com/taskforcesh/bullmq/issues/1658
blockTimeout = Math.min(blockTimeout, maximumBlockTimeout);

jobId = await bclient.brpoplpush(
this.keys.wait,
this.keys.active,
blockTimeout,
);
} else {
jobId = await bclient.rpoplpush(this.keys.wait, this.keys.active);
// Markers should only be used for un-blocking, so we will handle them in this
// function only.
const result = await bclient.bzpopmin(this.keys.marker, blockTimeout);

if (result) {
const [_key, member, score] = result;

if (member) {
return parseInt(score);
}
}
}
this.blockUntil = 0;
return jobId;
return 0;
}
} catch (error) {
if (isNotConnectionError(<Error>error)) {
Expand All @@ -628,6 +618,7 @@ export class Worker<
} finally {
this.waiting = null;
}
return Infinity;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,12 @@
- Emits a global event 'delayed' if the job is delayed.
Input:
KEYS[1] 'wait',
KEYS[2] 'paused'
KEYS[3] 'meta'
KEYS[4] 'id'
KEYS[5] 'delayed'
KEYS[6] 'completed'
KEYS[7] events stream key
KEYS[1] 'marker',
KEYS[2] 'meta'
KEYS[3] 'id'
KEYS[4] 'delayed'
KEYS[5] 'completed'
KEYS[6] events stream key
ARGV[1] msgpacked arguments array
[1] key prefix,
Expand All @@ -34,15 +33,12 @@
jobId - OK
-5 - Missing parent key
]]
local waitKey = KEYS[1]
local pausedKey = KEYS[2]
local metaKey = KEYS[2]
local idKey = KEYS[3]
local delayedKey = KEYS[4]

local metaKey = KEYS[3]
local idKey = KEYS[4]
local delayedKey = KEYS[5]

local completedKey = KEYS[6]
local eventsKey = KEYS[7]
local completedKey = KEYS[5]
local eventsKey = KEYS[6]

local jobId
local jobIdKey
Expand All @@ -61,7 +57,7 @@ local parentData
-- Includes
--- @include "includes/storeJob"
--- @include "includes/addDelayMarkerIfNeeded"
--- @include "includes/getTargetQueueList"
--- @include "includes/isQueuePaused"
--- @include "includes/getNextDelayedTimestamp"
--- @include "includes/updateExistingJobsParent"

Expand Down Expand Up @@ -108,10 +104,12 @@ rcall("ZADD", delayedKey, score, jobId)
rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "delayed",
"jobId", jobId, "delay", delayedTimestamp)

-- If wait list is empty, and this delayed job is the next one to be processed,
-- then we need to signal the workers by adding a dummy job (jobId 0:delay) to the wait list.
local target = getTargetQueueList(metaKey, KEYS[1], KEYS[2])
addDelayMarkerIfNeeded(target, delayedKey)
-- mark that a delayed job is available
local isPaused = isQueuePaused(metaKey)
if not isPaused then
local markerKey = KEYS[1]
addDelayMarkerIfNeeded(markerKey, delayedKey)
end

-- Check if this job is a child of another job, if so add it to the parents dependencies
-- TODO: Should not be possible to add a child job to a parent that is not in the "waiting-children" status.
Expand Down
Loading

0 comments on commit 73cf5fc

Please sign in to comment.