Skip to content

Serialize metadata to prevent invalid data from breaking run completions #2219

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jul 2, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/wet-dragons-boil.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"trigger.dev": patch
---

Serialize metadata to prevent invalid metadata from breaking run completions
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { startSpan } from "@internal/tracing";
import {
CompleteRunAttemptResult,
ExecutionResult,
FlushedRunMetadata,
GitMeta,
StartRunAttemptResult,
TaskRunError,
Expand Down Expand Up @@ -35,6 +36,7 @@ import {
import { ReleaseConcurrencySystem } from "./releaseConcurrencySystem.js";
import { SystemResources } from "./systems.js";
import { WaitpointSystem } from "./waitpointSystem.js";
import { tryCatch } from "@trigger.dev/core/utils";

export type RunAttemptSystemOptions = {
resources: SystemResources;
Expand Down Expand Up @@ -386,15 +388,7 @@ export class RunAttemptSystem {
workerId?: string;
runnerId?: string;
}): Promise<CompleteRunAttemptResult> {
if (completion.metadata) {
this.$.eventBus.emit("runMetadataUpdated", {
time: new Date(),
run: {
id: runId,
metadata: completion.metadata,
},
});
}
await this.#notifyMetadataUpdated(runId, completion);

switch (completion.ok) {
case true: {
Expand Down Expand Up @@ -1314,4 +1308,56 @@ export class RunAttemptSystem {

return taskRun?.runtimeEnvironment;
}

async #notifyMetadataUpdated(runId: string, completion: TaskRunExecutionResult) {
if (completion.metadata) {
this.$.eventBus.emit("runMetadataUpdated", {
time: new Date(),
run: {
id: runId,
metadata: completion.metadata,
},
});

return;
}

if (completion.flushedMetadata) {
const [packetError, packet] = await tryCatch(parsePacket(completion.flushedMetadata));

if (!packet) {
return;
}

if (packetError) {
this.$.logger.error("RunEngine.completeRunAttempt(): failed to parse flushed metadata", {
runId,
flushedMetadata: completion.flushedMetadata,
error: packetError,
});

return;
}

const metadata = FlushedRunMetadata.safeParse(packet);

if (!metadata.success) {
this.$.logger.error("RunEngine.completeRunAttempt(): failed to parse flushed metadata", {
runId,
flushedMetadata: completion.flushedMetadata,
error: metadata.error,
});

return;
}

this.$.eventBus.emit("runMetadataUpdated", {
time: new Date(),
run: {
id: runId,
metadata: metadata.data,
},
});
}
}
}
12 changes: 6 additions & 6 deletions packages/cli-v3/src/entryPoints/dev-run-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ const zodIpc = new ZodIpcConnection({
usage: {
durationMs: 0,
},
metadata: runMetadataManager.stopAndReturnLastFlush(),
flushedMetadata: await runMetadataManager.stopAndReturnLastFlush(),
},
});

Expand Down Expand Up @@ -382,7 +382,7 @@ const zodIpc = new ZodIpcConnection({
usage: {
durationMs: 0,
},
metadata: runMetadataManager.stopAndReturnLastFlush(),
flushedMetadata: await runMetadataManager.stopAndReturnLastFlush(),
},
});

Expand Down Expand Up @@ -447,7 +447,7 @@ const zodIpc = new ZodIpcConnection({
usage: {
durationMs: 0,
},
metadata: runMetadataManager.stopAndReturnLastFlush(),
flushedMetadata: await runMetadataManager.stopAndReturnLastFlush(),
},
});

Expand All @@ -473,7 +473,7 @@ const zodIpc = new ZodIpcConnection({
usage: {
durationMs: 0,
},
metadata: runMetadataManager.stopAndReturnLastFlush(),
flushedMetadata: await runMetadataManager.stopAndReturnLastFlush(),
},
});

Expand Down Expand Up @@ -518,7 +518,7 @@ const zodIpc = new ZodIpcConnection({
usage: {
durationMs: usageSample.cpuTime,
},
metadata: runMetadataManager.stopAndReturnLastFlush(),
flushedMetadata: await runMetadataManager.stopAndReturnLastFlush(),
},
});
}
Expand All @@ -544,7 +544,7 @@ const zodIpc = new ZodIpcConnection({
usage: {
durationMs: 0,
},
metadata: runMetadataManager.stopAndReturnLastFlush(),
flushedMetadata: await runMetadataManager.stopAndReturnLastFlush(),
},
});
}
Expand Down
12 changes: 6 additions & 6 deletions packages/cli-v3/src/entryPoints/managed-run-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ const zodIpc = new ZodIpcConnection({
usage: {
durationMs: 0,
},
metadata: runMetadataManager.stopAndReturnLastFlush(),
flushedMetadata: await runMetadataManager.stopAndReturnLastFlush(),
},
});

