Skip to content

Commit

Permalink
feat: allow arbitrary large drainDelay
Browse files Browse the repository at this point in the history
  • Loading branch information
manast committed Apr 9, 2024
1 parent 111fa41 commit 9693321
Showing 1 changed file with 11 additions and 8 deletions.
19 changes: 11 additions & 8 deletions src/classes/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -620,11 +620,6 @@ export class Worker<
? blockTimeout
: Math.ceil(blockTimeout);

// We restrict the maximum block timeout to 10 second to avoid
// blocking the connection for too long in the case of reconnections
// reference: https://github.com/taskforcesh/bullmq/issues/1658
blockTimeout = Math.min(blockTimeout, maximumBlockTimeout);

// Markers should only be used for un-blocking, so we will handle them in this
// function only.
const result = await bclient.bzpopmin(this.keys.marker, blockTimeout);
Expand Down Expand Up @@ -652,19 +647,27 @@ export class Worker<

protected getBlockTimeout(blockUntil: number): number {
const opts: WorkerOptions = <WorkerOptions>this.opts;
let blockTimeout;

// when there are delayed jobs
if (blockUntil) {
const blockDelay = blockUntil - Date.now();
// when we reach the time to get new jobs
if (blockDelay < 1) {
return minimumBlockTimeout;
blockTimeout = minimumBlockTimeout;
} else {
return blockDelay / 1000;
blockTimeout = blockDelay / 1000;
}

// We restrict the maximum block timeout to 10 second to avoid
// blocking the connection for too long in the case of reconnections
// reference: https://github.com/taskforcesh/bullmq/issues/1658
blockTimeout = Math.min(blockTimeout, maximumBlockTimeout);
} else {
return Math.max(opts.drainDelay, minimumBlockTimeout);
blockTimeout = Math.max(opts.drainDelay, minimumBlockTimeout);
}

return blockTimeout;
}

/**
Expand Down

0 comments on commit 9693321

Please sign in to comment.