Skip to content

Commit

Permalink
feat: add move to waiting children for manual processing (#477)
Browse files Browse the repository at this point in the history
  • Loading branch information
roggervalf authored May 31, 2021
1 parent 499bb93 commit f312f29
Show file tree
Hide file tree
Showing 5 changed files with 256 additions and 2 deletions.
21 changes: 21 additions & 0 deletions src/classes/job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,14 @@ export interface JobJsonRaw {
parentKey?: string;
}

export interface MoveToChildrenOpts {
timestamp?: number;
child?: {
id: string;
queue: string;
};
}

export class Job<T = any, R = any, N extends string = string> {
/**
* The progress a job has performed so far.
Expand Down Expand Up @@ -628,6 +636,19 @@ export class Job<T = any, R = any, N extends string = string> {
return Scripts.moveToDelayed(this.queue, this.id, timestamp);
}

/**
* Moves the job to the waiting-children set.
* @param {string} token Token to check job is locked by current worker
* @param opts the options bag for moving a job to waiting-children.
* @returns {boolean} true if the job was moved
*/
moveToWaitingChildren(
token: string,
opts: MoveToChildrenOpts,
): Promise<boolean | Error> {
return Scripts.moveToWaitingChildren(this.queue, this.id, token, opts);
}

/**
* Promotes a delayed job so that it starts to be processed as soon as possible.
*/
Expand Down
53 changes: 53 additions & 0 deletions src/classes/scripts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@ import {
WorkerOptions,
} from '../interfaces';
import { array2obj } from '../utils';
import { MoveToChildrenOpts } from './job';
import { Worker } from './worker';
import { QueueScheduler } from './queue-scheduler';
import { QueueBase } from './queue-base';
import { Job, JobJson, JobJsonRaw } from './job';
import { getParentKey } from './flow-producer';
import { RedisClient } from './redis-connection';

export type MinimalQueue = Pick<
Expand Down Expand Up @@ -344,6 +346,34 @@ export class Scripts {
return keys.concat([JSON.stringify(timestamp), jobId]);
}

static moveToWaitingChildrenArgs(
queue: MinimalQueue,
jobId: string,
token: string,
opts?: MoveToChildrenOpts,
) {
let timestamp = Math.max(0, opts.timestamp ?? 0);

const childKey = getParentKey(opts.child);

if (timestamp > 0) {
timestamp = timestamp * 0x1000 + (+jobId & 0xfff);
}

const keys = [`${jobId}:lock`, 'active', 'waiting-children', jobId].map(
function(name) {
return queue.toKey(name);
},
);

return keys.concat([
token,
childKey ?? '',
JSON.stringify(timestamp),
jobId,
]);
}

static async moveToDelayed(
queue: MinimalQueue,
jobId: string,
Expand All @@ -358,6 +388,29 @@ export class Scripts {
}
}

static async moveToWaitingChildren(
queue: MinimalQueue,
jobId: string,
token: string,
opts: MoveToChildrenOpts = {},
) {
const client = await queue.client;
const multi = client.multi();

const args = this.moveToWaitingChildrenArgs(queue, jobId, token, opts);
(<any>multi).moveToWaitingChildren(args);
const [[err, result]] = (await multi.exec()) as [[null | Error, number]];

switch (result) {
case 0:
return true;
case 1:
return false;
default:
return this.finishedErrors(result, jobId, 'moveToWaitingChildren');
}
}

static async cleanJobsInSet(
queue: MinimalQueue,
set: string,
Expand Down
60 changes: 60 additions & 0 deletions src/commands/moveToWaitingChildren-4.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
--[[
Moves job from active to waiting children set.
Input:
KEYS[1] lock key
KEYS[2] active key
KEYS[3] waitChildrenKey key
KEYS[4] job key
ARGV[1] token
ARGV[2] child key
ARGV[3] timestamp
ARGV[4] the id of the job
Output:
0 - OK
1 - There are not pending dependencies.
-1 - Missing job.
-2 - Missing lock
-3 - Job not in active set
]]
local rcall = redis.call

local function move_to_waiting_children (activeKey, waitingChildrenKey, jobId, timestamp)
local score = tonumber(timestamp)

local numRemovedElements = rcall("LREM", activeKey, -1, jobId)

if(numRemovedElements < 1) then
return -3
end

rcall("ZADD", waitingChildrenKey, score, jobId)

return 0
end

if ARGV[1] ~= "0" then
if rcall("GET", KEYS[1]) ~= ARGV[1] then
return -2
end
end

if rcall("EXISTS", KEYS[4]) == 1 then
if ARGV[2] ~= "" then
if rcall("SISMEMBER", KEYS[4] .. ":dependencies", ARGV[2]) ~= 0 then
return move_to_waiting_children(KEYS[2], KEYS[3], ARGV[4], ARGV[3])
end

return 1
else
if rcall("SCARD", KEYS[4] .. ":dependencies") ~= 0 then
return move_to_waiting_children(KEYS[2], KEYS[3], ARGV[4], ARGV[3])
end

return 1
end
end

return -1
8 changes: 6 additions & 2 deletions src/test/test_flow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ describe('flows', () => {
const processingParent = new Promise<void>((resolve, reject) => [
(parentProcessor = async (job: Job) => {
try {
expect(processedChildren).to.be.equal(3);
const { processed } = await job.getDependencies();
expect(Object.keys(processed)).to.have.length(3);

const childrenValues = await job.getChildrenValues();

Expand All @@ -65,6 +66,8 @@ describe('flows', () => {

const parentWorker = new Worker(parentQueueName, parentProcessor);
const childrenWorker = new Worker(queueName, childrenProcessor);
await parentWorker.waitUntilReady();
await childrenWorker.waitUntilReady();

const flow = new FlowProducer();
const tree = await flow.add({
Expand Down Expand Up @@ -279,7 +282,8 @@ describe('flows', () => {
const processingTop = new Promise<void>((resolve, reject) => [
(parentProcessor = async (job: Job) => {
try {
expect(processedChildren).to.be.equal(3);
const { processed } = await job.getDependencies();
expect(Object.keys(processed)).to.have.length(1);

const childrenValues = await job.getChildrenValues();

Expand Down
116 changes: 116 additions & 0 deletions src/test/test_worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1767,6 +1767,122 @@ describe('workers', function() {
});
});

it('should allow to move parent job to waiting-children', async () => {
const values = [
{ idx: 0, bar: 'something' },
{ idx: 1, baz: 'something' },
{ idx: 2, qux: 'something' },
];
const parentToken = 'parent-token';
const childToken = 'child-token';

const parentQueueName = 'parent-queue';

const parentQueue = new Queue(parentQueueName);
const parentWorker = new Worker(parentQueueName);
const childrenWorker = new Worker(queueName);

const data = { foo: 'bar' };
await Job.create(parentQueue, 'testDepend', data);
const parent = (await parentWorker.getNextJob(parentToken)) as Job;
const currentState = await parent.getState();

expect(currentState).to.be.equal('active');

await Job.create(queue, 'testJob1', values[0], {
parent: {
id: parent.id,
queue: 'bull:' + parentQueueName,
},
});
await Job.create(queue, 'testJob2', values[1], {
parent: {
id: parent.id,
queue: 'bull:' + parentQueueName,
},
});
await Job.create(queue, 'testJob3', values[2], {
parent: {
id: parent.id,
queue: 'bull:' + parentQueueName,
},
});
const { unprocessed: unprocessed1 } = await parent.getDependencies();

expect(unprocessed1).to.have.length(3);

const child1 = (await childrenWorker.getNextJob(childToken)) as Job;
const child2 = (await childrenWorker.getNextJob(childToken)) as Job;
const child3 = (await childrenWorker.getNextJob(childToken)) as Job;
const isActive1 = await child1.isActive();

expect(isActive1).to.be.true;

await child1.moveToCompleted('return value1', childToken);
const {
processed: processed2,
unprocessed: unprocessed2,
} = await parent.getDependencies();
const movedToWaitingChildren = await parent.moveToWaitingChildren(
parentToken,
{
child: {
id: child3.id,
queue: 'bull:' + queueName,
},
},
);

expect(processed2).to.deep.equal({
[`bull:${queueName}:${child1.id}`]: `"return value1"`,
});
expect(unprocessed2).to.have.length(2);
expect(movedToWaitingChildren).to.be.true;

const isActive2 = await child2.isActive();

expect(isActive2).to.be.true;

await child2.moveToCompleted('return value2', childToken);
const {
processed: processed3,
unprocessed: unprocessed3,
} = await parent.getDependencies();
const isWaitingChildren1 = await parent.isWaitingChildren();

expect(processed3).to.deep.equal({
[`bull:${queueName}:${child1.id}`]: `"return value1"`,
[`bull:${queueName}:${child2.id}`]: `"return value2"`,
});
expect(unprocessed3).to.have.length(1);
expect(isWaitingChildren1).to.be.true;

const isActive3 = await child3.isActive();

expect(isActive3).to.be.true;

await child3.moveToCompleted('return value3', childToken);
const {
processed: processed4,
unprocessed: unprocessed4,
} = await parent.getDependencies();
const isWaitingChildren2 = await parent.isWaitingChildren();

expect(processed4).to.deep.equal({
[`bull:${queueName}:${child1.id}`]: `"return value1"`,
[`bull:${queueName}:${child2.id}`]: `"return value2"`,
[`bull:${queueName}:${child3.id}`]: `"return value3"`,
});
expect(unprocessed4).to.have.length(0);
expect(isWaitingChildren2).to.be.false;

await childrenWorker.close();
await parentWorker.close();

await parentQueue.close();
await removeAllQueueData(new IORedis(), parentQueueName);
});

it('should allow to fail jobs manually', async () => {
const worker = new Worker(queueName);
const token = 'my-token';
Expand Down

0 comments on commit f312f29

Please sign in to comment.