Skip to content

Commit

Permalink
feat(job): add removeChildDependency method (#2435)
Browse files Browse the repository at this point in the history
  • Loading branch information
roggervalf authored Feb 27, 2024
1 parent ada23fa commit 1151022
Show file tree
Hide file tree
Showing 7 changed files with 401 additions and 23 deletions.
1 change: 1 addition & 0 deletions docs/gitbook/SUMMARY.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
* [Fail Parent](guide/flows/fail-parent.md)
* [Remove Dependency](guide/flows/remove-dependency.md)
* [Ignore Dependency](guide/flows/ignore-dependency.md)
* [Remove Child Dependency](guide/flows/remove-child-dependency.md)
* [Metrics](guide/metrics/metrics.md)
* [Rate limiting](guide/rate-limiting.md)
* [Retrying failing jobs](guide/retrying-failing-jobs.md)
Expand Down
31 changes: 31 additions & 0 deletions docs/gitbook/guide/flows/remove-child-dependency.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Remove Child Dependency

In some situations, you may have a parent job and need to remove the dependency of one of its children.

The pattern to solve this requirement consists on using the **removeChildDependency** method. It will make sure that if the job is the last pending child, to move its parent to _waiting_ and it won't be listed in unprocessed list of the parent.

```typescript
const flow = new FlowProducer({ connection });

const originalTree = await flow.add({
name: 'root-job',
queueName: 'topQueueName',
data: {},
children: [
{
name,
data: { idx: 0, foo: 'bar' },
queueName: 'childrenQueueName',
opts: {},
},
],
});

await originalTree.children[0].job.removeChildDependency();
```

{% hint style="waring" %}
As soon as a **child** calls this method, it will verify if it has an existing parent, if not, it'll throw an error.
{% endhint %}

Failed or completed children using this option won't generate any removal as they won't be part of unprocessed list:
26 changes: 25 additions & 1 deletion src/classes/job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -522,6 +522,25 @@ export class Job<
return Job.addJobLog(this.queue, this.id, logRow, this.opts.keepLogs);
}

/**
* Removes child dependency from parent when child is not yet finished
*
* @returns True if the relationship existed and if it was removed.
*/
async removeChildDependency(): Promise<boolean> {
const childDependencyIsRemoved = await this.scripts.removeChildDependency(
this.id,
this.parentKey,
);
if (childDependencyIsRemoved) {
this.parent = undefined;
this.parentKey = undefined;
return true;
}

return false;
}

