Skip to content

Commit 49a3f72

Browse files
authored
Publish redis-worker and add graceful shutdown manager (#1810)
* add shutdown manager * update ai test instructions * add shutdown timeout to redis-worker * move redis worker to packages * add unregister method * prep for publishing package * fix types * update ai files * fix cursor terminal links * prevent overly friendly ids * use structured logger * use unique shutdown handler names * rework suspend completion * add trycatch util * rework suspend restore * add http server metrics * add missing prom-client to core * add prom metrics to redis worker * bundle redis-worker * fix esm/cjs interop * remove proxy from changeset ignore and add supervisor * add pause to prerelease script for any manual edits * unregister the correct handler and add early detection * small change to http handler return * fix worker tests * fix shutdown manager tests
1 parent adca199 commit 49a3f72

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

58 files changed

+1637
-355
lines changed

.changeset/config.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
"access": "public",
1313
"baseBranch": "main",
1414
"updateInternalDependencies": "patch",
15-
"ignore": ["webapp", "proxy", "coordinator", "docker-provider", "kubernetes-provider"],
15+
"ignore": ["webapp", "supervisor", "coordinator", "docker-provider", "kubernetes-provider"],
1616
"___experimentalUnsafeOptions_WILL_CHANGE_IN_PATCH": {
1717
"onlyUpdatePeerDependentsWhenOutOfRange": true
1818
}

.cursor/rules/webapp.mdc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ The main trigger.dev webapp, which powers it's API and dashboard and makes up th
99
- `@trigger.dev/database` exports a Prisma 5.4.1 client that is used extensively in the webapp to access a PostgreSQL instance. The schema file is [schema.prisma](mdc:internal-packages/database/prisma/schema.prisma)
1010
- `@trigger.dev/core` is a published package and is used to share code between the `@trigger.dev/sdk` and the webapp. It includes functionality but also a load of Zod schemas for data validation. When importing from `@trigger.dev/core` in the webapp, we never import the root `@trigger.dev/core` path, instead we favor one of the subpath exports that you can find in [package.json](mdc:packages/core/package.json)
1111
- `@internal/run-engine` has all the code needed to trigger a run and take it through it's lifecycle to completion.
12-
- `@internal/redis-worker` is a custom redis based background job/worker system that's used in the webapp and also used inside the run engine.
12+
- `@trigger.dev/redis-worker` is a custom redis based background job/worker system that's used in the webapp and also used inside the run engine.
1313

1414
## Environment variables and testing
1515

ai/references/repo.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,13 @@ This is a pnpm 8.15.5 monorepo that uses turborepo @turbo.json. The following wo
1414
- <root>/packages/core is the `@trigger.dev/core` package that is shared across the SDK and other packages
1515
- <root>/packages/build defines the types and prebuilt build extensions for trigger.dev. See our [build extensions docs](https://trigger.dev/docs/config/extensions/overview.md) for more information.
1616
- <root>/packages/react-hooks defines some useful react hooks like our realtime hooks. See our [Realtime hooks](https://trigger.dev/docs/frontend/react-hooks/realtime.md) and our [Trigger hooks](https://trigger.dev/docs/frontend/react-hooks/triggering.md) for more information.
17+
- <root>/packages/redis-worker is the `@trigger.dev/redis-worker` package that implements a custom background job/worker sytem powered by redis for offloading work to the background, used in the webapp and also in the Run Engine 2.0.
1718

1819
## Internal Packages
1920

2021
- <root>/internal-packages/\* are packages that are used internally only, not published, and usually they have a tsc build step and are used in the webapp
2122
- <root>/internal-packages/database is the `@trigger.dev/database` package that exports a prisma client, has the schema file, and exports a few other helpers.
2223
- <root>/internal-packages/run-engine is the `@internal/run-engine` package that is "Run Engine 2.0" and handles moving a run all the way through it's lifecycle
23-
- <root>/internal-packages/redis-worker is the `@internal/redis-worker` package that implements a custom background job/worker sytem powered by redis for offloading work to the background, used in the webapp and also in the Run Engine 2.0.
2424
- <root>/internal-packages/redis is the `@internal/redis` package that exports Redis types and the `createRedisClient` function to unify how we create redis clients in the repo. It's not used everywhere yet, but it's the preferred way to create redis clients from now on.
2525
- <root>/internal-packages/testcontainers is the `@internal/testcontainers` package that exports a few useful functions for spinning up local testcontainers when writing vitest tests. See our [tests.md](./tests.md) file for more information.
2626
- <root>/internal-packages/zodworker is the `@internal/zodworker` package that implements a wrapper around graphile-worker that allows us to use zod to validate our background jobs. We are moving away from using graphile-worker as our background job system, replacing it with our own redis-worker package.

ai/references/tests.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,11 @@ pnpm run test ./src/components/Button.test.ts
2525

2626
We use vitest for testing. We almost NEVER mock anything. Start with a top-level "describe", and have multiple "it" statements inside of it.
2727

28+
New test files should be placed right next to the file being tested. For example:
29+
30+
- Source file: `./src/services/MyService.ts`
31+
- Test file: `./src/services/MyService.test.ts`
32+
2833
When writing anything that needs redis or postgresql, we have some internal "testcontainers" that are used to spin up a local instance, redis, or both.
2934

3035
redisTest:

apps/supervisor/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
"type": "module",
77
"scripts": {
88
"build": "tsc",
9-
"dev": "tsx --experimental-sqlite --require dotenv/config --watch src/index.ts",
9+
"dev": "tsx --experimental-sqlite --require dotenv/config --watch src/index.ts || (echo '!! Remember to run: nvm use'; exit 1)",
1010
"start": "node --experimental-sqlite dist/index.js",
1111
"typecheck": "tsc --noEmit"
1212
},

apps/supervisor/src/env.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,12 @@ const Env = z.object({
4545
// Used by the resource monitor
4646
OVERRIDE_CPU_TOTAL: z.coerce.number().optional(),
4747
OVERRIDE_MEMORY_TOTAL_GB: z.coerce.number().optional(),
48+
49+
// Kubernetes specific settings
50+
KUBERNETES_FORCE_ENABLED: BoolEnv.default(false),
51+
KUBERNETES_NAMESPACE: z.string().default("default"),
52+
EPHEMERAL_STORAGE_SIZE_LIMIT: z.string().default("10Gi"),
53+
EPHEMERAL_STORAGE_SIZE_REQUEST: z.string().default("2Gi"),
4854
});
4955

5056
export const env = Env.parse(stdEnv);

apps/supervisor/src/index.ts

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,11 @@ import {
1313
} from "./resourceMonitor.js";
1414
import { KubernetesWorkloadManager } from "./workloadManager/kubernetes.js";
1515
import { DockerWorkloadManager } from "./workloadManager/docker.js";
16-
import { HttpServer, CheckpointClient } from "@trigger.dev/core/v3/serverOnly";
16+
import {
17+
HttpServer,
18+
CheckpointClient,
19+
isKubernetesEnvironment,
20+
} from "@trigger.dev/core/v3/serverOnly";
1721
import { createK8sApi, RUNTIME_ENV } from "./clients/kubernetes.js";
1822

1923
class ManagedSupervisor {
@@ -25,7 +29,7 @@ class ManagedSupervisor {
2529
private readonly resourceMonitor: ResourceMonitor;
2630
private readonly checkpointClient?: CheckpointClient;
2731

28-
private readonly isKubernetes = RUNTIME_ENV === "kubernetes";
32+
private readonly isKubernetes = isKubernetesEnvironment(env.KUBERNETES_FORCE_ENABLED);
2933
private readonly warmStartUrl = env.TRIGGER_WARM_START_URL;
3034

3135
constructor() {
@@ -94,6 +98,7 @@ class ManagedSupervisor {
9498
this.checkpointClient = new CheckpointClient({
9599
apiUrl: new URL(env.TRIGGER_CHECKPOINT_URL),
96100
workerClient: this.workerSession.httpClient,
101+
orchestrator: this.isKubernetes ? "KUBERNETES" : "DOCKER",
97102
});
98103
}
99104

@@ -127,7 +132,9 @@ class ManagedSupervisor {
127132
return;
128133
}
129134

130-
if (message.checkpoint) {
135+
const { checkpoint, ...rest } = message;
136+
137+
if (checkpoint) {
131138
this.logger.log("[ManagedWorker] Restoring run", { runId: message.run.id });
132139

133140
if (!this.checkpointClient) {
@@ -139,7 +146,10 @@ class ManagedSupervisor {
139146
const didRestore = await this.checkpointClient.restoreRun({
140147
runFriendlyId: message.run.friendlyId,
141148
snapshotFriendlyId: message.snapshot.friendlyId,
142-
checkpoint: message.checkpoint,
149+
body: {
150+
...rest,
151+
checkpoint,
152+
},
143153
});
144154

145155
if (didRestore) {

apps/supervisor/src/workloadManager/kubernetes.ts

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,17 +9,14 @@ import type { EnvironmentType, MachinePreset } from "@trigger.dev/core/v3";
99
import { env } from "../env.js";
1010
import { type K8sApi, createK8sApi, type k8s } from "../clients/kubernetes.js";
1111

12-
const POD_EPHEMERAL_STORAGE_SIZE_LIMIT = process.env.POD_EPHEMERAL_STORAGE_SIZE_LIMIT || "10Gi";
13-
const POD_EPHEMERAL_STORAGE_SIZE_REQUEST = process.env.POD_EPHEMERAL_STORAGE_SIZE_REQUEST || "2Gi";
14-
1512
type ResourceQuantities = {
1613
[K in "cpu" | "memory" | "ephemeral-storage"]?: string;
1714
};
1815

1916
export class KubernetesWorkloadManager implements WorkloadManager {
2017
private readonly logger = new SimpleStructuredLogger("kubernetes-workload-provider");
2118
private k8s: K8sApi;
22-
private namespace = "default";
19+
private namespace = env.KUBERNETES_NAMESPACE;
2320

2421
constructor(private opts: WorkloadManagerOptions) {
2522
this.k8s = createK8sApi();
@@ -205,13 +202,13 @@ export class KubernetesWorkloadManager implements WorkloadManager {
205202

206203
get #defaultResourceRequests(): ResourceQuantities {
207204
return {
208-
"ephemeral-storage": POD_EPHEMERAL_STORAGE_SIZE_REQUEST,
205+
"ephemeral-storage": env.EPHEMERAL_STORAGE_SIZE_REQUEST,
209206
};
210207
}
211208

212209
get #defaultResourceLimits(): ResourceQuantities {
213210
return {
214-
"ephemeral-storage": POD_EPHEMERAL_STORAGE_SIZE_LIMIT,
211+
"ephemeral-storage": env.EPHEMERAL_STORAGE_SIZE_LIMIT,
215212
};
216213
}
217214

apps/supervisor/src/workloadServer/index.ts

Lines changed: 28 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -94,8 +94,8 @@ export class WorkloadServer extends EventEmitter<WorkloadServerEvents> {
9494
this.websocketServer = this.createWebsocketServer();
9595
}
9696

97-
private runnerIdFromRequest(req: IncomingMessage): string | undefined {
98-
const value = req.headers[WORKLOAD_HEADERS.RUNNER_ID];
97+
private headerValueFromRequest(req: IncomingMessage, headerName: string): string | undefined {
98+
const value = req.headers[headerName];
9999

100100
if (Array.isArray(value)) {
101101
return value[0];
@@ -104,6 +104,22 @@ export class WorkloadServer extends EventEmitter<WorkloadServerEvents> {
104104
return value;
105105
}
106106

107+
private runnerIdFromRequest(req: IncomingMessage): string | undefined {
108+
return this.headerValueFromRequest(req, WORKLOAD_HEADERS.RUNNER_ID);
109+
}
110+
111+
private deploymentIdFromRequest(req: IncomingMessage): string | undefined {
112+
return this.headerValueFromRequest(req, WORKLOAD_HEADERS.DEPLOYMENT_ID);
113+
}
114+
115+
private deploymentVersionFromRequest(req: IncomingMessage): string | undefined {
116+
return this.headerValueFromRequest(req, WORKLOAD_HEADERS.DEPLOYMENT_VERSION);
117+
}
118+
119+
private projectRefFromRequest(req: IncomingMessage): string | undefined {
120+
return this.headerValueFromRequest(req, WORKLOAD_HEADERS.PROJECT_REF);
121+
}
122+
107123
private createHttpServer({ host, port }: { host: string; port: number }) {
108124
return new HttpServer({ port, host })
109125
.route(
@@ -213,8 +229,10 @@ export class WorkloadServer extends EventEmitter<WorkloadServerEvents> {
213229
}
214230

215231
const runnerId = this.runnerIdFromRequest(req);
232+
const deploymentVersion = this.deploymentVersionFromRequest(req);
233+
const projectRef = this.projectRefFromRequest(req);
216234

217-
if (!runnerId) {
235+
if (!runnerId || !deploymentVersion || !projectRef) {
218236
console.error("Invalid headers for suspend request", {
219237
...params,
220238
headers: req.headers,
@@ -241,16 +259,19 @@ export class WorkloadServer extends EventEmitter<WorkloadServerEvents> {
241259
const suspendResult = await this.checkpointClient.suspendRun({
242260
runFriendlyId: params.runFriendlyId,
243261
snapshotFriendlyId: params.snapshotFriendlyId,
244-
containerId: runnerId,
245-
runnerId,
262+
body: {
263+
runnerId,
264+
runId: params.runFriendlyId,
265+
snapshotId: params.snapshotFriendlyId,
266+
projectRef,
267+
deploymentVersion,
268+
},
246269
});
247270

248271
if (!suspendResult) {
249272
console.error("Failed to suspend run", { params });
250273
return;
251274
}
252-
253-
console.log("Suspended run", { params });
254275
},
255276
}
256277
)

apps/webapp/app/env.server.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -424,6 +424,7 @@ const EnvironmentSchema = z.object({
424424
RUN_ENGINE_QUEUE_AGE_RANDOMIZATION_BIAS: z.coerce.number().default(0.25),
425425
RUN_ENGINE_REUSE_SNAPSHOT_COUNT: z.coerce.number().int().default(0),
426426
RUN_ENGINE_MAXIMUM_ENV_COUNT: z.coerce.number().int().optional(),
427+
RUN_ENGINE_WORKER_SHUTDOWN_TIMEOUT_MS: z.coerce.number().int().default(60_000),
427428

428429
RUN_ENGINE_WORKER_REDIS_HOST: z
429430
.string()
@@ -591,6 +592,7 @@ const EnvironmentSchema = z.object({
591592
LEGACY_RUN_ENGINE_WORKER_POLL_INTERVAL: z.coerce.number().int().default(1000),
592593
LEGACY_RUN_ENGINE_WORKER_IMMEDIATE_POLL_INTERVAL: z.coerce.number().int().default(50),
593594
LEGACY_RUN_ENGINE_WORKER_CONCURRENCY_LIMIT: z.coerce.number().int().default(100),
595+
LEGACY_RUN_ENGINE_WORKER_SHUTDOWN_TIMEOUT_MS: z.coerce.number().int().default(60_000),
594596

595597
LEGACY_RUN_ENGINE_WORKER_REDIS_HOST: z
596598
.string()
@@ -633,6 +635,7 @@ const EnvironmentSchema = z.object({
633635
COMMON_WORKER_POLL_INTERVAL: z.coerce.number().int().default(1000),
634636
COMMON_WORKER_IMMEDIATE_POLL_INTERVAL: z.coerce.number().int().default(50),
635637
COMMON_WORKER_CONCURRENCY_LIMIT: z.coerce.number().int().default(100),
638+
COMMON_WORKER_SHUTDOWN_TIMEOUT_MS: z.coerce.number().int().default(60_000),
636639

637640
COMMON_WORKER_REDIS_HOST: z
638641
.string()

apps/webapp/app/v3/commonWorker.server.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { Worker as RedisWorker } from "@internal/redis-worker";
1+
import { Worker as RedisWorker } from "@trigger.dev/redis-worker";
22
import { Logger } from "@trigger.dev/core/logger";
33
import { z } from "zod";
44
import { env } from "~/env.server";
@@ -80,6 +80,7 @@ function initializeWorker() {
8080
},
8181
pollIntervalMs: env.COMMON_WORKER_POLL_INTERVAL,
8282
immediatePollIntervalMs: env.COMMON_WORKER_IMMEDIATE_POLL_INTERVAL,
83+
shutdownTimeoutMs: env.COMMON_WORKER_SHUTDOWN_TIMEOUT_MS,
8384
logger: new Logger("CommonWorker", "debug"),
8485
jobs: {
8586
"v3.deliverAlert": async ({ payload }) => {

apps/webapp/app/v3/legacyRunEngineWorker.server.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { Worker as RedisWorker } from "@internal/redis-worker";
1+
import { Worker as RedisWorker } from "@trigger.dev/redis-worker";
22
import { Logger } from "@trigger.dev/core/logger";
33
import { z } from "zod";
44
import { env } from "~/env.server";
@@ -67,6 +67,7 @@ function initializeWorker() {
6767
},
6868
pollIntervalMs: env.LEGACY_RUN_ENGINE_WORKER_POLL_INTERVAL,
6969
immediatePollIntervalMs: env.LEGACY_RUN_ENGINE_WORKER_IMMEDIATE_POLL_INTERVAL,
70+
shutdownTimeoutMs: env.LEGACY_RUN_ENGINE_WORKER_SHUTDOWN_TIMEOUT_MS,
7071
logger: new Logger("LegacyRunEngineWorker", "debug"),
7172
jobs: {
7273
runHeartbeat: async ({ payload }) => {

apps/webapp/app/v3/runEngine.server.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ function createRunEngine() {
1717
workers: env.RUN_ENGINE_WORKER_COUNT,
1818
tasksPerWorker: env.RUN_ENGINE_TASKS_PER_WORKER,
1919
pollIntervalMs: env.RUN_ENGINE_WORKER_POLL_INTERVAL,
20+
shutdownTimeoutMs: env.RUN_ENGINE_WORKER_SHUTDOWN_TIMEOUT_MS,
2021
redis: {
2122
keyPrefix: "engine:",
2223
port: env.RUN_ENGINE_WORKER_REDIS_PORT ?? undefined,

apps/webapp/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@
5353
"@internal/run-engine": "workspace:*",
5454
"@internal/zod-worker": "workspace:*",
5555
"@internal/redis": "workspace:*",
56-
"@internal/redis-worker": "workspace:*",
56+
"@trigger.dev/redis-worker": "workspace:*",
5757
"@internationalized/date": "^3.5.1",
5858
"@lezer/highlight": "^1.1.6",
5959
"@opentelemetry/api": "1.9.0",

internal-packages/redis-worker/package.json

Lines changed: 0 additions & 38 deletions
This file was deleted.

internal-packages/redis-worker/tsconfig.json

Lines changed: 0 additions & 8 deletions
This file was deleted.

internal-packages/redis-worker/tsconfig.src.json

Lines changed: 0 additions & 19 deletions
This file was deleted.

internal-packages/redis-worker/tsconfig.test.json

Lines changed: 0 additions & 20 deletions
This file was deleted.

internal-packages/redis/src/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { Redis, RedisOptions } from "ioredis";
1+
import { Redis, type RedisOptions } from "ioredis";
22
import { Logger } from "@trigger.dev/core/logger";
33

44
export { Redis, type Callback, type RedisOptions, type Result, type RedisCommander } from "ioredis";

0 commit comments

Comments
 (0)