Skip to content

Commit

Permalink
fix(worker): validate drainDelay must be greater than 0 (#2477)
Browse files Browse the repository at this point in the history
  • Loading branch information
roggervalf committed Mar 17, 2024
1 parent 928eeb6 commit ab43693
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 3 deletions.
9 changes: 7 additions & 2 deletions src/classes/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import {

// 10 seconds is the maximum time a BRPOPLPUSH can block.
const maximumBlockTimeout = 10;
const minimumBlockTimeout = 0.001;

// note: sandboxed processors would also like to define concurrency per process
// for better resource utilization.
Expand Down Expand Up @@ -225,6 +226,10 @@ export class Worker<
throw new Error('stalledInterval must be greater than 0');
}

if (this.opts.drainDelay <= 0) {
throw new Error('drainDelay must be greater than 0');
}

this.concurrency = this.opts.concurrency;

this.opts.lockRenewTime =
Expand Down Expand Up @@ -642,12 +647,12 @@ export class Worker<
const blockDelay = blockUntil - Date.now();
// when we reach the time to get new jobs
if (blockDelay < 1) {
return 0.001;
return minimumBlockTimeout;
} else {
return blockDelay / 1000;
}
} else {
return Math.max(opts.drainDelay, 0);
return Math.max(opts.drainDelay, minimumBlockTimeout);
}
}

Expand Down
14 changes: 13 additions & 1 deletion tests/test_worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1904,7 +1904,7 @@ describe('workers', function () {
});

it('stalled interval cannot be zero', function () {
this.timeout(8000);
this.timeout(4000);
expect(
() =>
new Worker(queueName, async () => {}, {
Expand All @@ -1915,6 +1915,18 @@ describe('workers', function () {
).to.throw('stalledInterval must be greater than 0');
});

it('drain delay cannot be zero', function () {
this.timeout(4000);
expect(
() =>
new Worker(queueName, async () => {}, {
connection,
prefix,
drainDelay: 0,
}),
).to.throw('drainDelay must be greater than 0');
});

it('lock extender continues to run until all active jobs are completed when closing a worker', async function () {
this.timeout(4000);
let worker;
Expand Down

0 comments on commit ab43693

Please sign in to comment.