Skip to content

Commit

Permalink
feat(core): Improve cancellation mechanism of DefaultJobQueuePlugin
Browse files Browse the repository at this point in the history
Relates to #1127, relates to #2650. This commit alters slightly the way cancellation
is handled: when a job is cancelled, it's status is updated but the
job will continue to run unless the `process` function explicitly
throws an error.
  • Loading branch information
michaelbromley committed Feb 2, 2024
1 parent 350d25f commit cba069b
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 13 deletions.
30 changes: 18 additions & 12 deletions packages/core/src/job-queue/polling-job-queue-strategy.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { JobState } from '@vendure/common/lib/generated-types';
import { ID } from '@vendure/common/lib/shared-types';
import { isObject } from '@vendure/common/lib/shared-utils';
import { from, interval, race, Subject, Subscription } from 'rxjs';
import { from, interval, mergeMap, race, Subject, Subscription } from 'rxjs';
import { filter, switchMap, take, throttleTime } from 'rxjs/operators';

import { Logger } from '../config/logger/vendure-logger';
Expand Down Expand Up @@ -70,10 +70,6 @@ class ActiveQueue<Data extends JobData<Data> = object> {
private readonly process: (job: Job<Data>) => Promise<any>,
private readonly jobQueueStrategy: PollingJobQueueStrategy,
) {
this.subscription = this.errorNotifier$.pipe(throttleTime(3000)).subscribe(([message, stack]) => {
Logger.error(message);
Logger.debug(stack);
});
this.pollInterval =
typeof this.jobQueueStrategy.pollInterval === 'function'
? this.jobQueueStrategy.pollInterval(queueName)
Expand All @@ -82,6 +78,10 @@ class ActiveQueue<Data extends JobData<Data> = object> {

start() {
Logger.debug(`Starting JobQueue "${this.queueName}"`);
this.subscription = this.errorNotifier$.pipe(throttleTime(3000)).subscribe(([message, stack]) => {
Logger.error(message);
Logger.debug(stack);
});
this.running = true;
const runNextJobs = async () => {
try {
Expand All @@ -93,15 +93,19 @@ class ActiveQueue<Data extends JobData<Data> = object> {
await this.jobQueueStrategy.update(nextJob);
const onProgress = (job: Job) => this.jobQueueStrategy.update(job);
nextJob.on('progress', onProgress);
const cancellationSignal$ = interval(this.pollInterval * 5).pipe(
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
switchMap(() => this.jobQueueStrategy.findOne(nextJob.id!)),
filter(job => job?.state === JobState.CANCELLED),
take(1),
);
const cancellationSub = interval(this.pollInterval * 5)
.pipe(
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
switchMap(() => this.jobQueueStrategy.findOne(nextJob.id!)),
filter(job => job?.state === JobState.CANCELLED),
take(1),
)
.subscribe(() => {
nextJob.cancel();
});
const stopSignal$ = this.queueStopped$.pipe(take(1));

race(from(this.process(nextJob)), cancellationSignal$, stopSignal$)
race(from(this.process(nextJob)), stopSignal$)
.toPromise()
.then(
result => {
Expand All @@ -122,6 +126,7 @@ class ActiveQueue<Data extends JobData<Data> = object> {
// return;
// }
nextJob.off('progress', onProgress);
cancellationSub.unsubscribe();
return this.onFailOrComplete(nextJob);
})
.catch((err: any) => {
Expand Down Expand Up @@ -150,6 +155,7 @@ class ActiveQueue<Data extends JobData<Data> = object> {
clearTimeout(this.timer);
await this.awaitRunningJobsOrTimeout();
Logger.info(`Stopped queue: ${this.queueName}`);
this.subscription.unsubscribe();
// Allow any job status changes to be persisted
// before we permit the application shutdown to continue.
// Otherwise, the DB connection will close before our
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,6 @@ export class SqlJobQueueStrategy extends PollingJobQueueStrategy implements Insp
.update()
.set(this.toRecord(job))
.where('id = :id', { id: job.id })
.andWhere('settledAt IS NULL')
.execute();
}

Expand Down

0 comments on commit cba069b

Please sign in to comment.