diff --git a/python/bullmq/job.py b/python/bullmq/job.py index 5728388a60..bb7bbe78ad 100644 --- a/python/bullmq/job.py +++ b/python/bullmq/job.py @@ -14,6 +14,7 @@ optsDecodeMap = { 'fpof': 'failParentOnFailure', + 'idof': 'ignoreDependencyOnFailure', 'kl': 'keepLogs', } diff --git a/python/bullmq/scripts.py b/python/bullmq/scripts.py index 0333f89961..9dd1af7e47 100644 --- a/python/bullmq/scripts.py +++ b/python/bullmq/scripts.py @@ -512,11 +512,6 @@ def getMetricsSize(opts: dict): return metrics.get("maxDataPoints", "") return "" - def getFailParentOnFailure(job: Job): - opts = job.opts - if opts is not None: - return opts.get("failParentOnFailure", False) - keepJobs = getKeepJobs(shouldRemove) packedOpts = msgpack.packb({ @@ -527,7 +522,8 @@ def getFailParentOnFailure(job: Job): "attempts": job.attempts, "attemptsMade": job.attemptsMade, "maxMetricsSize": getMetricsSize(opts), - "fpof": getFailParentOnFailure(job), + "fpof": opts.get("failParentOnFailure", False), + "idof": opts.get("ignoreDependencyOnFailure", False) }, use_bin_type=True) args = [job.id, timestamp, propVal, transformed_value or "", target, diff --git a/src/commands/moveStalledJobsToWait-9.lua b/src/commands/moveStalledJobsToWait-9.lua index 6e25a752ce..df87ca5820 100644 --- a/src/commands/moveStalledJobsToWait-9.lua +++ b/src/commands/moveStalledJobsToWait-9.lua @@ -28,6 +28,7 @@ local rcall = redis.call --- @include "includes/batches" --- @include "includes/getTargetQueueList" --- @include "includes/moveParentFromWaitingChildrenToFailed" +--- @include "includes/moveParentToWaitIfNeeded" --- @include "includes/removeJob" --- @include "includes/removeJobsByMaxAge" --- @include "includes/removeJobsByMaxCount" @@ -105,6 +106,16 @@ if (#stalling > 0) then jobKey, timestamp ) + elseif opts['idof'] then + local parentData = cjson.decode(rawParentData) + local parentKey = parentData['queueKey'] .. ':' .. parentData['id'] + local dependenciesSet = parentKey .. ":dependencies" + if rcall("SREM", dependenciesSet, jobKey) == 1 then + moveParentToWaitIfNeeded(parentData['queueKey'], dependenciesSet, + parentKey, parentData['id'], timestamp) + local failedSet = parentKey .. ":failed" + rcall("HSET", failedSet, jobKey, failedReason) + end end if removeOnFailType == "number" then removeJobsByMaxCount(opts["removeOnFail"], diff --git a/tests/test_stalled_jobs.ts b/tests/test_stalled_jobs.ts index 68b9a5fd79..c040496ef0 100644 --- a/tests/test_stalled_jobs.ts +++ b/tests/test_stalled_jobs.ts @@ -467,6 +467,95 @@ describe('stalled jobs', function () { }); }); + describe('when ignoreDependencyOnFailure is provided as true', function () { + it('should move parent to waiting when child is moved to failed', async function () { + this.timeout(6000); + const concurrency = 4; + const parentQueueName = `parent-queue-${v4()}`; + + const parentQueue = new Queue(parentQueueName, { + connection, + prefix, + }); + + const flow = new FlowProducer({ connection, prefix }); + + const worker = new Worker( + queueName, + async () => { + return delay(10000); + }, + { + connection, + prefix, + lockDuration: 1000, + stalledInterval: 100, + maxStalledCount: 0, + concurrency, + }, + ); + + const allActive = new Promise(resolve => { + worker.on('active', after(concurrency, resolve)); + }); + + await worker.waitUntilReady(); + + const { job: parent } = await flow.add({ + name: 'parent-job', + queueName: parentQueueName, + data: {}, + children: [ + { + name: 'test', + data: { foo: 'bar' }, + queueName, + opts: { ignoreDependencyOnFailure: true }, + }, + ], + }); + + const jobs = Array.from(Array(3).keys()).map(index => ({ + name: 'test', + data: { index }, + })); + + await queue.addBulk(jobs); + await allActive; + await worker.close(true); + + const worker2 = new Worker(queueName, async job => {}, { + connection, + prefix, + stalledInterval: 100, + maxStalledCount: 0, + concurrency, + }); + + const errorMessage = 'job stalled more than allowable limit'; + const allFailed = new Promise(resolve => { + worker2.on( + 'failed', + after(concurrency, async (job, failedReason, prev) => { + const parentState = await parent.getState(); + + expect(parentState).to.be.equal('waiting'); + expect(prev).to.be.equal('active'); + expect(failedReason.message).to.be.equal(errorMessage); + resolve(); + }), + ); + }); + + await allFailed; + + await worker2.close(); + await parentQueue.close(); + await flow.close(); + await removeAllQueueData(new IORedis(redisHost), parentQueueName); + }); + }); + describe('when removeOnFail is provided as a number', function () { it('keeps the specified number of jobs in failed', async function () { this.timeout(6000);