Skip to content

v4: Experimental process keep alive #2183

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 34 commits into from
Jul 2, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
e2049fc
Initial dev process keep alive. moved resource attributes to span att…
ericallam Jun 17, 2025
fd3db5f
Reset execution environment between runs
ericallam Jun 17, 2025
9374aea
Split resource and span attributes
ericallam Jun 17, 2025
427c450
Fixed metrics, reusing a process counts as a warm start
ericallam Jun 17, 2025
19bdf3b
scope the pool by background version, allow configuring pool params i…
ericallam Jun 17, 2025
1a1e70a
WIP deployed processKeepAlive
ericallam Jun 17, 2025
8798352
Super-warm starts in deployed tasks proof
ericallam Jun 17, 2025
c24f3a5
Reset deployed run worker and also get the warm start stuff to work p…
ericallam Jun 18, 2025
38fe854
Make refreshing a task run process work correctly
ericallam Jun 18, 2025
a414e65
handle env vars that have been removed between executions
ericallam Jun 18, 2025
3fb68b7
couple of tweaks for testing stuff
ericallam Jun 18, 2025
5b49b42
Add changeset
ericallam Jun 18, 2025
58d56f8
Use hostname instead of host
ericallam Jun 18, 2025
020565c
Handle dev process pool shutdown errors
ericallam Jun 18, 2025
56146af
TaskRunProcessProvider cleanup is async
ericallam Jun 18, 2025
c41d634
Handle immediate retries better
ericallam Jun 18, 2025
2018b41
TRIGGER -> TRIGGERDOTDEV
ericallam Jun 18, 2025
bffd18a
Properly reset listener arrays by setting length to 0
ericallam Jun 18, 2025
a0f2c98
Make sure to clear existing metadata flush timer when resetting
ericallam Jun 18, 2025
707bc09
Make sure maxExecutionsPerProcess is a positive integer
ericallam Jun 18, 2025
7f3b1c1
Ensure the usage interval is cleared when resetting
ericallam Jun 18, 2025
4d15c8f
Better reset in the prod usage manager
ericallam Jun 18, 2025
f0bc4ad
Don't delete env vars with falsy values
ericallam Jun 18, 2025
82f3db6
Only bootstrap on the cold start execution, and record how many execu…
ericallam Jun 18, 2025
2cc1453
Always set the cold/warm variant on attempt spans, even in dev
ericallam Jun 18, 2025
9ab590b
Better handle snapshot changes with the new persistent task run process,
ericallam Jun 19, 2025
aecee4d
Don't reset task lifecycle hooks between executions
ericallam Jun 20, 2025
b548561
Remove the sometimes incorrect import file name
ericallam Jun 20, 2025
632d6b9
Fix taskSlug extraction
ericallam Jun 20, 2025
ceb737e
Fix broken webapp test
ericallam Jun 20, 2025
57f3a89
Fix timeline span events tests
ericallam Jun 25, 2025
8dc90d6
increase run engine test timeouts
ericallam Jun 25, 2025
0793c2b
address some nick comments in the review
ericallam Jun 26, 2025
0deb2c6
remove change from process-keep-alive
ericallam Jun 26, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/polite-badgers-suffer.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"trigger.dev": patch
---

experimental processKeepAlive
10 changes: 10 additions & 0 deletions apps/supervisor/src/util.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,15 @@
import { isMacOS, isWindows } from "std-env";

/**
 * Rewrites a URL whose hostname is exactly "localhost" so that it uses the
 * hostname returned by `getDockerHostDomain()`; any other hostname is kept.
 *
 * The value is always round-tripped through the WHATWG `URL` parser, so the
 * returned string is the parser's serialized form even when the hostname is
 * not replaced.
 *
 * @param url - Absolute URL string to normalize.
 * @returns The serialized URL, with the hostname swapped when it was "localhost".
 * @throws TypeError if `url` cannot be parsed by the `URL` constructor.
 */
export function normalizeDockerHostUrl(url: string) {
  const parsed = new URL(url);
  const pointsAtLocalhost = parsed.hostname === "localhost";

  if (pointsAtLocalhost) {
    parsed.hostname = getDockerHostDomain();
  }

  // `href` is the same serialization that `toString()` returns.
  return parsed.href;
}

