Skip to content

Commit bed834b

Browse files
authored
Misc v4 checkpoint fixes (#1859)
* logs for optional services
* print env vars on startup in debug mode
* routes need to explicitly ask to keep connection alive
* log indicators for now
* make workload api listen host configurable
* expose supervisor metrics and make more configurable
* configurable pull secrets, no defaults
* remove restore route
* run controller to handle queued executing
* fix v3 deploys in v4 project
* update admin worker route
* only start pod cleaner et al in k8s mode
* set new worker group as default if none yet
* make image ref optional
* checkpoint image ref is optional for output as well
* export feature flag const
* one last image ref type fix
* make runner intervals configurable
* ability to set arbitrary env vars on new runners
* set default runtime back to node 21
* move all runner env vars to the same section
1 parent 9daa397 commit bed834b

File tree

13 files changed

+197
-35
lines changed

13 files changed

+197
-35
lines changed

apps/supervisor/src/env.ts

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,7 @@
11
import { randomUUID } from "crypto";
22
import { env as stdEnv } from "std-env";
33
import { z } from "zod";
4-
5-
const BoolEnv = z.preprocess((val) => {
6-
if (typeof val !== "string") {
7-
return val;
8-
}
9-
10-
return ["true", "1"].includes(val.toLowerCase().trim());
11-
}, z.boolean());
4+
import { AdditionalEnvVars, BoolEnv } from "./envUtil.js";
125

136
const Env = z.object({
147
// This will come from `spec.nodeName` in k8s
@@ -30,6 +23,11 @@ const Env = z.object({
3023
TRIGGER_WORKLOAD_API_PORT_INTERNAL: z.coerce.number().default(8020), // This is the port the workload API listens on
3124
TRIGGER_WORKLOAD_API_PORT_EXTERNAL: z.coerce.number().default(8020), // This is the exposed port passed to the run controller
3225

26+
// Runner settings
27+
RUNNER_HEARTBEAT_INTERVAL_SECONDS: z.coerce.number().optional(),
28+
RUNNER_SNAPSHOT_POLL_INTERVAL_SECONDS: z.coerce.number().optional(),
29+
RUNNER_ADDITIONAL_ENV_VARS: AdditionalEnvVars, // optional (csv)
30+
3331
// Dequeue settings (provider mode)
3432
TRIGGER_DEQUEUE_ENABLED: BoolEnv.default("true"),
3533
TRIGGER_DEQUEUE_INTERVAL_MS: z.coerce.number().int().default(1000),

apps/supervisor/src/envUtil.test.ts

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
import { describe, it, expect } from "vitest";
2+
import { BoolEnv, AdditionalEnvVars } from "./envUtil.js";
3+
4+
// Spec for BoolEnv: the strings "true" and "1" (any case, surrounding whitespace
// allowed) parse to true, any other string parses to false, and real booleans
// are returned unchanged.
describe("BoolEnv", () => {
  const expectParsed = (input: unknown, expected: boolean) => {
    expect(BoolEnv.parse(input)).toBe(expected);
  };

  it("should parse string 'true' as true", () => {
    for (const input of ["true", "TRUE", "True"]) {
      expectParsed(input, true);
    }
  });

  it("should parse string '1' as true", () => {
    expectParsed("1", true);
  });

  it("should parse string 'false' as false", () => {
    for (const input of ["false", "FALSE", "False"]) {
      expectParsed(input, false);
    }
  });

  it("should handle whitespace", () => {
    for (const input of [" true ", " 1 "]) {
      expectParsed(input, true);
    }
  });

  it("should pass through boolean values", () => {
    for (const input of [true, false]) {
      expectParsed(input, input);
    }
  });

  it("should return false for invalid inputs", () => {
    for (const input of ["invalid", ""]) {
      expectParsed(input, false);
    }
  });
});
36+
37+
// Spec for AdditionalEnvVars: a csv of KEY=value pairs parses to a record,
// malformed or empty pairs are dropped, and input with no usable pair parses
// to undefined.
describe("AdditionalEnvVars", () => {
  const parse = (input: unknown) => AdditionalEnvVars.parse(input);
  const fooAndBaz = { FOO: "bar", BAZ: "qux" };

  it("should parse single key-value pair", () => {
    const parsed = parse("FOO=bar");
    expect(parsed).toEqual({ FOO: "bar" });
  });

  it("should parse multiple key-value pairs", () => {
    const parsed = parse("FOO=bar,BAZ=qux");
    expect(parsed).toEqual(fooAndBaz);
  });

  it("should handle whitespace", () => {
    const parsed = parse(" FOO = bar , BAZ = qux ");
    expect(parsed).toEqual(fooAndBaz);
  });

  it("should return undefined for empty string", () => {
    const parsed = parse("");
    expect(parsed).toBeUndefined();
  });

  it("should return undefined for invalid format", () => {
    const parsed = parse("invalid");
    expect(parsed).toBeUndefined();
  });

  it("should skip invalid pairs but include valid ones", () => {
    const parsed = parse("FOO=bar,INVALID,BAZ=qux");
    expect(parsed).toEqual(fooAndBaz);
  });

  it("should pass through undefined", () => {
    const parsed = parse(undefined);
    expect(parsed).toBeUndefined();
  });

  it("should handle empty values", () => {
    const parsed = parse("FOO=,BAR=value");
    expect(parsed).toEqual({ BAR: "value" });
  });
});

apps/supervisor/src/envUtil.ts

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
import { z } from "zod";
2+
3+
/**
 * Zod schema for boolean-like environment variables.
 *
 * A string input becomes `true` only when, after lowercasing and trimming, it
 * equals "true" or "1"; every other string becomes `false`. Non-string input
 * is handed to `z.boolean()` unchanged.
 */
export const BoolEnv = z.preprocess(
  (raw) =>
    typeof raw === "string" ? ["true", "1"].includes(raw.toLowerCase().trim()) : raw,
  z.boolean()
);
10+
11+
/**
 * Zod schema for an optional, comma-separated list of `KEY=value` pairs
 * (e.g. `FOO=bar,BAZ=qux`), parsed into a `Record<string, string>`.
 *
 * - Non-string input (including `undefined`) is handed to the inner schema unchanged.
 * - Each pair is split on its FIRST `=` only, so a value may itself contain `=`
 *   (e.g. `NODE_OPTIONS=--max-old-space-size=4096`).
 * - Keys and values are trimmed; a pair with no `=`, or with a blank key or
 *   blank value, is skipped.
 * - Yields `undefined` when the string is empty or contains no valid pair.
 *
 * Values cannot contain commas, because `,` is the pair separator.
 */
export const AdditionalEnvVars = z.preprocess((val) => {
  if (typeof val !== "string") {
    return val;
  }

  if (!val) {
    return undefined;
  }

  const result: Record<string, string> = {};

  for (const pair of val.split(",")) {
    // Split on the first "=" only; everything after it belongs to the value.
    const separatorIndex = pair.indexOf("=");
    if (separatorIndex === -1) {
      continue;
    }

    // Trim before validating so whitespace-only keys/values count as empty.
    const key = pair.slice(0, separatorIndex).trim();
    const value = pair.slice(separatorIndex + 1).trim();
    if (!key || !value) {
      continue;
    }

    result[key] = value;
  }

  // Return undefined if no valid key-value pairs were found
  return Object.keys(result).length === 0 ? undefined : result;
}, z.record(z.string(), z.string()).optional());

apps/supervisor/src/index.ts

Lines changed: 14 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import { SupervisorSession } from "@trigger.dev/core/v3/workers";
22
import { SimpleStructuredLogger } from "@trigger.dev/core/v3/utils/structuredLogger";
33
import { env } from "./env.js";
44
import { WorkloadServer } from "./workloadServer/index.js";
5-
import { type WorkloadManager } from "./workloadManager/types.js";
5+
import type { WorkloadManagerOptions, WorkloadManager } from "./workloadManager/types.js";
66
import Docker from "dockerode";
77
import { z } from "zod";
88
import { type DequeuedMessage } from "@trigger.dev/core/v3";
@@ -50,16 +50,23 @@ class ManagedSupervisor {
5050
console.debug("[ManagedSupervisor] Starting up", { envWithoutSecrets });
5151
}
5252

53-
const workloadApiProtocol = env.TRIGGER_WORKLOAD_API_PROTOCOL;
54-
const workloadApiDomain = env.TRIGGER_WORKLOAD_API_DOMAIN;
55-
const workloadApiPortExternal = env.TRIGGER_WORKLOAD_API_PORT_EXTERNAL;
56-
5753
if (this.warmStartUrl) {
5854
this.logger.log("[ManagedWorker] 🔥 Warm starts enabled", {
5955
warmStartUrl: this.warmStartUrl,
6056
});
6157
}
6258

59+
const workloadManagerOptions = {
60+
workloadApiProtocol: env.TRIGGER_WORKLOAD_API_PROTOCOL,
61+
workloadApiDomain: env.TRIGGER_WORKLOAD_API_DOMAIN,
62+
workloadApiPort: env.TRIGGER_WORKLOAD_API_PORT_EXTERNAL,
63+
warmStartUrl: this.warmStartUrl,
64+
imagePullSecrets: env.KUBERNETES_IMAGE_PULL_SECRETS?.split(","),
65+
heartbeatIntervalSeconds: env.RUNNER_HEARTBEAT_INTERVAL_SECONDS,
66+
snapshotPollIntervalSeconds: env.RUNNER_SNAPSHOT_POLL_INTERVAL_SECONDS,
67+
additionalEnvVars: env.RUNNER_ADDITIONAL_ENV_VARS,
68+
} satisfies WorkloadManagerOptions;
69+
6370
if (this.isKubernetes) {
6471
if (env.POD_CLEANER_ENABLED) {
6572
this.logger.log("[ManagedWorker] 🧹 Pod cleaner enabled", {
@@ -92,21 +99,10 @@ class ManagedSupervisor {
9299
}
93100

94101
this.resourceMonitor = new KubernetesResourceMonitor(createK8sApi(), "");
95-
this.workloadManager = new KubernetesWorkloadManager({
96-
workloadApiProtocol,
97-
workloadApiDomain,
98-
workloadApiPort: workloadApiPortExternal,
99-
warmStartUrl: this.warmStartUrl,
100-
imagePullSecrets: env.KUBERNETES_IMAGE_PULL_SECRETS?.split(","),
101-
});
102+
this.workloadManager = new KubernetesWorkloadManager(workloadManagerOptions);
102103
} else {
103104
this.resourceMonitor = new DockerResourceMonitor(new Docker());
104-
this.workloadManager = new DockerWorkloadManager({
105-
workloadApiProtocol,
106-
workloadApiDomain,
107-
workloadApiPort: workloadApiPortExternal,
108-
warmStartUrl: this.warmStartUrl,
109-
});
105+
this.workloadManager = new DockerWorkloadManager(workloadManagerOptions);
110106
}
111107

112108
this.workerSession = new SupervisorSession({

apps/supervisor/src/workloadManager/docker.ts

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,24 @@ export class DockerWorkloadManager implements WorkloadManager {
4545
runArgs.push(`--env=TRIGGER_WARM_START_URL=${this.opts.warmStartUrl}`);
4646
}
4747

48+
if (this.opts.heartbeatIntervalSeconds) {
49+
runArgs.push(
50+
`--env=TRIGGER_HEARTBEAT_INTERVAL_SECONDS=${this.opts.heartbeatIntervalSeconds}`
51+
);
52+
}
53+
54+
if (this.opts.snapshotPollIntervalSeconds) {
55+
runArgs.push(
56+
`--env=TRIGGER_SNAPSHOT_POLL_INTERVAL_SECONDS=${this.opts.snapshotPollIntervalSeconds}`
57+
);
58+
}
59+
60+
if (this.opts.additionalEnvVars) {
61+
Object.entries(this.opts.additionalEnvVars).forEach(([key, value]) => {
62+
runArgs.push(`--env=${key}=${value}`);
63+
});
64+
}
65+
4866
if (env.ENFORCE_MACHINE_PRESETS) {
4967
runArgs.push(`--cpus=${opts.machine.cpu}`, `--memory=${opts.machine.memory}G`);
5068
runArgs.push(`--env=TRIGGER_MACHINE_CPU=${opts.machine.cpu}`);

apps/supervisor/src/workloadManager/kubernetes.ts

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,28 @@ export class KubernetesWorkloadManager implements WorkloadManager {
134134
...(this.opts.warmStartUrl
135135
? [{ name: "TRIGGER_WARM_START_URL", value: this.opts.warmStartUrl }]
136136
: []),
137+
...(this.opts.heartbeatIntervalSeconds
138+
? [
139+
{
140+
name: "TRIGGER_HEARTBEAT_INTERVAL_SECONDS",
141+
value: `${this.opts.heartbeatIntervalSeconds}`,
142+
},
143+
]
144+
: []),
145+
...(this.opts.snapshotPollIntervalSeconds
146+
? [
147+
{
148+
name: "TRIGGER_SNAPSHOT_POLL_INTERVAL_SECONDS",
149+
value: `${this.opts.snapshotPollIntervalSeconds}`,
150+
},
151+
]
152+
: []),
153+
...(this.opts.additionalEnvVars
154+
? Object.entries(this.opts.additionalEnvVars).map(([key, value]) => ({
155+
name: key,
156+
value: value,
157+
}))
158+
: []),
137159
],
138160
},
139161
],

apps/supervisor/src/workloadManager/types.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,9 @@ export interface WorkloadManagerOptions {
66
workloadApiPort: number;
77
warmStartUrl?: string;
88
imagePullSecrets?: string[];
9+
heartbeatIntervalSeconds?: number;
10+
snapshotPollIntervalSeconds?: number;
11+
additionalEnvVars?: Record<string, string>;
912
}
1013

1114
export interface WorkloadManager {

apps/webapp/app/v3/featureFlags.server.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,12 @@
11
import { z } from "zod";
22
import { prisma, PrismaClientOrTransaction } from "~/db.server";
33

4+
export const FEATURE_FLAG = {
5+
defaultWorkerInstanceGroupId: "defaultWorkerInstanceGroupId",
6+
} as const;
7+
48
const FeatureFlagCatalog = {
5-
defaultWorkerInstanceGroupId: z.string(),
9+
[FEATURE_FLAG.defaultWorkerInstanceGroupId]: z.string(),
610
};
711

812
type FeatureFlagKey = keyof typeof FeatureFlagCatalog;

apps/webapp/app/v3/services/worker/workerGroupService.server.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import { WorkerInstanceGroup, WorkerInstanceGroupType } from "@trigger.dev/datab
22
import { WithRunEngine } from "../baseService.server";
33
import { WorkerGroupTokenService } from "./workerGroupTokenService.server";
44
import { logger } from "~/services/logger.server";
5-
import { makeFlags, makeSetFlags } from "~/v3/featureFlags.server";
5+
import { FEATURE_FLAG, makeFlags, makeSetFlags } from "~/v3/featureFlags.server";
66

77
export class WorkerGroupService extends WithRunEngine {
88
private readonly defaultNamePrefix = "worker_group";
@@ -49,14 +49,14 @@ export class WorkerGroupService extends WithRunEngine {
4949

5050
const getFlag = makeFlags(this._prisma);
5151
const defaultWorkerInstanceGroupId = await getFlag({
52-
key: "defaultWorkerInstanceGroupId",
52+
key: FEATURE_FLAG.defaultWorkerInstanceGroupId,
5353
});
5454

5555
// If there's no global default yet we should set it to the new worker group
5656
if (!defaultWorkerInstanceGroupId) {
5757
const setFlag = makeSetFlags(this._prisma);
5858
await setFlag({
59-
key: "defaultWorkerInstanceGroupId",
59+
key: FEATURE_FLAG.defaultWorkerInstanceGroupId,
6060
value: workerGroup.id,
6161
});
6262
}
@@ -169,7 +169,7 @@ export class WorkerGroupService extends WithRunEngine {
169169
const flags = makeFlags(this._prisma);
170170

171171
const defaultWorkerInstanceGroupId = await flags({
172-
key: "defaultWorkerInstanceGroupId",
172+
key: FEATURE_FLAG.defaultWorkerInstanceGroupId,
173173
});
174174

175175
if (!defaultWorkerInstanceGroupId) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
-- AlterTable
2+
ALTER TABLE "TaskRunCheckpoint" ALTER COLUMN "imageRef" DROP NOT NULL;

internal-packages/database/prisma/schema.prisma

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2064,7 +2064,7 @@ model TaskRunCheckpoint {
20642064
20652065
type TaskRunCheckpointType
20662066
location String
2067-
imageRef String
2067+
imageRef String?
20682068
reason String?
20692069
metadata String?
20702070

packages/cli-v3/src/config.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,7 @@ async function resolveConfig(
173173
["run_engine_v2" as const].concat(config.compatibilityFlags ?? [])
174174
);
175175

176-
const defaultRuntime: BuildRuntime = features.run_engine_v2 ? "node-22" : DEFAULT_RUNTIME;
176+
const defaultRuntime: BuildRuntime = features.run_engine_v2 ? "node" : DEFAULT_RUNTIME;
177177

178178
const mergedConfig = defu(
179179
{

packages/core/src/v3/schemas/runEngine.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,7 @@ export type CheckpointType = z.infer<typeof CheckpointType>;
161161
export const CheckpointInput = z.object({
162162
type: CheckpointType,
163163
location: z.string(),
164-
imageRef: z.string(),
164+
imageRef: z.string().nullish(),
165165
reason: z.string().nullish(),
166166
});
167167

@@ -217,7 +217,7 @@ export const DequeueMessageCheckpoint = z.object({
217217
id: z.string(),
218218
type: CheckpointType,
219219
location: z.string(),
220-
imageRef: z.string(),
220+
imageRef: z.string().nullish(),
221221
reason: z.string().nullish(),
222222
});
223223
export type DequeueMessageCheckpoint = z.infer<typeof DequeueMessageCheckpoint>;

0 commit comments

Comments
 (0)