Skip to content

Commit

Permalink
fix(core): Improve handling of active jobs on worker shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelbromley committed Feb 2, 2024
1 parent 754da02 commit e1e0987
Showing 1 changed file with 48 additions and 15 deletions.
63 changes: 48 additions & 15 deletions packages/core/src/job-queue/polling-job-queue-strategy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,9 @@ class ActiveQueue<Data extends JobData<Data> = object> {
},
)
.finally(() => {
if (!this.running && nextJob.state !== JobState.PENDING) {
return;
}
// if (!this.running && nextJob.state !== JobState.PENDING) {
// return;
// }
nextJob.off('progress', onProgress);
return this.onFailOrComplete(nextJob);
})
Expand All @@ -145,24 +145,55 @@ class ActiveQueue<Data extends JobData<Data> = object> {
void runNextJobs();
}

stop(): Promise<void> {
async stop(): Promise<void> {
this.running = false;
this.queueStopped$.next(STOP_SIGNAL);
clearTimeout(this.timer);
await this.awaitRunningJobsOrTimeout();
Logger.info(`Stopped queue: ${this.queueName}`);
// Allow any job status changes to be persisted
// before we permit the application shutdown to continue.
// Otherwise, the DB connection will close before our
// changes are persisted.
await new Promise(resolve => setTimeout(resolve, 1000));
}

private awaitRunningJobsOrTimeout(): Promise<void> {
const start = +new Date();
// Wait for 2 seconds to allow running jobs to complete
const maxTimeout = 2000;
let pollTimer: any;
const stopActiveQueueTimeout = 20_000;
let timeout: ReturnType<typeof setTimeout>;
return new Promise(resolve => {
const pollActiveJobs = async () => {
const timedOut = +new Date() - start > maxTimeout;
if (this.activeJobs.length === 0 || timedOut) {
clearTimeout(pollTimer);
let lastStatusUpdate = +new Date();
const pollActiveJobs = () => {
const now = +new Date();
const timedOut =
stopActiveQueueTimeout === undefined ? false : now - start > stopActiveQueueTimeout;

if (this.activeJobs.length === 0) {
clearTimeout(timeout);
resolve();
return;
}

if (timedOut) {
Logger.warn(
`Timed out (${stopActiveQueueTimeout}ms) waiting for ${this.activeJobs.length} active jobs in queue "${this.queueName}" to complete. Forcing stop...`,
);
this.queueStopped$.next(STOP_SIGNAL);
clearTimeout(timeout);
resolve();
} else {
pollTimer = setTimeout(pollActiveJobs, 50);
return;
}

if (this.activeJobs.length > 0) {
if (now - lastStatusUpdate > 2000) {
Logger.info(
`Stopping queue: ${this.queueName} - waiting for ${this.activeJobs.length} active jobs to complete...`,
);
lastStatusUpdate = now;
}
}

timeout = setTimeout(pollActiveJobs, 200);
};
void pollActiveJobs();
});
Expand All @@ -175,7 +206,9 @@ class ActiveQueue<Data extends JobData<Data> = object> {

private removeJobFromActive(job: Job<Data>) {
const index = this.activeJobs.indexOf(job);
this.activeJobs.splice(index, 1);
if (index !== -1) {
this.activeJobs.splice(index, 1);
}
}
}

Expand Down

0 comments on commit e1e0987

Please sign in to comment.