Skip to content

Commit 0591db5

Browse files
authored
Fix for short runs inside (batch)triggerAndWait (#1263)
* Test for checkpoints
* Make sourceTaskAttemptId optional on resumeBatchRun
* Removed all completions/executions logic from the shared queue consumer
* Removed the sourceTaskAttemptId from ResumeBatchRunService
* Revert "Removed all completions/executions logic from the shared queue consumer". This reverts commit d35398d.
* WIP on triggerAndWait…
* Fixed triggerAndWait continuing when a checkpoint completes
* Remove the ResumeAttempt code that fails attempts (was protecting against infinite restores)
* Removed messageBody.data.completedAttemptIds.length === 0 commented out code
* Don’t ack if there’s no batchRun
* Added the marqs?.replaceMessage back in but NOT when there’s no checkpoint. More logging. This is a fix for when some attempts fail
* Improvement to the test task that now randomly fails attempts
* When a checkpoint happens, only continue the attempt if it’s in the correct state
* Changeset for rollback in branch
* Set keepRunAlive to false when the dependent task isn’t finished
* Changeset manual version (to get in line with the hotfix branch)
* Changeset: Fixes for continuing after waits
* Latest lockfile (after manual changeset version)
1 parent a640649 commit 0591db5

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

76 files changed

+635
-232
lines changed

.changeset/early-impalas-itch.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@trigger.dev/sdk": patch
3+
---
4+
5+
Fixes for continuing after waits

.changeset/long-feet-invent.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@trigger.dev/sdk": patch
3+
---
4+
5+
Rollback to try and fix some dependent attempt issues

.changeset/pre.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@
9797
"light-bulldogs-press",
9898
"light-dragons-complain",
9999
"little-crabs-cross",
100+
"long-feet-invent",
100101
"long-fireants-search",
101102
"long-hounds-wave",
102103
"loud-actors-remember",

apps/webapp/app/services/worker.server.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,6 @@ const workerCatalog = {
131131
}),
132132
"v3.resumeBatchRun": z.object({
133133
batchRunId: z.string(),
134-
sourceTaskAttemptId: z.string(),
135134
}),
136135
"v3.resumeTaskDependency": z.object({
137136
dependencyId: z.string(),
@@ -550,7 +549,7 @@ function getWorkerQueue() {
550549
handler: async (payload, job) => {
551550
const service = new ResumeBatchRunService();
552551

553-
return await service.call(payload.batchRunId, payload.sourceTaskAttemptId);
552+
return await service.call(payload.batchRunId);
554553
},
555554
},
556555
"v3.resumeTaskDependency": {

apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -617,16 +617,6 @@ export class SharedQueueConsumer {
617617
return;
618618
}
619619

620-
if (messageBody.data.completedAttemptIds.length < 1) {
621-
logger.error("No attempt IDs provided", {
622-
queueMessage: message.data,
623-
messageId: message.messageId,
624-
});
625-
626-
await this.#ackAndDoMoreWork(message.messageId);
627-
return;
628-
}
629-
630620
const resumableRun = await prisma.taskRun.findUnique({
631621
where: {
632622
id: message.messageId,

apps/webapp/app/v3/services/createCheckpoint.server.ts

Lines changed: 172 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,19 @@ import { CoordinatorToPlatformMessages } from "@trigger.dev/core/v3";
22
import type { InferSocketMessageSchema } from "@trigger.dev/core/v3/zodSocket";
33
import type { Checkpoint, CheckpointRestoreEvent } from "@trigger.dev/database";
44
import { logger } from "~/services/logger.server";
5-
import { generateFriendlyId } from "../friendlyIdentifiers";
65
import { marqs } from "~/v3/marqs/index.server";
7-
import { CreateCheckpointRestoreEventService } from "./createCheckpointRestoreEvent.server";
6+
import { generateFriendlyId } from "../friendlyIdentifiers";
7+
import {
8+
FINAL_ATTEMPT_STATUSES,
9+
isFinalAttemptStatus,
10+
isFinalRunStatus,
11+
isFreezableAttemptStatus,
12+
isFreezableRunStatus,
13+
} from "../taskStatus";
814
import { BaseService } from "./baseService.server";
9-
import { isFinalRunStatus, isFreezableAttemptStatus, isFreezableRunStatus } from "../taskStatus";
15+
import { CreateCheckpointRestoreEventService } from "./createCheckpointRestoreEvent.server";
16+
import { ResumeBatchRunService } from "./resumeBatchRun.server";
17+
import { ResumeTaskDependencyService } from "./resumeTaskDependency.server";
1018

1119
export class CreateCheckpointService extends BaseService {
1220
public async call(
@@ -89,6 +97,9 @@ export class CreateCheckpointService extends BaseService {
8997
};
9098
}
9199

100+
//sleep to test slow checkpoints
101+
// await new Promise((resolve) => setTimeout(resolve, 60_000));
102+
92103
const checkpoint = await this._prisma.checkpoint.create({
93104
data: {
94105
friendlyId: generateFriendlyId("checkpoint"),
@@ -140,10 +151,133 @@ export class CreateCheckpointService extends BaseService {
140151
dependencyFriendlyRunId: reason.friendlyId,
141152
});
142153

143-
keepRunAlive = await this.#isRunCompleted(reason.friendlyId);
144-
145-
if (!keepRunAlive) {
146-
await marqs?.acknowledgeMessage(attempt.taskRunId);
154+
if (checkpointEvent) {
155+
const dependency = await this._prisma.taskRunDependency.findFirst({
156+
select: {
157+
id: true,
158+
taskRunId: true,
159+
},
160+
where: {
161+
taskRun: {
162+
friendlyId: reason.friendlyId,
163+
},
164+
},
165+
});
166+
167+
logger.log("CreateCheckpointService: Created checkpoint WAIT_FOR_TASK", {
168+
checkpointId: checkpoint.id,
169+
runFriendlyId: reason.friendlyId,
170+
dependencyId: dependency?.id,
171+
});
172+
173+
if (!dependency) {
174+
logger.error("CreateCheckpointService: Dependency not found", {
175+
friendlyId: reason.friendlyId,
176+
});
177+
178+
return {
179+
success: true,
180+
checkpoint,
181+
event: checkpointEvent,
182+
keepRunAlive: false,
183+
};
184+
}
185+
186+
const childRun = await this._prisma.taskRun.findFirst({
187+
select: {
188+
id: true,
189+
status: true,
190+
},
191+
where: {
192+
id: dependency.taskRunId,
193+
},
194+
});
195+
196+
if (!childRun) {
197+
logger.error("CreateCheckpointService: Dependency child run not found", {
198+
taskRunId: dependency.taskRunId,
199+
runFriendlyId: reason.friendlyId,
200+
dependencyId: dependency.id,
201+
});
202+
203+
return {
204+
success: true,
205+
checkpoint,
206+
event: checkpointEvent,
207+
keepRunAlive: false,
208+
};
209+
}
210+
211+
const isFinished = isFinalRunStatus(childRun.status);
212+
if (!isFinished) {
213+
logger.debug("CreateCheckpointService: Dependency child run not finished", {
214+
taskRunId: dependency.taskRunId,
215+
runFriendlyId: reason.friendlyId,
216+
dependencyId: dependency.id,
217+
childRunStatus: childRun.status,
218+
childRunId: childRun.id,
219+
});
220+
221+
return {
222+
success: true,
223+
checkpoint,
224+
event: checkpointEvent,
225+
keepRunAlive: false,
226+
};
227+
}
228+
229+
const lastAttempt = await this._prisma.taskRunAttempt.findFirst({
230+
select: {
231+
id: true,
232+
status: true,
233+
},
234+
where: {
235+
taskRunId: dependency.taskRunId,
236+
},
237+
orderBy: {
238+
createdAt: "desc",
239+
},
240+
});
241+
242+
if (!lastAttempt) {
243+
logger.debug("CreateCheckpointService: Dependency child attempt not found", {
244+
taskRunId: dependency.taskRunId,
245+
runFriendlyId: reason.friendlyId,
246+
dependencyId: dependency?.id,
247+
});
248+
return {
249+
success: true,
250+
checkpoint,
251+
event: checkpointEvent,
252+
keepRunAlive: false,
253+
};
254+
}
255+
256+
if (!isFinalAttemptStatus(lastAttempt.status)) {
257+
logger.debug("CreateCheckpointService: Dependency child attempt not final", {
258+
taskRunId: dependency.taskRunId,
259+
runFriendlyId: reason.friendlyId,
260+
dependencyId: dependency.id,
261+
lastAttemptId: lastAttempt.id,
262+
lastAttemptStatus: lastAttempt.status,
263+
});
264+
265+
return {
266+
success: true,
267+
checkpoint,
268+
event: checkpointEvent,
269+
keepRunAlive: false,
270+
};
271+
}
272+
273+
await ResumeTaskDependencyService.enqueue(dependency.id, lastAttempt.id, this._prisma);
274+
275+
return {
276+
success: true,
277+
checkpoint,
278+
event: checkpointEvent,
279+
keepRunAlive: false,
280+
};
147281
}
148282

149283
break;
@@ -154,10 +288,37 @@ export class CreateCheckpointService extends BaseService {
154288
batchDependencyFriendlyId: reason.batchFriendlyId,
155289
});
156290

157-
keepRunAlive = await this.#isBatchCompleted(reason.batchFriendlyId);
158-
159-
if (!keepRunAlive) {
160-
await marqs?.acknowledgeMessage(attempt.taskRunId);
291+
if (checkpointEvent) {
292+
const batchRun = await this._prisma.batchTaskRun.findFirst({
293+
select: {
294+
id: true,
295+
},
296+
where: {
297+
friendlyId: reason.batchFriendlyId,
298+
},
299+
});
300+
301+
if (!batchRun) {
302+
logger.error("CreateCheckpointService: Batch not found", {
303+
friendlyId: reason.batchFriendlyId,
304+
});
305+
306+
return {
307+
success: true,
308+
checkpoint,
309+
event: checkpointEvent,
310+
keepRunAlive: false,
311+
};
312+
}
313+
314+
await ResumeBatchRunService.enqueue(batchRun.id, this._prisma);
315+
316+
return {
317+
success: true,
318+
checkpoint,
319+
event: checkpointEvent,
320+
keepRunAlive: false,
321+
};
161322
}
162323

163324
break;
@@ -206,34 +367,4 @@ export class CreateCheckpointService extends BaseService {
206367
keepRunAlive,
207368
};
208369
}
209-
210-
async #isBatchCompleted(friendlyId: string): Promise<boolean> {
211-
const batch = await this._prisma.batchTaskRun.findUnique({
212-
where: {
213-
friendlyId,
214-
},
215-
});
216-
217-
if (!batch) {
218-
logger.error("Batch not found", { friendlyId });
219-
return false;
220-
}
221-
222-
return batch.status === "COMPLETED";
223-
}
224-
225-
async #isRunCompleted(friendlyId: string): Promise<boolean> {
226-
const run = await this._prisma.taskRun.findUnique({
227-
where: {
228-
friendlyId,
229-
},
230-
});
231-
232-
if (!run) {
233-
logger.error("Run not found", { friendlyId });
234-
return false;
235-
}
236-
237-
return isFinalRunStatus(run.status);
238-
}
239370
}

apps/webapp/app/v3/services/resumeAttempt.server.ts

Lines changed: 0 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -152,9 +152,6 @@ export class ResumeAttemptService extends BaseService {
152152
break;
153153
}
154154
}
155-
156-
// Prevent infinite restores by failing runs that don't heartbeat after post-restore resume requests
157-
await this.#replaceResumeWithFailMessage(attempt.taskRunId, params.type);
158155
});
159156
}
160157

@@ -255,50 +252,4 @@ export class ResumeAttemptService extends BaseService {
255252
},
256253
});
257254
}
258-
259-
async #replaceResumeWithFailMessage(messageId: string, waitReason: WaitReason) {
260-
const currentMessage = await marqs?.readMessage(messageId);
261-
262-
if (!currentMessage) {
263-
logger.debug("No message to replace", { messageId, waitReason });
264-
return;
265-
}
266-
267-
const currentBody = SharedQueueMessageBody.safeParse(currentMessage.data);
268-
269-
if (!currentBody.success) {
270-
logger.debug("Invalid message body", { messageId, waitReason, currentBody });
271-
return;
272-
}
273-
274-
const currentType = currentBody.data.type;
275-
276-
if (currentType !== "RESUME" && currentType !== "RESUME_AFTER_DURATION") {
277-
logger.debug("Not a resume message", { messageId, waitReason, currentBody });
278-
return;
279-
}
280-
281-
let reason = "Worker unresponsive after restore";
282-
283-
switch (waitReason) {
284-
case "WAIT_FOR_DURATION":
285-
reason = "Worker unresponsive after waiting for duration";
286-
break;
287-
case "WAIT_FOR_TASK":
288-
reason = "Worker unresponsive after waiting for task";
289-
break;
290-
case "WAIT_FOR_BATCH":
291-
reason = "Worker unresponsive after waiting for batch task";
292-
break;
293-
default:
294-
break;
295-
}
296-
297-
const failMessage: SharedQueueMessageBody = {
298-
type: "FAIL",
299-
reason,
300-
};
301-
302-
return await marqs?.replaceMessage(messageId, failMessage, undefined, true);
303-
}
304255
}

0 commit comments

Comments
 (0)