Skip to content

Commit

Permalink
feat(core): Add gracefulShutdownTimeout to DefaultJobQueuePlugin
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelbromley committed Feb 6, 2024
1 parent 1b2b139 commit cba06e0
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 2 deletions.
17 changes: 16 additions & 1 deletion packages/core/src/job-queue/polling-job-queue-strategy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,18 @@ export interface PollingJobQueueStrategyConfig {
* @default () => 1000
*/
backoffStrategy?: BackoffStrategy;
/**
* @description
* The timeout in ms which the queue will use when attempting a graceful shutdown.
* That means, when the server is shut down but a job is running, the job queue will
* wait for the job to complete before allowing the server to shut down. If the job
* does not complete within this timeout window, the job will be forced to stop
* and the server will shut down anyway.
*
* @since 2.2.0
* @default 20_000
*/
gracefulShutdownTimeout?: number;
}

const STOP_SIGNAL = Symbol('STOP_SIGNAL');
Expand Down Expand Up @@ -232,6 +244,7 @@ export abstract class PollingJobQueueStrategy extends InjectableJobQueueStrategy
public pollInterval: number | ((queueName: string) => number);
public setRetries: (queueName: string, job: Job) => number;
public backOffStrategy?: BackoffStrategy;
public gracefulShutdownTimeout: number;

protected activeQueues = new QueueNameProcessStorage<ActiveQueue<any>>();

Expand All @@ -245,10 +258,12 @@ export abstract class PollingJobQueueStrategy extends InjectableJobQueueStrategy
this.pollInterval = concurrencyOrConfig.pollInterval ?? 200;
this.backOffStrategy = concurrencyOrConfig.backoffStrategy ?? (() => 1000);
this.setRetries = concurrencyOrConfig.setRetries ?? ((_, job) => job.retries);
this.gracefulShutdownTimeout = concurrencyOrConfig.gracefulShutdownTimeout ?? 20_000;
} else {
this.concurrency = concurrencyOrConfig ?? 1;
this.pollInterval = maybePollInterval ?? 200;
this.setRetries = (_, job) => job.retries;
this.gracefulShutdownTimeout = 20_000;
}
}

Expand Down Expand Up @@ -276,7 +291,7 @@ export abstract class PollingJobQueueStrategy extends InjectableJobQueueStrategy
if (!active) {
return;
}
await active.stop();
await active.stop(this.gracefulShutdownTimeout);
}

async cancelJob(jobId: ID): Promise<Job | undefined> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,18 @@ export interface DefaultJobQueueOptions {
* @since 1.3.0
*/
useDatabaseForBuffer?: boolean;
/**
* @description
* The timeout in ms which the queue will use when attempting a graceful shutdown.
* That means when the server is shut down but a job is running, the job queue will
* wait for the job to complete before allowing the server to shut down. If the job
* does not complete within this timeout window, the job will be forced to stop
* and the server will shut down anyway.
*
* @since 2.2.0
* @default 20_000
*/
gracefulShutdownTimeout?: number;
}

/**
Expand Down Expand Up @@ -175,13 +187,14 @@ export interface DefaultJobQueueOptions {
? [JobRecord, JobRecordBuffer]
: [JobRecord],
configuration: config => {
const { pollInterval, concurrency, backoffStrategy, setRetries } =
const { pollInterval, concurrency, backoffStrategy, setRetries, gracefulShutdownTimeout } =
DefaultJobQueuePlugin.options ?? {};
config.jobQueueOptions.jobQueueStrategy = new SqlJobQueueStrategy({
concurrency,
pollInterval,
backoffStrategy,
setRetries,
gracefulShutdownTimeout,
});
if (DefaultJobQueuePlugin.options.useDatabaseForBuffer === true) {
config.jobQueueOptions.jobBufferStorageStrategy = new SqlJobBufferStorageStrategy();
Expand Down

0 comments on commit cba06e0

Please sign in to comment.