/**
 * Picks the hostname to use in place of "localhost" based on the current
 * platform flags.
 *
 * @returns "host.docker.internal" when `isMacOS` or `isWindows` is truthy,
 * otherwise "localhost".
 */
export function getDockerHostDomain() {
  // NOTE(review): presumably macOS/Windows containers cannot reach the host via
  // "localhost" and need Docker's special DNS name — confirm against Docker docs.
  if (isMacOS || isWindows) {
    return "host.docker.internal";
  }

  return "localhost";
}
Expand Down
4 changes: 2 additions & 2 deletions apps/supervisor/src/workloadManager/docker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import {
type WorkloadManagerOptions,
} from "./types.js";
import { env } from "../env.js";
import { getDockerHostDomain, getRunnerId } from "../util.js";
import { getDockerHostDomain, getRunnerId, normalizeDockerHostUrl } from "../util.js";
import Docker from "dockerode";
import { tryCatch } from "@trigger.dev/core";

Expand Down Expand Up @@ -78,7 +78,7 @@ export class DockerWorkloadManager implements WorkloadManager {
];

if (this.opts.warmStartUrl) {
envVars.push(`TRIGGER_WARM_START_URL=${this.opts.warmStartUrl}`);
envVars.push(`TRIGGER_WARM_START_URL=${normalizeDockerHostUrl(this.opts.warmStartUrl)}`);
}

