Skip to content

Publish redis-worker and add graceful shutdown manager #1810

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 27 commits into from
Mar 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
90b6437
add shutdown manager
nicktrn Mar 19, 2025
4395062
update ai test instructions
nicktrn Mar 19, 2025
966e42e
add shutdown timeout to redis-worker
nicktrn Mar 19, 2025
0144edc
move redis worker to packages
nicktrn Mar 19, 2025
1180166
add unregister method
nicktrn Mar 19, 2025
33d1f4e
prep for publishing package
nicktrn Mar 19, 2025
b202f9c
fix types
nicktrn Mar 19, 2025
22e9dc1
update ai files
nicktrn Mar 19, 2025
ba68340
fix cursor terminal links
nicktrn Mar 20, 2025
25c1042
prevent overly friendly ids
nicktrn Mar 20, 2025
abcab33
use structured logger
nicktrn Mar 20, 2025
ecb9cad
use unique shutdown handler names
nicktrn Mar 20, 2025
66c0c5b
rework suspend completion
nicktrn Mar 20, 2025
e979dd9
add trycatch util
nicktrn Mar 20, 2025
26472d3
rework suspend restore
nicktrn Mar 20, 2025
b3eeb48
add http server metrics
nicktrn Mar 20, 2025
4d45f52
add missing prom-client to core
nicktrn Mar 21, 2025
05214e8
add prom metrics to redis worker
nicktrn Mar 21, 2025
ba57e06
bundle redis-worker
nicktrn Mar 21, 2025
3955654
Merge remote-tracking branch 'origin/main' into feat/redis-worker-pac…
nicktrn Mar 21, 2025
c05688c
fix esm/cjs interop
nicktrn Mar 21, 2025
0198077
remove proxy from changeset ignore and add supervisor
nicktrn Mar 21, 2025
aa3e57d
add pause to prerelease script for any manual edits
nicktrn Mar 21, 2025
ff78b31
unregister the correct handler and add early detection
nicktrn Mar 21, 2025
821b918
small change to http handler return
nicktrn Mar 21, 2025
59afe71
fix worker tests
nicktrn Mar 21, 2025
921edcb
fix shutdown manager tests
nicktrn Mar 21, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .changeset/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
"access": "public",
"baseBranch": "main",
"updateInternalDependencies": "patch",
"ignore": ["webapp", "proxy", "coordinator", "docker-provider", "kubernetes-provider"],
"ignore": ["webapp", "supervisor", "coordinator", "docker-provider", "kubernetes-provider"],
"___experimentalUnsafeOptions_WILL_CHANGE_IN_PATCH": {
"onlyUpdatePeerDependentsWhenOutOfRange": true
}
Expand Down
2 changes: 1 addition & 1 deletion .cursor/rules/webapp.mdc
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ The main trigger.dev webapp, which powers it's API and dashboard and makes up th
- `@trigger.dev/database` exports a Prisma 5.4.1 client that is used extensively in the webapp to access a PostgreSQL instance. The schema file is [schema.prisma](mdc:internal-packages/database/prisma/schema.prisma)
- `@trigger.dev/core` is a published package and is used to share code between the `@trigger.dev/sdk` and the webapp. It includes functionality but also a load of Zod schemas for data validation. When importing from `@trigger.dev/core` in the webapp, we never import the root `@trigger.dev/core` path, instead we favor one of the subpath exports that you can find in [package.json](mdc:packages/core/package.json)
- `@internal/run-engine` has all the code needed to trigger a run and take it through it's lifecycle to completion.
- `@internal/redis-worker` is a custom redis based background job/worker system that's used in the webapp and also used inside the run engine.
- `@trigger.dev/redis-worker` is a custom redis based background job/worker system that's used in the webapp and also used inside the run engine.

## Environment variables and testing

