Skip to content

Commit 584722d

Browse files
authored
[v4] Complete a waitpoint and then get affected runs (#2034)
* Complete waitpoint before getting affected runs. Added more logging * Added another error log
1 parent 1b1ad16 commit 584722d

File tree

1 file changed

+76
-22
lines changed

1 file changed

+76
-22
lines changed

internal-packages/run-engine/src/engine/systems/waitpointSystem.ts

Lines changed: 76 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -66,18 +66,7 @@ export class WaitpointSystem {
6666
isError: boolean;
6767
};
6868
}): Promise<Waitpoint> {
69-
// 1. Find the TaskRuns blocked by this waitpoint
70-
const affectedTaskRuns = await this.$.prisma.taskRunWaitpoint.findMany({
71-
where: { waitpointId: id },
72-
select: { taskRunId: true, spanIdToComplete: true, createdAt: true },
73-
});
74-
75-
if (affectedTaskRuns.length === 0) {
76-
this.$.logger.debug(`completeWaitpoint: No TaskRunWaitpoints found for waitpoint`, {
77-
waitpointId: id,
78-
});
79-
}
80-
69+
// 1. Complete the Waitpoint (if not completed)
8170
let [waitpointError, waitpoint] = await tryCatch(
8271
this.$.prisma.waitpoint.update({
8372
where: { id, status: "PENDING" },
@@ -109,15 +98,44 @@ export class WaitpointSystem {
10998
throw new Error(`Waitpoint ${id} not found`);
11099
}
111100

112-
//schedule trying to continue the runs
101+
if (waitpoint.status !== "COMPLETED") {
102+
this.$.logger.error(`completeWaitpoint: waitpoint is not completed`, {
103+
waitpointId: id,
104+
});
105+
throw new Error(`Waitpoint ${id} is not completed`);
106+
}
107+
108+
// 2. Find the TaskRuns blocked by this waitpoint
109+
const affectedTaskRuns = await this.$.prisma.taskRunWaitpoint.findMany({
110+
where: { waitpointId: id },
111+
select: { taskRunId: true, spanIdToComplete: true, createdAt: true },
112+
});
113+
114+
if (affectedTaskRuns.length === 0) {
115+
this.$.logger.debug(`completeWaitpoint: no TaskRunWaitpoints found for waitpoint`, {
116+
waitpointId: id,
117+
});
118+
}
119+
120+
// 3. Schedule trying to continue the runs
113121
for (const run of affectedTaskRuns) {
122+
const jobId = `continueRunIfUnblocked:${run.taskRunId}`;
123+
//50ms in the future
124+
const availableAt = new Date(Date.now() + 50);
125+
126+
this.$.logger.debug(`completeWaitpoint: enqueueing continueRunIfUnblocked`, {
127+
waitpointId: id,
128+
runId: run.taskRunId,
129+
jobId,
130+
availableAt,
131+
});
132+
114133
await this.$.worker.enqueue({
115134
//this will debounce the call
116-
id: `continueRunIfUnblocked:${run.taskRunId}`,
135+
id: jobId,
117136
job: "continueRunIfUnblocked",
118137
payload: { runId: run.taskRunId },
119-
//50ms in the future
120-
availableAt: new Date(Date.now() + 50),
138+
availableAt,
121139
});
122140

123141
// emit an event to complete associated cached runs
@@ -469,6 +487,10 @@ export class WaitpointSystem {
469487
}
470488

471489
public async continueRunIfUnblocked({ runId }: { runId: string }) {
490+
this.$.logger.debug(`continueRunIfUnblocked: start`, {
491+
runId,
492+
});
493+
472494
// 1. Get the any blocking waitpoints
473495
const blockingWaitpoints = await this.$.prisma.taskRunWaitpoint.findMany({
474496
where: { taskRunId: runId },
@@ -483,6 +505,10 @@ export class WaitpointSystem {
483505

484506
// 2. There are blockers still, so do nothing
485507
if (blockingWaitpoints.some((w) => w.waitpoint.status !== "COMPLETED")) {
508+
this.$.logger.debug(`continueRunIfUnblocked: blocking waitpoints still exist`, {
509+
runId,
510+
blockingWaitpoints,
511+
});
486512
return;
487513
}
488514

@@ -505,15 +531,18 @@ export class WaitpointSystem {
505531
});
506532

507533
if (!run) {
508-
throw new Error(`#continueRunIfUnblocked: run not found: ${runId}`);
534+
this.$.logger.error(`continueRunIfUnblocked: run not found`, {
535+
runId,
536+
});
537+
throw new Error(`continueRunIfUnblocked: run not found: ${runId}`);
509538
}
510539

511540
//4. Continue the run whether it's executing or not
512541
await this.$.runLock.lock("continueRunIfUnblocked", [runId], 5000, async () => {
513542
const snapshot = await getLatestExecutionSnapshot(this.$.prisma, runId);
514543

515544
if (isFinishedOrPendingFinished(snapshot.executionStatus)) {
516-
this.$.logger.debug(`#continueRunIfUnblocked: run is finished, skipping`, {
545+
this.$.logger.debug(`continueRunIfUnblocked: run is finished, skipping`, {
517546
runId,
518547
snapshot,
519548
});
@@ -555,6 +584,15 @@ export class WaitpointSystem {
555584

556585
await this.releaseConcurrencySystem.refillTokensForSnapshot(snapshot);
557586

587+
this.$.logger.debug(
588+
`continueRunIfUnblocked: run was still executing, sending notification`,
589+
{
590+
runId,
591+
snapshot,
592+
newSnapshot,
593+
}
594+
);
595+
558596
await sendNotificationToWorker({
559597
runId,
560598
snapshot: newSnapshot,
@@ -563,7 +601,7 @@ export class WaitpointSystem {
563601
} else {
564602
// Because we cannot reacquire the concurrency, we need to enqueue the run again
565603
// and because the run is still executing, we need to set the status to QUEUED_EXECUTING
566-
await this.enqueueSystem.enqueueRun({
604+
const newSnapshot = await this.enqueueSystem.enqueueRun({
567605
run,
568606
env: run.runtimeEnvironment,
569607
snapshot: {
@@ -577,21 +615,27 @@ export class WaitpointSystem {
577615
index: b.batchIndex ?? undefined,
578616
})),
579617
});
618+
619+
this.$.logger.debug(`continueRunIfUnblocked: run goes to QUEUED_EXECUTING`, {
620+
runId,
621+
snapshot,
622+
newSnapshot,
623+
});
580624
}
581625
} else {
582626
if (snapshot.executionStatus !== "RUN_CREATED" && !snapshot.checkpointId) {
583627
// TODO: We're screwed, should probably fail the run immediately
584-
this.$.logger.error(`#continueRunIfUnblocked: run has no checkpoint`, {
628+
this.$.logger.error(`continueRunIfUnblocked: run has no checkpoint`, {
585629
runId: run.id,
586630
snapshot,
587631
blockingWaitpoints,
588632
});
589-
throw new Error(`#continueRunIfUnblocked: run has no checkpoint: ${run.id}`);
633+
throw new Error(`continueRunIfUnblocked: run has no checkpoint: ${run.id}`);
590634
}
591635

592636
//put it back in the queue, with the original timestamp (w/ priority)
593637
//this prioritizes dequeuing waiting runs over new runs
594-
await this.enqueueSystem.enqueueRun({
638+
const newSnapshot = await this.enqueueSystem.enqueueRun({
595639
run,
596640
env: run.runtimeEnvironment,
597641
snapshot: {
@@ -604,6 +648,12 @@ export class WaitpointSystem {
604648
})),
605649
checkpointId: snapshot.checkpointId ?? undefined,
606650
});
651+
652+
this.$.logger.debug(`continueRunIfUnblocked: run goes to QUEUED`, {
653+
runId,
654+
snapshot,
655+
newSnapshot,
656+
});
607657
}
608658
});
609659

@@ -613,6 +663,10 @@ export class WaitpointSystem {
613663
taskRunId: runId,
614664
},
615665
});
666+
667+
this.$.logger.debug(`continueRunIfUnblocked: removed blocking waitpoints`, {
668+
runId,
669+
});
616670
}
617671

618672
public async createRunAssociatedWaitpoint(

0 commit comments

Comments
 (0)