Skip to content

Commit 39dd91b

Browse files
authored
fix(engine) prevent MVCC race in blockRunWithWaitpoint pending check (#3075)
Split the CTE in blockRunWithWaitpoint so the pending waitpoint check is a separate SQL statement. In READ COMMITTED isolation, each statement gets its own snapshot, so a separate SELECT sees the latest committed state from concurrent completeWaitpoint calls. Previously, the CTE did INSERT + pending check in one statement (one snapshot). If completeWaitpoint committed between the CTE start and the SELECT, the SELECT would still see PENDING due to the stale snapshot. Neither side would enqueue continueRunIfUnblocked, leaving the run stuck forever.
1 parent cf6b6e7 commit 39dd91b

File tree

2 files changed

+39
-6
lines changed

2 files changed

+39
-6
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
area: webapp
3+
type: fix
4+
---
5+
6+
Fix a race condition in the waitpoint system where a run could be blocked by a completed waitpoint but never be resumed because of a PostgreSQL MVCC issue. This was most likely to occur when creating a waitpoint via `wait.forToken()` at the same moment as completing the token with `wait.completeToken()`. Other types of waitpoints (timed, child runs) were not affected.

internal-packages/run-engine/src/engine/systems/waitpointSystem.ts

Lines changed: 33 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -366,6 +366,22 @@ export class WaitpointSystem {
366366

367367
/**
368368
* Prevents a run from continuing until the waitpoint is completed.
369+
*
370+
* This method uses two separate SQL statements intentionally:
371+
*
372+
* 1. A CTE that INSERTs TaskRunWaitpoint rows (blocking connections) and
373+
* _WaitpointRunConnections rows (historical connections).
374+
*
375+
* 2. A separate SELECT that checks if any of the requested waitpoints are still PENDING.
376+
*
377+
* These MUST be separate statements because of PostgreSQL MVCC in READ COMMITTED isolation:
378+
* each statement gets its own snapshot. If a concurrent `completeWaitpoint` commits between
379+
* the CTE starting and finishing, the CTE's snapshot won't see the COMPLETED status. By using
380+
* a separate SELECT, we get a fresh snapshot that reflects the latest committed state.
381+
*
382+
* The pending check queries ALL requested waitpoint IDs (not just the ones actually inserted
383+
* by the CTE). This is intentional: if a TaskRunWaitpoint row already existed (ON CONFLICT
384+
* DO NOTHING skipped the insert), a still-PENDING waitpoint should still count as blocking.
369385
*/
370386
async blockRunWithWaitpoint({
371387
runId,
@@ -399,8 +415,10 @@ export class WaitpointSystem {
399415
return await this.$.runLock.lock("blockRunWithWaitpoint", [runId], async () => {
400416
let snapshot: TaskRunExecutionSnapshot = await getLatestExecutionSnapshot(prisma, runId);
401417

402-
//block the run with the waitpoints, returning how many waitpoints are pending
403-
const insert = await prisma.$queryRaw<{ pending_count: BigInt }[]>`
418+
// Insert the blocking connections and the historical run connections.
419+
// We use a CTE to do both inserts atomically. Data-modifying CTEs are
420+
// always executed regardless of whether they're referenced in the outer query.
421+
await prisma.$queryRaw`
404422
WITH inserted AS (
405423
INSERT INTO "TaskRunWaitpoint" ("id", "taskRunId", "waitpointId", "projectId", "createdAt", "updatedAt", "spanIdToComplete", "batchId", "batchIndex")
406424
SELECT
@@ -425,12 +443,21 @@ export class WaitpointSystem {
425443
WHERE w.id IN (${Prisma.join($waitpoints)})
426444
ON CONFLICT DO NOTHING
427445
)
446+
SELECT COUNT(*) FROM inserted`;
447+
448+
// Check if the run is actually blocked using a separate query.
449+
// This MUST be a separate statement from the CTE above because in READ COMMITTED
450+
// isolation, each statement gets its own snapshot. The CTE's snapshot is taken when
451+
// it starts, so if a concurrent completeWaitpoint commits during the CTE, the CTE
452+
// won't see it. This fresh query gets a new snapshot that reflects the latest commits.
453+
const pendingCheck = await prisma.$queryRaw<{ pending_count: BigInt }[]>`
428454
SELECT COUNT(*) as pending_count
429-
FROM inserted i
430-
JOIN "Waitpoint" w ON w.id = i."waitpointId"
431-
WHERE w.status = 'PENDING';`;
455+
FROM "Waitpoint"
456+
WHERE id IN (${Prisma.join($waitpoints)})
457+
AND status = 'PENDING'
458+
`;
432459

433-
const isRunBlocked = Number(insert.at(0)?.pending_count ?? 0) > 0;
460+
const isRunBlocked = Number(pendingCheck.at(0)?.pending_count ?? 0) > 0;
434461

435462
let newStatus: TaskRunExecutionStatus = "SUSPENDED";
436463
if (

0 commit comments

Comments
 (0)