Skip to content

Commit 424a68e

Browse files
committed
feat(run-engine): wire up TTL system callback and compute ttlExpiresAt on enqueue
- Add TTL system options to RunEngineOptions.queue for configuration - Add #ttlExpiredCallback method on RunEngine that calls ttlSystem.expireRun() - Pass TTL system options to RunQueue in RunEngine constructor - Compute ttlExpiresAt from run.ttl when enqueuing runs in EnqueueSystem - Add env vars for TTL system configuration (disabled, shard count, poll interval, batch size) - Configure TTL system in webapp's runEngine.server.ts The TTL system enables automatic expiration of runs that have been in the queue past their TTL deadline. When runs expire, the callback updates their status to EXPIRED in the database and emits appropriate events. https://claude.ai/code/session_01AyzQp6tbj7th5QRTCYjJR5
1 parent 400673d commit 424a68e

File tree

5 files changed

+71
-0
lines changed

5 files changed

+71
-0
lines changed

apps/webapp/app/env.server.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -591,6 +591,12 @@ const EnvironmentSchema = z
591591
RUN_ENGINE_CONCURRENCY_SWEEPER_SCAN_JITTER_IN_MS: z.coerce.number().int().optional(),
592592
RUN_ENGINE_CONCURRENCY_SWEEPER_PROCESS_MARKED_JITTER_IN_MS: z.coerce.number().int().optional(),
593593

594+
// TTL System settings for automatic run expiration
595+
RUN_ENGINE_TTL_SYSTEM_DISABLED: BoolEnv.default(false),
596+
RUN_ENGINE_TTL_SYSTEM_SHARD_COUNT: z.coerce.number().int().optional(),
597+
RUN_ENGINE_TTL_SYSTEM_POLL_INTERVAL_MS: z.coerce.number().int().default(1_000),
598+
RUN_ENGINE_TTL_SYSTEM_BATCH_SIZE: z.coerce.number().int().default(100),
599+
594600
RUN_ENGINE_RUN_LOCK_DURATION: z.coerce.number().int().default(5000),
595601
RUN_ENGINE_RUN_LOCK_AUTOMATIC_EXTENSION_THRESHOLD: z.coerce.number().int().default(1000),
596602
RUN_ENGINE_RUN_LOCK_MAX_RETRIES: z.coerce.number().int().default(10),

apps/webapp/app/v3/runEngine.server.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,12 @@ function createRunEngine() {
8080
scanJitterInMs: env.RUN_ENGINE_CONCURRENCY_SWEEPER_SCAN_JITTER_IN_MS,
8181
processMarkedJitterInMs: env.RUN_ENGINE_CONCURRENCY_SWEEPER_PROCESS_MARKED_JITTER_IN_MS,
8282
},
83+
ttlSystem: {
84+
disabled: env.RUN_ENGINE_TTL_SYSTEM_DISABLED,
85+
shardCount: env.RUN_ENGINE_TTL_SYSTEM_SHARD_COUNT,
86+
pollIntervalMs: env.RUN_ENGINE_TTL_SYSTEM_POLL_INTERVAL_MS,
87+
batchSize: env.RUN_ENGINE_TTL_SYSTEM_BATCH_SIZE,
88+
},
8389
},
8490
runLock: {
8591
redis: {

internal-packages/run-engine/src/engine/index.ts

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,14 @@ export class RunEngine {
182182
processWorkerQueueDebounceMs: options.queue?.processWorkerQueueDebounceMs,
183183
dequeueBlockingTimeoutSeconds: options.queue?.dequeueBlockingTimeoutSeconds,
184184
meter: options.meter,
185+
ttlSystem: options.queue?.ttlSystem?.disabled
186+
? undefined
187+
: {
188+
shardCount: options.queue?.ttlSystem?.shardCount,
189+
pollIntervalMs: options.queue?.ttlSystem?.pollIntervalMs,
190+
batchSize: options.queue?.ttlSystem?.batchSize,
191+
callback: this.#ttlExpiredCallback.bind(this),
192+
},
185193
// Run data provider for V3 optimized format - reads from PostgreSQL when no Redis message key exists
186194
runDataProvider: {
187195
getRunData: async (runId: string) => {
@@ -2103,6 +2111,35 @@ export class RunEngine {
21032111
}));
21042112
}
21052113

2114+
/**
2115+
* Callback for the TTL system when runs expire.
2116+
* Calls ttlSystem.expireRun() for each expired run to update database and emit events.
2117+
*/
2118+
async #ttlExpiredCallback(
2119+
runs: Array<{ queueKey: string; runId: string; orgId: string }>
2120+
): Promise<void> {
2121+
// Process expired runs concurrently with limited parallelism
2122+
await pMap(
2123+
runs,
2124+
async (run) => {
2125+
try {
2126+
await this.ttlSystem.expireRun({ runId: run.runId });
2127+
this.logger.debug("TTL system expired run", {
2128+
runId: run.runId,
2129+
orgId: run.orgId,
2130+
});
2131+
} catch (error) {
2132+
this.logger.error("Failed to expire run via TTL system", {
2133+
runId: run.runId,
2134+
orgId: run.orgId,
2135+
error,
2136+
});
2137+
}
2138+
},
2139+
{ concurrency: 10 }
2140+
);
2141+
}
2142+
21062143
/**
21072144
* Invalidates the billing cache for an organization when their plan changes
21082145
* Runs in background and handles all errors internally

internal-packages/run-engine/src/engine/systems/enqueueSystem.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import {
44
TaskRun,
55
TaskRunExecutionStatus,
66
} from "@trigger.dev/database";
7+
import { parseNaturalLanguageDuration } from "@trigger.dev/core/v3/isomorphic";
78
import { MinimalAuthenticatedEnvironment } from "../../shared/index.js";
89
import { ExecutionSnapshotSystem } from "./executionSnapshotSystem.js";
910
import { SystemResources } from "./systems.js";
@@ -81,6 +82,15 @@ export class EnqueueSystem {
8182

8283
const timestamp = (run.queueTimestamp ?? run.createdAt).getTime() - run.priorityMs;
8384

85+
// Calculate TTL expiration timestamp if the run has a TTL
86+
let ttlExpiresAt: number | undefined;
87+
if (run.ttl) {
88+
const expireAt = parseNaturalLanguageDuration(run.ttl);
89+
if (expireAt) {
90+
ttlExpiresAt = expireAt.getTime();
91+
}
92+
}
93+
8494
await this.$.runQueue.enqueueMessage({
8595
env,
8696
workerQueue,
@@ -95,6 +105,7 @@ export class EnqueueSystem {
95105
concurrencyKey: run.concurrencyKey ?? undefined,
96106
timestamp,
97107
attempt: 0,
108+
ttlExpiresAt,
98109
},
99110
});
100111

internal-packages/run-engine/src/engine/types.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,17 @@ export type RunEngineOptions = {
6363
scanJitterInMs?: number;
6464
processMarkedJitterInMs?: number;
6565
};
66+
/** TTL system options for automatic run expiration */
67+
ttlSystem?: {
68+
/** Number of shards for TTL sorted sets (default: same as queue shards) */
69+
shardCount?: number;
70+
/** How often to poll each shard for expired runs (ms, default: 1000) */
71+
pollIntervalMs?: number;
72+
/** Max number of runs to expire per poll per shard (default: 100) */
73+
batchSize?: number;
74+
/** Whether TTL consumers are disabled (default: false) */
75+
disabled?: boolean;
76+
};
6677
};
6778
runLock: {
6879
redis: RedisOptions;

0 commit comments

Comments
 (0)