Skip to content

[v4] Complete a waitpoint and then get affected runs #2034

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
May 9, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 76 additions & 22 deletions internal-packages/run-engine/src/engine/systems/waitpointSystem.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,18 +66,7 @@ export class WaitpointSystem {
isError: boolean;
};
}): Promise<Waitpoint> {
// 1. Find the TaskRuns blocked by this waitpoint
const affectedTaskRuns = await this.$.prisma.taskRunWaitpoint.findMany({
where: { waitpointId: id },
select: { taskRunId: true, spanIdToComplete: true, createdAt: true },
});

if (affectedTaskRuns.length === 0) {
this.$.logger.debug(`completeWaitpoint: No TaskRunWaitpoints found for waitpoint`, {
waitpointId: id,
});
}

// 1. Complete the Waitpoint (if not completed)
let [waitpointError, waitpoint] = await tryCatch(
this.$.prisma.waitpoint.update({
where: { id, status: "PENDING" },
Expand Down Expand Up @@ -109,15 +98,44 @@ export class WaitpointSystem {
throw new Error(`Waitpoint ${id} not found`);
}

//schedule trying to continue the runs
if (waitpoint.status !== "COMPLETED") {
this.$.logger.error(`completeWaitpoint: waitpoint is not completed`, {
waitpointId: id,
});
throw new Error(`Waitpoint ${id} is not completed`);
}

// 2. Find the TaskRuns blocked by this waitpoint
const affectedTaskRuns = await this.$.prisma.taskRunWaitpoint.findMany({
where: { waitpointId: id },
select: { taskRunId: true, spanIdToComplete: true, createdAt: true },
});

if (affectedTaskRuns.length === 0) {
this.$.logger.debug(`completeWaitpoint: no TaskRunWaitpoints found for waitpoint`, {
waitpointId: id,
});
}

// 3. Schedule trying to continue the runs
for (const run of affectedTaskRuns) {
const jobId = `continueRunIfUnblocked:${run.taskRunId}`;
//50ms in the future
const availableAt = new Date(Date.now() + 50);

this.$.logger.debug(`completeWaitpoint: enqueueing continueRunIfUnblocked`, {
waitpointId: id,
runId: run.taskRunId,
jobId,
availableAt,
});

await this.$.worker.enqueue({
//this will debounce the call
id: `continueRunIfUnblocked:${run.taskRunId}`,
id: jobId,
job: "continueRunIfUnblocked",
payload: { runId: run.taskRunId },
//50ms in the future
availableAt: new Date(Date.now() + 50),
availableAt,
});

// emit an event to complete associated cached runs
Expand Down Expand Up @@ -469,6 +487,10 @@ export class WaitpointSystem {
}

public async continueRunIfUnblocked({ runId }: { runId: string }) {
this.$.logger.debug(`continueRunIfUnblocked: start`, {
runId,
});

// 1. Get the any blocking waitpoints
const blockingWaitpoints = await this.$.prisma.taskRunWaitpoint.findMany({
where: { taskRunId: runId },
Expand All @@ -483,6 +505,10 @@ export class WaitpointSystem {

// 2. There are blockers still, so do nothing
if (blockingWaitpoints.some((w) => w.waitpoint.status !== "COMPLETED")) {
this.$.logger.debug(`continueRunIfUnblocked: blocking waitpoints still exist`, {
runId,
blockingWaitpoints,
});
return;
}

Expand All @@ -505,15 +531,18 @@ export class WaitpointSystem {
});

if (!run) {
throw new Error(`#continueRunIfUnblocked: run not found: ${runId}`);
this.$.logger.error(`continueRunIfUnblocked: run not found`, {
runId,
});
throw new Error(`continueRunIfUnblocked: run not found: ${runId}`);
}

//4. Continue the run whether it's executing or not
await this.$.runLock.lock("continueRunIfUnblocked", [runId], 5000, async () => {
const snapshot = await getLatestExecutionSnapshot(this.$.prisma, runId);

if (isFinishedOrPendingFinished(snapshot.executionStatus)) {
this.$.logger.debug(`#continueRunIfUnblocked: run is finished, skipping`, {
this.$.logger.debug(`continueRunIfUnblocked: run is finished, skipping`, {
runId,
snapshot,
});
Expand Down Expand Up @@ -555,6 +584,15 @@ export class WaitpointSystem {

await this.releaseConcurrencySystem.refillTokensForSnapshot(snapshot);

this.$.logger.debug(
`continueRunIfUnblocked: run was still executing, sending notification`,
{
runId,
snapshot,
newSnapshot,
}
);

await sendNotificationToWorker({
runId,
snapshot: newSnapshot,
Expand All @@ -563,7 +601,7 @@ export class WaitpointSystem {
} else {
// Because we cannot reacquire the concurrency, we need to enqueue the run again
// and because the run is still executing, we need to set the status to QUEUED_EXECUTING
await this.enqueueSystem.enqueueRun({
const newSnapshot = await this.enqueueSystem.enqueueRun({
run,
env: run.runtimeEnvironment,
snapshot: {
Expand All @@ -577,21 +615,27 @@ export class WaitpointSystem {
index: b.batchIndex ?? undefined,
})),
});

this.$.logger.debug(`continueRunIfUnblocked: run goes to QUEUED_EXECUTING`, {
runId,
snapshot,
newSnapshot,
});
}
} else {
if (snapshot.executionStatus !== "RUN_CREATED" && !snapshot.checkpointId) {
// TODO: We're screwed, should probably fail the run immediately
this.$.logger.error(`#continueRunIfUnblocked: run has no checkpoint`, {
this.$.logger.error(`continueRunIfUnblocked: run has no checkpoint`, {
runId: run.id,
snapshot,
blockingWaitpoints,
});
throw new Error(`#continueRunIfUnblocked: run has no checkpoint: ${run.id}`);
throw new Error(`continueRunIfUnblocked: run has no checkpoint: ${run.id}`);
}

//put it back in the queue, with the original timestamp (w/ priority)
//this prioritizes dequeuing waiting runs over new runs
await this.enqueueSystem.enqueueRun({
const newSnapshot = await this.enqueueSystem.enqueueRun({
run,
env: run.runtimeEnvironment,
snapshot: {
Expand All @@ -604,6 +648,12 @@ export class WaitpointSystem {
})),
checkpointId: snapshot.checkpointId ?? undefined,
});

this.$.logger.debug(`continueRunIfUnblocked: run goes to QUEUED`, {
runId,
snapshot,
newSnapshot,
});
}
});

Expand All @@ -613,6 +663,10 @@ export class WaitpointSystem {
taskRunId: runId,
},
});

this.$.logger.debug(`continueRunIfUnblocked: removed blocking waitpoints`, {
runId,
});
}

public async createRunAssociatedWaitpoint(
Expand Down