if (this.opts.metadataUrl) {
Expand Down
3 changes: 0 additions & 3 deletions apps/webapp/app/utils/timelineSpanEvents.ts
Original file line number Diff line number Diff line change
Expand Up @@ -142,9 +142,6 @@ function getFriendlyNameForEvent(event: string, properties?: Record<string, any>
return "Attempt created";
}
case "import": {
if (properties && typeof properties.file === "string") {
return `Importing ${properties.file}`;
}
return "Importing task file";
}
case "lazy_payload": {
Expand Down
258 changes: 180 additions & 78 deletions apps/webapp/app/v3/otlpExporter.server.ts

Large diffs are not rendered by default.

10 changes: 5 additions & 5 deletions apps/webapp/test/timelineSpanEvents.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ describe("createTimelineSpanEventsFromSpanEvents", () => {
expect(result.some((event) => event.name === "Dequeued")).toBe(true);
expect(result.some((event) => event.name === "Launched")).toBe(true);
expect(result.some((event) => event.name === "Attempt created")).toBe(true);
expect(result.some((event) => event.name === "Importing src/trigger/chat.ts")).toBe(true);
expect(result.some((event) => event.name === "Importing task file")).toBe(true);
});

test("should sort events by timestamp", () => {
Expand All @@ -86,7 +86,7 @@ describe("createTimelineSpanEventsFromSpanEvents", () => {
expect(result[0].name).toBe("Dequeued");
expect(result[1].name).toBe("Attempt created");
expect(result[2].name).toBe("Launched");
expect(result[3].name).toBe("Importing src/trigger/chat.ts");
expect(result[3].name).toBe("Importing task file");
});

test("should calculate offsets correctly from the first event", () => {
Expand Down Expand Up @@ -176,7 +176,7 @@ describe("createTimelineSpanEventsFromSpanEvents", () => {
expect(result.find((e) => e.name === "Attempt created")?.helpText).toBe(
"An attempt was created for the run"
);
expect(result.find((e) => e.name === "Importing src/trigger/chat.ts")?.helpText).toBe(
expect(result.find((e) => e.name === "Importing task file")?.helpText).toBe(
"A task file was imported"
);
});
Expand All @@ -187,7 +187,7 @@ describe("createTimelineSpanEventsFromSpanEvents", () => {
expect(result.find((e) => e.name === "Dequeued")?.duration).toBe(0);
expect(result.find((e) => e.name === "Launched")?.duration).toBe(127);
expect(result.find((e) => e.name === "Attempt created")?.duration).toBe(56);
expect(result.find((e) => e.name === "Importing src/trigger/chat.ts")?.duration).toBe(67);
expect(result.find((e) => e.name === "Importing task file")?.duration).toBe(67);
});

test("should use fallback name for import event without file property", () => {
Expand All @@ -214,7 +214,7 @@ describe("createTimelineSpanEventsFromSpanEvents", () => {
// Without fork event, import should also be visible for non-admins
expect(result.length).toBe(2);
expect(result.some((event) => event.name === "Dequeued")).toBe(true);
expect(result.some((event) => event.name === "Importing src/trigger/chat.ts")).toBe(true);
expect(result.some((event) => event.name === "Importing task file")).toBe(true);

// create_attempt should still be admin-only
expect(result.some((event) => event.name === "Attempt created")).toBe(false);
Expand Down
2 changes: 1 addition & 1 deletion internal-packages/run-engine/vitest.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ export default defineConfig({
singleThread: true,
},
},
testTimeout: 60_000,
testTimeout: 120_000,
coverage: {
provider: "v8",
},
Expand Down
63 changes: 63 additions & 0 deletions packages/cli-v3/src/dev/devSupervisor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import {
import pLimit from "p-limit";
import { resolveLocalEnvVars } from "../utilities/localEnvVars.js";
import type { Metafile } from "esbuild";
import { TaskRunProcessPool } from "./taskRunProcessPool.js";
import { tryCatch } from "@trigger.dev/core/utils";

export type WorkerRuntimeOptions = {
name: string | undefined;
Expand Down Expand Up @@ -67,6 +69,7 @@ class DevSupervisor implements WorkerRuntime {
private socketConnections = new Set<string>();

private runLimiter?: ReturnType<typeof pLimit>;
private taskRunProcessPool?: TaskRunProcessPool;

constructor(public readonly options: WorkerRuntimeOptions) {}

Expand Down Expand Up @@ -95,6 +98,42 @@ class DevSupervisor implements WorkerRuntime {

this.runLimiter = pLimit(maxConcurrentRuns);

// Initialize the task run process pool
const env = await this.#getEnvVars();

const enableProcessReuse =
typeof this.options.config.experimental_processKeepAlive === "boolean"
? this.options.config.experimental_processKeepAlive
: typeof this.options.config.experimental_processKeepAlive === "object"
? this.options.config.experimental_processKeepAlive.enabled
: false;

const maxPoolSize =
typeof this.options.config.experimental_processKeepAlive === "object"
? this.options.config.experimental_processKeepAlive.devMaxPoolSize ?? 25
: 25;

const maxExecutionsPerProcess =
typeof this.options.config.experimental_processKeepAlive === "object"
? this.options.config.experimental_processKeepAlive.maxExecutionsPerProcess ?? 50
: 50;

if (enableProcessReuse) {
logger.debug("[DevSupervisor] Enabling process reuse", {
enableProcessReuse,
maxPoolSize,
maxExecutionsPerProcess,
});
}

this.taskRunProcessPool = new TaskRunProcessPool({
env,
cwd: this.options.config.workingDir,
enableProcessReuse,
maxPoolSize,
maxExecutionsPerProcess,
});

this.socket = this.#createSocket();

//start an SSE connection for presence
Expand All @@ -111,6 +150,17 @@ class DevSupervisor implements WorkerRuntime {
} catch (error) {
logger.debug("[DevSupervisor] shutdown, socket failed to close", { error });
}

// Shutdown the task run process pool
if (this.taskRunProcessPool) {
const [shutdownError] = await tryCatch(this.taskRunProcessPool.shutdown());

if (shutdownError) {
logger.debug("[DevSupervisor] shutdown, task run process pool failed to shutdown", {
error: shutdownError,
});
}
}
}

async initializeWorker(
Expand Down Expand Up @@ -293,12 +343,21 @@ class DevSupervisor implements WorkerRuntime {
continue;
}

if (!this.taskRunProcessPool) {
logger.debug(`[DevSupervisor] dequeueRuns. No task run process pool`, {
run: message.run.friendlyId,
worker,
});
continue;
}

//new run
runController = new DevRunController({
runFriendlyId: message.run.friendlyId,
worker: worker,
httpClient: this.options.client,
logLevel: this.options.args.logLevel,
taskRunProcessPool: this.taskRunProcessPool,
onFinished: () => {
logger.debug("[DevSupervisor] Run finished", { runId: message.run.friendlyId });

Expand Down Expand Up @@ -574,6 +633,10 @@ class DevSupervisor implements WorkerRuntime {
return;
}

if (worker.serverWorker?.version) {
this.taskRunProcessPool?.deprecateVersion(worker.serverWorker?.version);
}

if (this.#workerHasInProgressRuns(friendlyId)) {
return;
}
Expand Down
Loading