/**
* Clears job's logs
*
Expand Down Expand Up @@ -704,7 +723,12 @@ export class Job<

const code = results[results.length - 1][1] as number;
if (code < 0) {
throw this.scripts.finishedErrors(code, this.id, command, 'active');
throw this.scripts.finishedErrors({
code,
jobId: this.id,
command,
state: 'active',
});
}

if (finishedOn && typeof finishedOn === 'number') {
Expand Down
129 changes: 107 additions & 22 deletions src/classes/scripts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,11 @@ export class Scripts {
}

if (<number>result < 0) {
throw this.finishedErrors(<number>result, parentOpts.parentKey, 'addJob');
throw this.finishedErrors({
code: <number>result,
parentKey: parentOpts.parentKey,
command: 'addJob',
});
}

return <string>result;
Expand Down Expand Up @@ -314,7 +318,11 @@ export class Scripts {
const result = await (<any>client).updateData(keys.concat([dataJson]));

if (result < 0) {
throw this.finishedErrors(result, job.id, 'updateData');
throw this.finishedErrors({
code: result,
jobId: job.id,
command: 'updateData',
});
}
}

Expand All @@ -336,7 +344,11 @@ export class Scripts {
);

if (result < 0) {
throw this.finishedErrors(result, jobId, 'updateProgress');
throw this.finishedErrors({
code: result,
jobId,
command: 'updateProgress',
});
}
}

Expand Down Expand Up @@ -414,20 +426,32 @@ export class Scripts {

const result = await (<any>client).moveToFinished(args);
if (result < 0) {
throw this.finishedErrors(result, jobId, 'moveToFinished', 'active');
throw this.finishedErrors({
code: result,
jobId,
command: 'moveToFinished',
state: 'active',
});
} else {
if (typeof result !== 'undefined') {
return raw2NextJobData(result);
}
}
}

finishedErrors(
code: number,
jobId: string,
command: string,
state?: string,
): Error {
finishedErrors({
code,
jobId,
parentKey,
command,
state,
}: {
code: number;
jobId?: string;
parentKey?: string;
command: string;
state?: string;
}): Error {
switch (code) {
case ErrorCode.JobNotExist:
return new Error(`Missing key for job ${jobId}. ${command}`);
Expand All @@ -440,14 +464,14 @@ export class Scripts {
case ErrorCode.JobPendingDependencies:
return new Error(`Job ${jobId} has pending dependencies. ${command}`);
case ErrorCode.ParentJobNotExist:
return new Error(`Missing key for parent job ${jobId}. ${command}`);
return new Error(`Missing key for parent job ${parentKey}. ${command}`);
case ErrorCode.JobLockMismatch:
return new Error(
`Lock mismatch for job ${jobId}. Cmd ${command} from ${state}`,
);
case ErrorCode.ParentJobCannotBeReplaced:
return new Error(
`The parent job ${jobId} cannot be replaced. ${command}`,
`The parent job ${parentKey} cannot be replaced. ${command}`,
);
default:
return new Error(`Unknown code ${code} error for ${jobId}. ${command}`);
Expand Down Expand Up @@ -476,6 +500,43 @@ export class Scripts {
return (<any>client).drain(args);
}

private removeChildDependencyArgs(
jobId: string,
parentKey: string,
): (string | number)[] {
const queueKeys = this.queue.keys;

const keys: string[] = [queueKeys['']];

const args = [this.queue.toKey(jobId), parentKey];

return keys.concat(args);
}

async removeChildDependency(
jobId: string,
parentKey: string,
): Promise<boolean> {
const client = await this.queue.client;
const args = this.removeChildDependencyArgs(jobId, parentKey);

const result = await (<any>client).removeChildDependency(args);

switch (result) {
case 0:
return true;
case 1:
return false;
default:
throw this.finishedErrors({
code: result,
jobId,
parentKey,
command: 'removeChildDependency',
});
}
}

private getRangesArgs(
types: JobType[],
start: number,
Expand Down Expand Up @@ -609,7 +670,12 @@ export class Scripts {
const args = this.changeDelayArgs(jobId, delay);
const result = await (<any>client).changeDelay(args);
if (result < 0) {
throw this.finishedErrors(result, jobId, 'changeDelay', 'delayed');
throw this.finishedErrors({
code: result,
jobId,
command: 'changeDelay',
state: 'delayed',
});
}
}

Expand Down Expand Up @@ -652,7 +718,11 @@ export class Scripts {
const args = this.changePriorityArgs(jobId, priority, lifo);
const result = await (<any>client).changePriority(args);
if (result < 0) {
throw this.finishedErrors(result, jobId, 'changePriority');
throw this.finishedErrors({
code: result,
jobId,
command: 'changePriority',
});
}
}

Expand Down Expand Up @@ -766,7 +836,12 @@ export class Scripts {
const args = this.moveToDelayedArgs(jobId, timestamp, token, delay, opts);
const result = await (<any>client).moveToDelayed(args);
if (result < 0) {
throw this.finishedErrors(result, jobId, 'moveToDelayed', 'active');
throw this.finishedErrors({
code: result,
jobId,
command: 'moveToDelayed',
state: 'active',
});
}
}

Expand Down Expand Up @@ -797,12 +872,12 @@ export class Scripts {
case 1:
return false;
default:
throw this.finishedErrors(
result,
throw this.finishedErrors({
code: result,
jobId,
'moveToWaitingChildren',
'active',
);
command: 'moveToWaitingChildren',
state: 'active',
});
}
}

Expand Down Expand Up @@ -939,7 +1014,12 @@ export class Scripts {
case 1:
return;
default:
throw this.finishedErrors(result, job.id, 'reprocessJob', state);
throw this.finishedErrors({
code: result,
jobId: job.id,
command: 'reprocessJob',
state,
});
}
}

Expand Down Expand Up @@ -997,7 +1077,12 @@ export class Scripts {

const code = await (<any>client).promote(keys.concat(args));
if (code < 0) {
throw this.finishedErrors(code, jobId, 'promote', 'delayed');
throw this.finishedErrors({
code,
jobId,
command: 'promote',
state: 'delayed',
});
}
}

Expand Down
3 changes: 3 additions & 0 deletions src/commands/includes/removeParentDependencyKey.lua
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ local function removeParentDependencyKey(jobKey, hard, parentKey, baseKey)
end
end
end
return true
end
else
local missedParentKey = rcall("HGET", jobKey, "parentKey")
Expand Down Expand Up @@ -74,7 +75,9 @@ local function removeParentDependencyKey(jobKey, hard, parentKey, baseKey)
end
end
end
return true
end
end
end
return false
end
34 changes: 34 additions & 0 deletions src/commands/removeChildDependency-1.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
--[[
Break parent-child dependency by removing
child reference from parent
Input:
KEYS[1] 'key' prefix,
ARGV[1] job key
ARGV[2] parent key
Output:
0 - OK
1 - There is not relationship.
-1 - Missing job key
-5 - Missing parent key
]]
local rcall = redis.call
local jobKey = ARGV[1]
local parentKey = ARGV[2]

-- Includes
--- @include "includes/removeParentDependencyKey"

if rcall("EXISTS", jobKey) ~= 1 then return -1 end

if rcall("EXISTS", parentKey) ~= 1 then return -5 end

if removeParentDependencyKey(jobKey, false, parentKey, KEYS[1]) then
rcall("HDEL", jobKey, "parentKey", "parent")

return 0
else
return 1
end
Loading

0 comments on commit 1151022

Please sign in to comment.