Expand Down Expand Up @@ -381,7 +381,7 @@ const zodIpc = new ZodIpcConnection({
usage: {
durationMs: 0,
},
metadata: runMetadataManager.stopAndReturnLastFlush(),
flushedMetadata: await runMetadataManager.stopAndReturnLastFlush(),
},
});

Expand Down Expand Up @@ -444,7 +444,7 @@ const zodIpc = new ZodIpcConnection({
usage: {
durationMs: 0,
},
metadata: runMetadataManager.stopAndReturnLastFlush(),
flushedMetadata: await runMetadataManager.stopAndReturnLastFlush(),
},
});

Expand Down Expand Up @@ -472,7 +472,7 @@ const zodIpc = new ZodIpcConnection({
usage: {
durationMs: 0,
},
metadata: runMetadataManager.stopAndReturnLastFlush(),
flushedMetadata: await runMetadataManager.stopAndReturnLastFlush(),
},
});

Expand Down Expand Up @@ -517,7 +517,7 @@ const zodIpc = new ZodIpcConnection({
usage: {
durationMs: usageSample.cpuTime,
},
metadata: runMetadataManager.stopAndReturnLastFlush(),
flushedMetadata: await runMetadataManager.stopAndReturnLastFlush(),
},
});
}
Expand All @@ -544,7 +544,7 @@ const zodIpc = new ZodIpcConnection({
usage: {
durationMs: 0,
},
metadata: runMetadataManager.stopAndReturnLastFlush(),
flushedMetadata: await runMetadataManager.stopAndReturnLastFlush(),
},
});
}
Expand Down
11 changes: 8 additions & 3 deletions packages/core/src/v3/runMetadata/manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { MetadataStream } from "./metadataStream.js";
import { applyMetadataOperations, collapseOperations } from "./operations.js";
import { RunMetadataManager, RunMetadataUpdater } from "./types.js";
import { AsyncIterableStream } from "../streams/asyncIterableStream.js";
import { IOPacket, stringifyIO } from "../utils/ioSerialization.js";

const MAXIMUM_ACTIVE_STREAMS = 5;
const MAXIMUM_TOTAL_STREAMS = 10;
Expand Down Expand Up @@ -422,23 +423,27 @@ export class StandardMetadataManager implements RunMetadataManager {
}
}

stopAndReturnLastFlush(): FlushedRunMetadata | undefined {
async stopAndReturnLastFlush(): Promise<IOPacket> {
this.stopPeriodicFlush();
this.isFlushing = true;

if (!this.#needsFlush()) {
return;
return { dataType: "application/json" };
}

const operations = Array.from(this.queuedOperations);
const parentOperations = Array.from(this.queuedParentOperations);
const rootOperations = Array.from(this.queuedRootOperations);

return {
const data = {
operations: collapseOperations(operations),
parentOperations: collapseOperations(parentOperations),
rootOperations: collapseOperations(rootOperations),
};

const packet = await stringifyIO(data);

return packet;
}

#needsFlush(): boolean {
Expand Down
16 changes: 16 additions & 0 deletions packages/core/src/v3/schemas/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,15 @@ export const TaskRunFailedExecutionResult = z.object({
usage: TaskRunExecutionUsage.optional(),
// Optional for now for backwards compatibility
taskIdentifier: z.string().optional(),
// This is deprecated, use flushedMetadata instead
metadata: FlushedRunMetadata.optional(),
// This is the new way to flush metadata
flushedMetadata: z
.object({
data: z.string().optional(),
dataType: z.string(),
})
.optional(),
});

export type TaskRunFailedExecutionResult = z.infer<typeof TaskRunFailedExecutionResult>;
Expand All @@ -389,7 +397,15 @@ export const TaskRunSuccessfulExecutionResult = z.object({
usage: TaskRunExecutionUsage.optional(),
// Optional for now for backwards compatibility
taskIdentifier: z.string().optional(),
// This is deprecated, use flushedMetadata instead
metadata: FlushedRunMetadata.optional(),
// This is the new way to flush metadata
flushedMetadata: z
.object({
data: z.string().optional(),
dataType: z.string(),
})
.optional(),
});

export type TaskRunSuccessfulExecutionResult = z.infer<typeof TaskRunSuccessfulExecutionResult>;
Expand Down
16 changes: 16 additions & 0 deletions references/hello-world/src/trigger/metadata.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import { metadata, task } from "@trigger.dev/sdk";

export const metadataTestTask = task({
id: "metadata-tester",
retry: {
maxAttempts: 3,
minTimeoutInMs: 500,
maxTimeoutInMs: 1000,
factor: 1.5,
},
run: async (payload: any, { ctx }) => {
metadata.set("test-key", "test-value");
metadata.append("test-keys", "test-value");
metadata.increment("test-counter", 1);
},
});