Expand Down
2 changes: 1 addition & 1 deletion ai/references/repo.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@ This is a pnpm 8.15.5 monorepo that uses turborepo @turbo.json. The following wo
- <root>/packages/core is the `@trigger.dev/core` package that is shared across the SDK and other packages
- <root>/packages/build defines the types and prebuilt build extensions for trigger.dev. See our [build extensions docs](https://trigger.dev/docs/config/extensions/overview.md) for more information.
- <root>/packages/react-hooks defines some useful react hooks like our realtime hooks. See our [Realtime hooks](https://trigger.dev/docs/frontend/react-hooks/realtime.md) and our [Trigger hooks](https://trigger.dev/docs/frontend/react-hooks/triggering.md) for more information.
- <root>/packages/redis-worker is the `@trigger.dev/redis-worker` package that implements a custom background job/worker sytem powered by redis for offloading work to the background, used in the webapp and also in the Run Engine 2.0.

## Internal Packages

- <root>/internal-packages/\* are packages that are used internally only, not published, and usually they have a tsc build step and are used in the webapp
- <root>/internal-packages/database is the `@trigger.dev/database` package that exports a prisma client, has the schema file, and exports a few other helpers.
- <root>/internal-packages/run-engine is the `@internal/run-engine` package that is "Run Engine 2.0" and handles moving a run all the way through it's lifecycle
- <root>/internal-packages/redis-worker is the `@internal/redis-worker` package that implements a custom background job/worker sytem powered by redis for offloading work to the background, used in the webapp and also in the Run Engine 2.0.
- <root>/internal-packages/redis is the `@internal/redis` package that exports Redis types and the `createRedisClient` function to unify how we create redis clients in the repo. It's not used everywhere yet, but it's the preferred way to create redis clients from now on.
- <root>/internal-packages/testcontainers is the `@internal/testcontainers` package that exports a few useful functions for spinning up local testcontainers when writing vitest tests. See our [tests.md](./tests.md) file for more information.
- <root>/internal-packages/zodworker is the `@internal/zodworker` package that implements a wrapper around graphile-worker that allows us to use zod to validate our background jobs. We are moving away from using graphile-worker as our background job system, replacing it with our own redis-worker package.
Expand Down
5 changes: 5 additions & 0 deletions ai/references/tests.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ pnpm run test ./src/components/Button.test.ts

We use vitest for testing. We almost NEVER mock anything. Start with a top-level "describe", and have multiple "it" statements inside of it.

New test files should be placed right next to the file being tested. For example:

- Source file: `./src/services/MyService.ts`
- Test file: `./src/services/MyService.test.ts`

When writing anything that needs redis or postgresql, we have some internal "testcontainers" that are used to spin up a local instance, redis, or both.

redisTest:
Expand Down
2 changes: 1 addition & 1 deletion apps/supervisor/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
"type": "module",
"scripts": {
"build": "tsc",
"dev": "tsx --experimental-sqlite --require dotenv/config --watch src/index.ts",
"dev": "tsx --experimental-sqlite --require dotenv/config --watch src/index.ts || (echo '!! Remember to run: nvm use'; exit 1)",
"start": "node --experimental-sqlite dist/index.js",
"typecheck": "tsc --noEmit"
},
Expand Down
6 changes: 6 additions & 0 deletions apps/supervisor/src/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,12 @@ const Env = z.object({
// Used by the resource monitor
OVERRIDE_CPU_TOTAL: z.coerce.number().optional(),
OVERRIDE_MEMORY_TOTAL_GB: z.coerce.number().optional(),

// Kubernetes specific settings
KUBERNETES_FORCE_ENABLED: BoolEnv.default(false),
KUBERNETES_NAMESPACE: z.string().default("default"),
EPHEMERAL_STORAGE_SIZE_LIMIT: z.string().default("10Gi"),
EPHEMERAL_STORAGE_SIZE_REQUEST: z.string().default("2Gi"),
});

export const env = Env.parse(stdEnv);
18 changes: 14 additions & 4 deletions apps/supervisor/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,11 @@ import {
} from "./resourceMonitor.js";
import { KubernetesWorkloadManager } from "./workloadManager/kubernetes.js";
import { DockerWorkloadManager } from "./workloadManager/docker.js";
import { HttpServer, CheckpointClient } from "@trigger.dev/core/v3/serverOnly";
import {
HttpServer,
CheckpointClient,
isKubernetesEnvironment,
} from "@trigger.dev/core/v3/serverOnly";
import { createK8sApi, RUNTIME_ENV } from "./clients/kubernetes.js";

class ManagedSupervisor {
Expand All @@ -25,7 +29,7 @@ class ManagedSupervisor {
private readonly resourceMonitor: ResourceMonitor;
private readonly checkpointClient?: CheckpointClient;

private readonly isKubernetes = RUNTIME_ENV === "kubernetes";
private readonly isKubernetes = isKubernetesEnvironment(env.KUBERNETES_FORCE_ENABLED);
private readonly warmStartUrl = env.TRIGGER_WARM_START_URL;

constructor() {
Expand Down Expand Up @@ -94,6 +98,7 @@ class ManagedSupervisor {
this.checkpointClient = new CheckpointClient({
apiUrl: new URL(env.TRIGGER_CHECKPOINT_URL),
workerClient: this.workerSession.httpClient,
orchestrator: this.isKubernetes ? "KUBERNETES" : "DOCKER",
});
}

Expand Down Expand Up @@ -127,7 +132,9 @@ class ManagedSupervisor {
return;
}

if (message.checkpoint) {
const { checkpoint, ...rest } = message;

if (checkpoint) {
this.logger.log("[ManagedWorker] Restoring run", { runId: message.run.id });

if (!this.checkpointClient) {
Expand All @@ -139,7 +146,10 @@ class ManagedSupervisor {
const didRestore = await this.checkpointClient.restoreRun({
runFriendlyId: message.run.friendlyId,
snapshotFriendlyId: message.snapshot.friendlyId,
checkpoint: message.checkpoint,
body: {
...rest,
checkpoint,
},
});

if (didRestore) {
Expand Down
9 changes: 3 additions & 6 deletions apps/supervisor/src/workloadManager/kubernetes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,14 @@ import type { EnvironmentType, MachinePreset } from "@trigger.dev/core/v3";
import { env } from "../env.js";
import { type K8sApi, createK8sApi, type k8s } from "../clients/kubernetes.js";

const POD_EPHEMERAL_STORAGE_SIZE_LIMIT = process.env.POD_EPHEMERAL_STORAGE_SIZE_LIMIT || "10Gi";
const POD_EPHEMERAL_STORAGE_SIZE_REQUEST = process.env.POD_EPHEMERAL_STORAGE_SIZE_REQUEST || "2Gi";

type ResourceQuantities = {
[K in "cpu" | "memory" | "ephemeral-storage"]?: string;
};

export class KubernetesWorkloadManager implements WorkloadManager {
private readonly logger = new SimpleStructuredLogger("kubernetes-workload-provider");
private k8s: K8sApi;
private namespace = "default";
private namespace = env.KUBERNETES_NAMESPACE;

constructor(private opts: WorkloadManagerOptions) {
this.k8s = createK8sApi();
Expand Down Expand Up @@ -205,13 +202,13 @@ export class KubernetesWorkloadManager implements WorkloadManager {

get #defaultResourceRequests(): ResourceQuantities {
return {
"ephemeral-storage": POD_EPHEMERAL_STORAGE_SIZE_REQUEST,
"ephemeral-storage": env.EPHEMERAL_STORAGE_SIZE_REQUEST,
};
}

get #defaultResourceLimits(): ResourceQuantities {
return {
"ephemeral-storage": POD_EPHEMERAL_STORAGE_SIZE_LIMIT,
"ephemeral-storage": env.EPHEMERAL_STORAGE_SIZE_LIMIT,
};
}

Expand Down
35 changes: 28 additions & 7 deletions apps/supervisor/src/workloadServer/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,8 @@ export class WorkloadServer extends EventEmitter<WorkloadServerEvents> {
this.websocketServer = this.createWebsocketServer();
}

private runnerIdFromRequest(req: IncomingMessage): string | undefined {
const value = req.headers[WORKLOAD_HEADERS.RUNNER_ID];
private headerValueFromRequest(req: IncomingMessage, headerName: string): string | undefined {
const value = req.headers[headerName];

if (Array.isArray(value)) {
return value[0];
Expand All @@ -104,6 +104,22 @@ export class WorkloadServer extends EventEmitter<WorkloadServerEvents> {
return value;
}

private runnerIdFromRequest(req: IncomingMessage): string | undefined {
return this.headerValueFromRequest(req, WORKLOAD_HEADERS.RUNNER_ID);
}

private deploymentIdFromRequest(req: IncomingMessage): string | undefined {
return this.headerValueFromRequest(req, WORKLOAD_HEADERS.DEPLOYMENT_ID);
}

private deploymentVersionFromRequest(req: IncomingMessage): string | undefined {
return this.headerValueFromRequest(req, WORKLOAD_HEADERS.DEPLOYMENT_VERSION);
}

private projectRefFromRequest(req: IncomingMessage): string | undefined {
return this.headerValueFromRequest(req, WORKLOAD_HEADERS.PROJECT_REF);
}

private createHttpServer({ host, port }: { host: string; port: number }) {
return new HttpServer({ port, host })
.route(
Expand Down Expand Up @@ -213,8 +229,10 @@ export class WorkloadServer extends EventEmitter<WorkloadServerEvents> {
}

const runnerId = this.runnerIdFromRequest(req);
const deploymentVersion = this.deploymentVersionFromRequest(req);
const projectRef = this.projectRefFromRequest(req);

if (!runnerId) {
if (!runnerId || !deploymentVersion || !projectRef) {
console.error("Invalid headers for suspend request", {
...params,
headers: req.headers,
Expand All @@ -241,16 +259,19 @@ export class WorkloadServer extends EventEmitter<WorkloadServerEvents> {
const suspendResult = await this.checkpointClient.suspendRun({
runFriendlyId: params.runFriendlyId,
snapshotFriendlyId: params.snapshotFriendlyId,
containerId: runnerId,
runnerId,
body: {
runnerId,
runId: params.runFriendlyId,
snapshotId: params.snapshotFriendlyId,
projectRef,
deploymentVersion,
},
});

if (!suspendResult) {
console.error("Failed to suspend run", { params });
return;
}

console.log("Suspended run", { params });
},
}
)
Expand Down
3 changes: 3 additions & 0 deletions apps/webapp/app/env.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,7 @@ const EnvironmentSchema = z.object({
RUN_ENGINE_QUEUE_AGE_RANDOMIZATION_BIAS: z.coerce.number().default(0.25),
RUN_ENGINE_REUSE_SNAPSHOT_COUNT: z.coerce.number().int().default(0),
RUN_ENGINE_MAXIMUM_ENV_COUNT: z.coerce.number().int().optional(),
RUN_ENGINE_WORKER_SHUTDOWN_TIMEOUT_MS: z.coerce.number().int().default(60_000),

RUN_ENGINE_WORKER_REDIS_HOST: z
.string()
Expand Down Expand Up @@ -591,6 +592,7 @@ const EnvironmentSchema = z.object({
LEGACY_RUN_ENGINE_WORKER_POLL_INTERVAL: z.coerce.number().int().default(1000),
LEGACY_RUN_ENGINE_WORKER_IMMEDIATE_POLL_INTERVAL: z.coerce.number().int().default(50),
LEGACY_RUN_ENGINE_WORKER_CONCURRENCY_LIMIT: z.coerce.number().int().default(100),
LEGACY_RUN_ENGINE_WORKER_SHUTDOWN_TIMEOUT_MS: z.coerce.number().int().default(60_000),

LEGACY_RUN_ENGINE_WORKER_REDIS_HOST: z
.string()
Expand Down Expand Up @@ -633,6 +635,7 @@ const EnvironmentSchema = z.object({
COMMON_WORKER_POLL_INTERVAL: z.coerce.number().int().default(1000),
COMMON_WORKER_IMMEDIATE_POLL_INTERVAL: z.coerce.number().int().default(50),
COMMON_WORKER_CONCURRENCY_LIMIT: z.coerce.number().int().default(100),
COMMON_WORKER_SHUTDOWN_TIMEOUT_MS: z.coerce.number().int().default(60_000),

COMMON_WORKER_REDIS_HOST: z
.string()
Expand Down
3 changes: 2 additions & 1 deletion apps/webapp/app/v3/commonWorker.server.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Worker as RedisWorker } from "@internal/redis-worker";
import { Worker as RedisWorker } from "@trigger.dev/redis-worker";
import { Logger } from "@trigger.dev/core/logger";
import { z } from "zod";
import { env } from "~/env.server";
Expand Down Expand Up @@ -80,6 +80,7 @@ function initializeWorker() {
},
pollIntervalMs: env.COMMON_WORKER_POLL_INTERVAL,
immediatePollIntervalMs: env.COMMON_WORKER_IMMEDIATE_POLL_INTERVAL,
shutdownTimeoutMs: env.COMMON_WORKER_SHUTDOWN_TIMEOUT_MS,
logger: new Logger("CommonWorker", "debug"),
jobs: {
"v3.deliverAlert": async ({ payload }) => {
Expand Down
3 changes: 2 additions & 1 deletion apps/webapp/app/v3/legacyRunEngineWorker.server.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Worker as RedisWorker } from "@internal/redis-worker";
import { Worker as RedisWorker } from "@trigger.dev/redis-worker";
import { Logger } from "@trigger.dev/core/logger";
import { z } from "zod";
import { env } from "~/env.server";
Expand Down Expand Up @@ -67,6 +67,7 @@ function initializeWorker() {
},
pollIntervalMs: env.LEGACY_RUN_ENGINE_WORKER_POLL_INTERVAL,
immediatePollIntervalMs: env.LEGACY_RUN_ENGINE_WORKER_IMMEDIATE_POLL_INTERVAL,
shutdownTimeoutMs: env.LEGACY_RUN_ENGINE_WORKER_SHUTDOWN_TIMEOUT_MS,
logger: new Logger("LegacyRunEngineWorker", "debug"),
jobs: {
runHeartbeat: async ({ payload }) => {
Expand Down
1 change: 1 addition & 0 deletions apps/webapp/app/v3/runEngine.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ function createRunEngine() {
workers: env.RUN_ENGINE_WORKER_COUNT,
tasksPerWorker: env.RUN_ENGINE_TASKS_PER_WORKER,
pollIntervalMs: env.RUN_ENGINE_WORKER_POLL_INTERVAL,
shutdownTimeoutMs: env.RUN_ENGINE_WORKER_SHUTDOWN_TIMEOUT_MS,
redis: {
keyPrefix: "engine:",
port: env.RUN_ENGINE_WORKER_REDIS_PORT ?? undefined,
Expand Down
2 changes: 1 addition & 1 deletion apps/webapp/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
"@internal/run-engine": "workspace:*",
"@internal/zod-worker": "workspace:*",
"@internal/redis": "workspace:*",
"@internal/redis-worker": "workspace:*",
"@trigger.dev/redis-worker": "workspace:*",
"@internationalized/date": "^3.5.1",
"@lezer/highlight": "^1.1.6",
"@opentelemetry/api": "1.9.0",
Expand Down
38 changes: 0 additions & 38 deletions internal-packages/redis-worker/package.json

This file was deleted.

8 changes: 0 additions & 8 deletions internal-packages/redis-worker/tsconfig.json

This file was deleted.

19 changes: 0 additions & 19 deletions internal-packages/redis-worker/tsconfig.src.json

This file was deleted.

20 changes: 0 additions & 20 deletions internal-packages/redis-worker/tsconfig.test.json

This file was deleted.

2 changes: 1 addition & 1 deletion internal-packages/redis/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Redis, RedisOptions } from "ioredis";
import { Redis, type RedisOptions } from "ioredis";
import { Logger } from "@trigger.dev/core/logger";

export { Redis, type Callback, type RedisOptions, type Result, type RedisCommander } from "ioredis";
Expand Down
Loading
Loading