Skip to content

Commit c6c4ce7

Browse files
fix(core): propagate taskIdentifier for batch triggers
1 parent 5e049cd commit c6c4ce7

File tree

4 files changed

+191
-31
lines changed

4 files changed

+191
-31
lines changed

internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts

Lines changed: 64 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,15 @@ export interface EnhancedExecutionSnapshot extends TaskRunExecutionSnapshot {
3131
type ExecutionSnapshotWithCheckAndWaitpoints = Prisma.TaskRunExecutionSnapshotGetPayload<{
3232
include: {
3333
checkpoint: true;
34-
completedWaitpoints: true;
34+
completedWaitpoints: {
35+
include: {
36+
completedByTaskRun: {
37+
select: {
38+
taskIdentifier: true;
39+
};
40+
};
41+
};
42+
};
3543
};
3644
}>;
3745

@@ -57,7 +65,9 @@ function enhanceExecutionSnapshot(
5765
*/
5866
function enhanceExecutionSnapshotWithWaitpoints(
5967
snapshot: ExecutionSnapshotWithCheckpoint,
60-
waitpoints: Waitpoint[],
68+
waitpoints: (Waitpoint & {
69+
completedByTaskRun: { taskIdentifier: string | null } | null;
70+
})[],
6171
completedWaitpointOrder: string[]
6272
): EnhancedExecutionSnapshot {
6373
return {
@@ -89,22 +99,23 @@ function enhanceExecutionSnapshotWithWaitpoints(
8999
w.userProvidedIdempotencyKey && !w.inactiveIdempotencyKey ? w.idempotencyKey : undefined,
90100
completedByTaskRun: w.completedByTaskRunId
91101
? {
92-
id: w.completedByTaskRunId,
93-
friendlyId: RunId.toFriendlyId(w.completedByTaskRunId),
94-
batch: snapshot.batchId
95-
? {
96-
id: snapshot.batchId,
97-
friendlyId: BatchId.toFriendlyId(snapshot.batchId),
98-
}
99-
: undefined,
100-
}
102+
id: w.completedByTaskRunId,
103+
friendlyId: RunId.toFriendlyId(w.completedByTaskRunId),
104+
batch: snapshot.batchId
105+
? {
106+
id: snapshot.batchId,
107+
friendlyId: BatchId.toFriendlyId(snapshot.batchId),
108+
}
109+
: undefined,
110+
taskIdentifier: w.completedByTaskRun?.taskIdentifier ?? undefined,
111+
}
101112
: undefined,
102113
completedAfter: w.completedAfter ?? undefined,
103114
completedByBatch: w.completedByBatchId
104115
? {
105-
id: w.completedByBatchId,
106-
friendlyId: BatchId.toFriendlyId(w.completedByBatchId),
107-
}
116+
id: w.completedByBatchId,
117+
friendlyId: BatchId.toFriendlyId(w.completedByBatchId),
118+
}
108119
: undefined,
109120
output: w.output ?? undefined,
110121
outputType: w.outputType,
@@ -137,14 +148,23 @@ async function getSnapshotWaitpointIds(
137148
async function fetchWaitpointsInChunks(
138149
prisma: PrismaClientOrTransaction,
139150
waitpointIds: string[]
140-
): Promise<Waitpoint[]> {
151+
): Promise<(Waitpoint & { completedByTaskRun: { taskIdentifier: string | null } | null })[]> {
141152
if (waitpointIds.length === 0) return [];
142153

143-
const allWaitpoints: Waitpoint[] = [];
154+
const allWaitpoints: (Waitpoint & {
155+
completedByTaskRun: { taskIdentifier: string | null } | null;
156+
})[] = [];
144157
for (let i = 0; i < waitpointIds.length; i += WAITPOINT_CHUNK_SIZE) {
145158
const chunk = waitpointIds.slice(i, i + WAITPOINT_CHUNK_SIZE);
146159
const waitpoints = await prisma.waitpoint.findMany({
147160
where: { id: { in: chunk } },
161+
include: {
162+
completedByTaskRun: {
163+
select: {
164+
taskIdentifier: true,
165+
},
166+
},
167+
},
148168
});
149169
allWaitpoints.push(...waitpoints);
150170
}
@@ -159,7 +179,15 @@ export async function getLatestExecutionSnapshot(
159179
const snapshot = await prisma.taskRunExecutionSnapshot.findFirst({
160180
where: { runId, isValid: true },
161181
include: {
162-
completedWaitpoints: true,
182+
completedWaitpoints: {
183+
include: {
184+
completedByTaskRun: {
185+
select: {
186+
taskIdentifier: true,
187+
},
188+
},
189+
},
190+
},
163191
checkpoint: true,
164192
},
165193
orderBy: { createdAt: "desc" },
@@ -179,7 +207,15 @@ export async function getExecutionSnapshotCompletedWaitpoints(
179207
const waitpoints = await prisma.taskRunExecutionSnapshot.findFirst({
180208
where: { id: snapshotId },
181209
include: {
182-
completedWaitpoints: true,
210+
completedWaitpoints: {
211+
include: {
212+
completedByTaskRun: {
213+
select: {
214+
taskIdentifier: true,
215+
},
216+
},
217+
},
218+
},
183219
},
184220
});
185221

@@ -233,19 +269,19 @@ export function executionDataFromSnapshot(snapshot: EnhancedExecutionSnapshot):
233269
},
234270
batch: snapshot.batchId
235271
? {
236-
id: snapshot.batchId,
237-
friendlyId: BatchId.toFriendlyId(snapshot.batchId),
238-
}
272+
id: snapshot.batchId,
273+
friendlyId: BatchId.toFriendlyId(snapshot.batchId),
274+
}
239275
: undefined,
240276
checkpoint: snapshot.checkpoint
241277
? {
242-
id: snapshot.checkpoint.id,
243-
friendlyId: snapshot.checkpoint.friendlyId,
244-
type: snapshot.checkpoint.type,
245-
location: snapshot.checkpoint.location,
246-
imageRef: snapshot.checkpoint.imageRef,
247-
reason: snapshot.checkpoint.reason ?? undefined,
248-
}
278+
id: snapshot.checkpoint.id,
279+
friendlyId: snapshot.checkpoint.friendlyId,
280+
type: snapshot.checkpoint.type,
281+
location: snapshot.checkpoint.location,
282+
imageRef: snapshot.checkpoint.imageRef,
283+
reason: snapshot.checkpoint.reason ?? undefined,
284+
}
249285
: undefined,
250286
completedWaitpoints: snapshot.completedWaitpoints,
251287
};
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
import { describe, expect, it, vi } from "vitest";
2+
import { SharedRuntimeManager } from "./sharedRuntimeManager.js";
3+
import { CompletedWaitpoint } from "../schemas/index.js";
4+
5+
describe("SharedRuntimeManager", () => {
6+
const mockIpc = {
7+
send: vi.fn(),
8+
} as any;
9+
10+
const manager = new SharedRuntimeManager(mockIpc, false);
11+
12+
// Access private method
13+
const waitpointToResult = (manager as any).waitpointToTaskRunExecutionResult.bind(manager);
14+
15+
describe("waitpointToTaskRunExecutionResult", () => {
16+
it("should use the taskIdentifier from the waitpoint if present (success)", () => {
17+
const waitpoint: CompletedWaitpoint = {
18+
id: "wp_1",
19+
friendlyId: "wp_1",
20+
type: "RUN",
21+
completedAt: new Date(),
22+
outputIsError: false,
23+
output: JSON.stringify({ foo: "bar" }),
24+
outputType: "application/json",
25+
completedByTaskRun: {
26+
id: "run_1",
27+
friendlyId: "run_1",
28+
taskIdentifier: "my-task",
29+
},
30+
};
31+
32+
const result = waitpointToResult(waitpoint);
33+
34+
expect(result).toEqual({
35+
ok: true,
36+
id: "run_1",
37+
taskIdentifier: "my-task",
38+
output: JSON.stringify({ foo: "bar" }),
39+
outputType: "application/json",
40+
});
41+
});
42+
43+
it("should default taskIdentifier to 'unknown' if missing (success)", () => {
44+
const waitpoint: CompletedWaitpoint = {
45+
id: "wp_2",
46+
friendlyId: "wp_2",
47+
type: "RUN",
48+
completedAt: new Date(),
49+
outputIsError: false,
50+
output: JSON.stringify({ foo: "bar" }),
51+
outputType: "application/json",
52+
completedByTaskRun: {
53+
id: "run_2",
54+
friendlyId: "run_2",
55+
// database/legacy object missing taskIdentifier
56+
} as any,
57+
};
58+
59+
const result = waitpointToResult(waitpoint);
60+
61+
expect(result).toEqual({
62+
ok: true,
63+
id: "run_2",
64+
taskIdentifier: "unknown",
65+
output: JSON.stringify({ foo: "bar" }),
66+
outputType: "application/json",
67+
});
68+
});
69+
70+
it("should use the taskIdentifier from the waitpoint if present (failure)", () => {
71+
const waitpoint: CompletedWaitpoint = {
72+
id: "wp_3",
73+
friendlyId: "wp_3",
74+
type: "RUN",
75+
completedAt: new Date(),
76+
outputIsError: true,
77+
output: JSON.stringify({ message: "Boom" }),
78+
outputType: "application/json",
79+
completedByTaskRun: {
80+
id: "run_3",
81+
friendlyId: "run_3",
82+
taskIdentifier: "my-failed-task",
83+
},
84+
};
85+
86+
const result = waitpointToResult(waitpoint);
87+
88+
expect(result).toEqual({
89+
ok: false,
90+
id: "run_3",
91+
taskIdentifier: "my-failed-task",
92+
error: { message: "Boom" },
93+
});
94+
});
95+
96+
it("should default taskIdentifier to 'unknown' if missing (failure)", () => {
97+
const waitpoint: CompletedWaitpoint = {
98+
id: "wp_4",
99+
friendlyId: "wp_4",
100+
type: "RUN",
101+
completedAt: new Date(),
102+
outputIsError: true,
103+
output: JSON.stringify({ message: "Boom" }),
104+
outputType: "application/json",
105+
completedByTaskRun: {
106+
id: "run_4",
107+
friendlyId: "run_4",
108+
} as any,
109+
};
110+
111+
const result = waitpointToResult(waitpoint);
112+
113+
expect(result).toEqual({
114+
ok: false,
115+
id: "run_4",
116+
taskIdentifier: "unknown",
117+
error: { message: "Boom" },
118+
});
119+
});
120+
});
121+
});

packages/core/src/v3/runtime/sharedRuntimeManager.ts

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -293,17 +293,19 @@ export class SharedRuntimeManager implements RuntimeManager {
293293
return {
294294
ok: false,
295295
id: waitpoint.completedByTaskRun.friendlyId,
296+
taskIdentifier: waitpoint.completedByTaskRun.taskIdentifier ?? "unknown",
296297
error: waitpoint.output
297298
? JSON.parse(waitpoint.output)
298299
: {
299-
type: "STRING_ERROR",
300-
message: "Missing error output",
301-
},
300+
type: "STRING_ERROR",
301+
message: "Missing error output",
302+
},
302303
} satisfies TaskRunFailedExecutionResult;
303304
} else {
304305
return {
305306
ok: true,
306307
id: waitpoint.completedByTaskRun.friendlyId,
308+
taskIdentifier: waitpoint.completedByTaskRun.taskIdentifier ?? "unknown",
307309
output: waitpoint.output,
308310
outputType: waitpoint.outputType ?? "application/json",
309311
} satisfies TaskRunSuccessfulExecutionResult;

packages/core/src/v3/schemas/runEngine.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ export const CompletedWaitpoint = z.object({
8181
.object({
8282
id: z.string(),
8383
friendlyId: z.string(),
84+
taskIdentifier: z.string().optional(),
8485
/** If the run has an associated batch */
8586
batch: z
8687
.object({

0 commit comments

Comments
 (0)