diff --git a/packages/core/src/job-queue/polling-job-queue-strategy.ts b/packages/core/src/job-queue/polling-job-queue-strategy.ts index baadea8f05..7361d147cd 100644 --- a/packages/core/src/job-queue/polling-job-queue-strategy.ts +++ b/packages/core/src/job-queue/polling-job-queue-strategy.ts @@ -51,6 +51,18 @@ export interface PollingJobQueueStrategyConfig { * @default () => 1000 */ backoffStrategy?: BackoffStrategy; + /** + * @description + * The timeout in ms which the queue will use when attempting a graceful shutdown. + * That means, when the server is shut down but a job is running, the job queue will + * wait for the job to complete before allowing the server to shut down. If the job + * does not complete within this timeout window, the job will be forced to stop + * and the server will shut down anyway. + * + * @since 2.2.0 + * @default 20_000 + */ + gracefulShutdownTimeout?: number; } const STOP_SIGNAL = Symbol('STOP_SIGNAL'); @@ -232,6 +244,7 @@ export abstract class PollingJobQueueStrategy extends InjectableJobQueueStrategy public pollInterval: number | ((queueName: string) => number); public setRetries: (queueName: string, job: Job) => number; public backOffStrategy?: BackoffStrategy; + public gracefulShutdownTimeout: number; protected activeQueues = new QueueNameProcessStorage>(); @@ -245,10 +258,12 @@ export abstract class PollingJobQueueStrategy extends InjectableJobQueueStrategy this.pollInterval = concurrencyOrConfig.pollInterval ?? 200; this.backOffStrategy = concurrencyOrConfig.backoffStrategy ?? (() => 1000); this.setRetries = concurrencyOrConfig.setRetries ?? ((_, job) => job.retries); + this.gracefulShutdownTimeout = concurrencyOrConfig.gracefulShutdownTimeout ?? 20_000; } else { this.concurrency = concurrencyOrConfig ?? 1; this.pollInterval = maybePollInterval ?? 200; this.setRetries = (_, job) => job.retries; + this.gracefulShutdownTimeout = 20_000; } } @@ -276,7 +291,7 @@ export abstract class PollingJobQueueStrategy extends InjectableJobQueueStrategy if (!active) { return; } - await active.stop(); + await active.stop(this.gracefulShutdownTimeout); } async cancelJob(jobId: ID): Promise { diff --git a/packages/core/src/plugin/default-job-queue-plugin/default-job-queue-plugin.ts b/packages/core/src/plugin/default-job-queue-plugin/default-job-queue-plugin.ts index e6b5f93ebf..d731add608 100644 --- a/packages/core/src/plugin/default-job-queue-plugin/default-job-queue-plugin.ts +++ b/packages/core/src/plugin/default-job-queue-plugin/default-job-queue-plugin.ts @@ -76,6 +76,18 @@ export interface DefaultJobQueueOptions { * @since 1.3.0 */ useDatabaseForBuffer?: boolean; + /** + * @description + * The timeout in ms which the queue will use when attempting a graceful shutdown. + * That means when the server is shut down but a job is running, the job queue will + * wait for the job to complete before allowing the server to shut down. If the job + * does not complete within this timeout window, the job will be forced to stop + * and the server will shut down anyway. + * + * @since 2.2.0 + * @default 20_000 + */ + gracefulShutdownTimeout?: number; } /** @@ -175,13 +187,14 @@ export interface DefaultJobQueueOptions { ? [JobRecord, JobRecordBuffer] : [JobRecord], configuration: config => { - const { pollInterval, concurrency, backoffStrategy, setRetries } = + const { pollInterval, concurrency, backoffStrategy, setRetries, gracefulShutdownTimeout } = DefaultJobQueuePlugin.options ?? {}; config.jobQueueOptions.jobQueueStrategy = new SqlJobQueueStrategy({ concurrency, pollInterval, backoffStrategy, setRetries, + gracefulShutdownTimeout, }); if (DefaultJobQueuePlugin.options.useDatabaseForBuffer === true) { config.jobQueueOptions.jobBufferStorageStrategy = new SqlJobBufferStorageStrategy();