Skip to content

Commit

Permalink
feat(backoff): validate UnrecoverableError presence (#1074)
Browse files Browse the repository at this point in the history
  • Loading branch information
roggervalf authored Feb 15, 2022
1 parent d53a800 commit 1defeac
Show file tree
Hide file tree
Showing 8 changed files with 199 additions and 40 deletions.
60 changes: 39 additions & 21 deletions docs/gitbook/guide/retrying-failing-jobs.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

When a processor throws an exception, the worker will catch it and move the job to the failed set. But sometimes it may be desirable to retry a failed job.

BullMQ support retries of failed jobs using backoff functions. It is possible to use the built in backoff functions or provide custom ones.
BullMQ supports retries of failed jobs using backoff functions. It is possible to use the built in backoff functions or provide custom ones.

For BullMQ to reschedule failed jobs, make sure you create a `QueueScheduler` for your queue.

Expand Down Expand Up @@ -30,24 +30,21 @@ await queue.add(
You can also define it in the queue's `defaultJobOptions`, and it will apply to all jobs added to the queue, unless overridden. For example:

```typescript
import { Queue, QueueScheduler } from "bullmq";
import { Queue, QueueScheduler } from 'bullmq';

const myQueue = new Queue("foo", {
const myQueue = new Queue('foo', {
defaultJobOptions: {
attempts: 3,
backoff: {
type: "exponential",
delay: 1000
}
}
type: 'exponential',
delay: 1000,
},
},
});

const myQueueScheduler = new QueueScheduler('foo');

await queue.add(
"test-retry",
{ foo: "bar" }
);
await queue.add('test-retry', { foo: 'bar' });
```

The current built-in backoff functions are "exponential" and "fixed".
Expand All @@ -59,19 +56,15 @@ If you want to define your custom backoff you need to define it at the worker:
```typescript
import { Worker } from 'bullmq';

const worker = new Worker(
'foo',
async job => doSomeProcessing(),
{
settings: {
backoffStrategies: {
custom(attemptsMade: number) {
return attemptsMade * 1000;
},
const worker = new Worker('foo', async job => doSomeProcessing(), {
settings: {
backoffStrategies: {
custom(attemptsMade: number) {
return attemptsMade * 1000;
},
},
},
);
});
```

You can then use your "custom" strategy when adding jobs:
Expand All @@ -93,3 +86,28 @@ await queue.add(
);
```

# Stop retrying jobs

When a processor throws an exception that is considered as unrecoverable, you should use the `UnrecoverableError` class.

BullMQ supports moving jobs to failed when this error is thrown without retrying to process it.

```typescript
import { Worker, UnrecoverableError } from 'bullmq';

const worker = new Worker('foo', async job => {doSomeProcessing();
throw new UnrecoverableError('Unrecoverable');
}, {
connection
},
});

await queue.add(
'test-retry',
{ foo: 'bar' },
{
attempts: 3,
backoff: 1000,
},
);
```
29 changes: 15 additions & 14 deletions src/classes/index.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
export * from './backoffs';
export * from './job';
export * from './queue-base';
export * from './queue-events';
export * from './queue-getters';
export * from './queue-scheduler';
export * from './queue';
export * from './redis-connection';
export * from './repeat';
export * from './scripts';
export * from './worker';
export * from './child-pool';
export * from './sandbox';
export * from './flow-producer';
export * from './backoffs';
export * from './job';
export * from './queue-base';
export * from './queue-events';
export * from './queue-getters';
export * from './queue-scheduler';
export * from './queue';
export * from './redis-connection';
export * from './repeat';
export * from './scripts';
export * from './worker';
export * from './child-pool';
export * from './sandbox';
export * from './flow-producer';
export * from './unrecoverable-error';
13 changes: 9 additions & 4 deletions src/classes/job.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { Pipeline } from 'ioredis';
import { fromPairs } from 'lodash';
import { debuglog } from 'util';
import {
BackoffOptions,
Expand All @@ -20,7 +21,7 @@ import {
import { QueueEvents } from './queue-events';
import { Backoffs } from './backoffs';
import { MinimalQueue, ParentOpts, Scripts, JobData } from './scripts';
import { fromPairs } from 'lodash';
import { UnrecoverableError } from './unrecoverable-error';

const logger = debuglog('bull');

Expand Down Expand Up @@ -442,8 +443,8 @@ export class Job<
* @param fetchNext - true when wanting to fetch the next job
* @returns void
*/
async moveToFailed(
err: Error,
async moveToFailed<E extends Error>(
err: E,
token: string,
fetchNext = false,
): Promise<void> {
Expand All @@ -461,7 +462,11 @@ export class Job<
// Check if an automatic retry should be performed
//
let moveToFailed = false;
if (this.attemptsMade < this.opts.attempts && !this.discarded) {
if (
this.attemptsMade < this.opts.attempts &&
!this.discarded &&
!(err instanceof UnrecoverableError)
) {
const opts = queue.opts as WorkerOptions;

// Check if backoff is needed
Expand Down
14 changes: 14 additions & 0 deletions src/classes/unrecoverable-error.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
/**
* UnrecoverableError
*
* Error to move a job to failed even if the attemptsMade
* are lower than the expected limit.
*
*/
export class UnrecoverableError extends Error {
constructor(message?: string) {
super(message);
this.name = this.constructor.name;
Object.setPrototypeOf(this, new.target.prototype);
}
}
21 changes: 21 additions & 0 deletions tests/fixtures/fixture_processor_unrecoverable.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/**
* A processor file to be used in tests.
*
*/
'use strict';

const {
UnrecoverableError,
} = require('../../dist/cjs/classes/unrecoverable-error');
const delay = require('./delay');

module.exports = function (job) {
return delay(500).then(() => {
if (job.attemptsMade < 2) {
throw new Error('Not yet!');
}
if (job.attemptsMade < 3) {
throw new UnrecoverableError('Unrecoverable');
}
});
};
1 change: 1 addition & 0 deletions tests/test_flow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,7 @@ describe('flows', () => {
resolve =>
(childrenProcessor = async (job: Job) => {
processedChildren++;
await delay(10);

if (processedChildren == values.length) {
resolve();
Expand Down
47 changes: 47 additions & 0 deletions tests/test_sandboxed_process.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {
Job,
Queue,
QueueEvents,
QueueScheduler,
Worker,
} from '../src/classes';
import { beforeEach } from 'mocha';
Expand Down Expand Up @@ -172,6 +173,52 @@ describe('sandboxed process', () => {
});
});

describe('when processor throws UnrecoverableError', () => {
it('moves job to failed', async function () {
this.timeout(6000);

const queueScheduler = new QueueScheduler(queueName, { connection });
await queueScheduler.waitUntilReady();

const processFile =
__dirname + '/fixtures/fixture_processor_unrecoverable.js';

const worker = new Worker(queueName, processFile, {
connection,
drainDelay: 1,
});

await worker.waitUntilReady();

const start = Date.now();
await queue.add(
'test',
{ foo: 'bar' },
{
attempts: 3,
backoff: 1000,
},
);

await new Promise<void>(resolve => {
worker.on(
'failed',
after(2, (job: Job, error) => {
const elapse = Date.now() - start;
expect(error.name).to.be.eql('UnrecoverableError');
expect(error.message).to.be.eql('Unrecoverable');
expect(elapse).to.be.greaterThan(1000);
expect(job.attemptsMade).to.be.eql(2);
resolve();
}),
);
});

await worker.close();
await queueScheduler.close();
});
});

it('should process with named processor', async () => {
const processFile = __dirname + '/fixtures/fixture_processor.js';
const worker = new Worker(queueName, processFile, {
Expand Down
54 changes: 53 additions & 1 deletion tests/test_worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@ import { v4 } from 'uuid';
import {
Queue,
QueueEvents,
QueueScheduler,
Job,
UnrecoverableError,
Worker,
QueueScheduler,
} from '../src/classes';
import { KeepJobs, JobsOptions } from '../src/interfaces';

Expand Down Expand Up @@ -1697,6 +1698,57 @@ describe('workers', function () {
await queueScheduler.close();
});

describe('when UnrecoverableError is throw', () => {
it('moves job to failed', async function () {
this.timeout(8000);

const queueScheduler = new QueueScheduler(queueName, { connection });
await queueScheduler.waitUntilReady();

const worker = new Worker(
queueName,
async job => {
if (job.attemptsMade < 2) {
throw new Error('Not yet!');
}
if (job.attemptsMade < 3) {
throw new UnrecoverableError('Unrecoverable');
}
},
{ connection },
);

await worker.waitUntilReady();

const start = Date.now();
await queue.add(
'test',
{ foo: 'bar' },
{
attempts: 3,
backoff: 1000,
},
);

await new Promise<void>(resolve => {
worker.on(
'failed',
after(2, (job: Job, error) => {
const elapse = Date.now() - start;
expect(error.name).to.be.eql('UnrecoverableError');
expect(error.message).to.be.eql('Unrecoverable');
expect(elapse).to.be.greaterThan(1000);
expect(job.attemptsMade).to.be.eql(2);
resolve();
}),
);
});

await worker.close();
await queueScheduler.close();
});
});

describe('when providing a way to execute step jobs', () => {
it('should retry a job after a delay if a fixed backoff is given, keeping the current step', async function () {
this.timeout(8000);
Expand Down

0 comments on commit 1defeac

Please sign in to comment.