[v4] Complete a waitpoint and then get affected runs #2034
Actionable comments posted: 0
🧹 Nitpick comments (5)
internal-packages/run-engine/src/engine/systems/waitpointSystem.ts (5)
`69-82`: Consider wrapping the update + query in a single transaction for true atomicity

`completeWaitpoint` first marks the waitpoint as `COMPLETED` and then performs a separate query to collect affected `TaskRunWaitpoint` rows.

In the (admittedly tiny) time window between these two statements, a concurrent `blockRunWithWaitpoint` call could still insert additional `TaskRunWaitpoint` rows that would be missed by the follow-up query, leaving a run blocked without a scheduled `continueRunIfUnblocked` job.

Moving both statements into one `$.prisma.$transaction()` (or using a CTE) guarantees a consistent snapshot and eliminates the race completely.

```diff
-// 1. Complete the Waitpoint …
-let [waitpointError, waitpoint] = await tryCatch(this.$.prisma.waitpoint.update(…));
-…
-// 2. Find the TaskRuns blocked by this waitpoint
-const affectedTaskRuns = await this.$.prisma.taskRunWaitpoint.findMany(…);
+const [waitpointResult, affectedTaskRuns] = await this.$.prisma.$transaction([
+  this.$.prisma.waitpoint.update({ … }),
+  this.$.prisma.taskRunWaitpoint.findMany({ … }),
+]);
```

This keeps the public behaviour unchanged while removing the edge-case window.
`101-106`: Include the actual status in the thrown error for easier debugging

Right now the error says "Waitpoint X is not completed" but omits the current status, forcing operators to re-query the DB. A tiny tweak makes incidents faster to diagnose:

```diff
-throw new Error(`Waitpoint ${id} is not completed`);
+throw new Error(
+  `Waitpoint ${id} is not completed (current status: ${waitpoint.status})`
+);
```
`120-133`: Enqueue calls can be batched to reduce I/O latency

Inside the loop we await each `worker.enqueue`, turning the process into N sequential round-trips. If hundreds of runs are affected this can add noticeable latency. The worker already de-duplicates by `id`, so we can safely fire them in parallel:

```diff
-for (const run of affectedTaskRuns) {
-  …
-  await this.$.worker.enqueue({ … });
-}
+await Promise.all(
+  affectedTaskRuns.map((run) =>
+    this.$.worker.enqueue({
+      id: `continueRunIfUnblocked:${run.taskRunId}`,
+      job: "continueRunIfUnblocked",
+      payload: { runId: run.taskRunId },
+      availableAt: new Date(Date.now() + 50),
+    })
+  )
+);
```

Throughput improves and overall wall-clock time drops, especially under load.
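A related consideration: `Promise.all` rejects as soon as one enqueue fails, hiding the outcome of the rest. If per-run failure reporting matters, `Promise.allSettled` is an alternative — a sketch using hypothetical minimal shapes, not the run engine's actual types:

```ts
// Hypothetical minimal shapes, only for illustration.
type BlockedRun = { taskRunId: string };
type Worker = {
  enqueue: (job: { id: string; job: string; payload: unknown; availableAt: Date }) => Promise<void>;
};

// Enqueue all continuations in parallel, but report failures per run
// instead of letting the first rejection abort the whole batch.
async function enqueueContinuations(worker: Worker, affectedTaskRuns: BlockedRun[]) {
  const results = await Promise.allSettled(
    affectedTaskRuns.map((run) =>
      worker.enqueue({
        id: `continueRunIfUnblocked:${run.taskRunId}`,
        job: "continueRunIfUnblocked",
        payload: { runId: run.taskRunId },
        availableAt: new Date(Date.now() + 50),
      })
    )
  );

  const failed = results.filter((result) => result.status === "rejected");
  if (failed.length > 0) {
    console.warn(`${failed.length} continueRunIfUnblocked enqueues failed`);
  }
}
```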
`587-595`: Logging full snapshot objects may bloat logs and expose PII

`this.$.logger.debug` serialises potentially large `snapshot`/`newSnapshot` objects. Consider logging only stable identifiers (`id`, `executionStatus`, etc.) or using a structured logger that redacts bulky/PII fields. This keeps log volume manageable and avoids accidental leakage.
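As a concrete illustration of "identifiers only", a small helper like the one below could be used — the field names are the ones mentioned above; everything else is hypothetical:

```ts
// Reduce a snapshot to stable identifiers before logging,
// so large or sensitive fields never reach the log pipeline.
type Snapshot = { id: string; executionStatus: string };

function snapshotSummary(snapshot: Snapshot) {
  return { id: snapshot.id, executionStatus: snapshot.executionStatus };
}

// Usage (logger and snapshots come from the surrounding system):
// this.$.logger.debug("completeWaitpoint: snapshot transition", {
//   previous: snapshotSummary(snapshot),
//   next: snapshotSummary(newSnapshot),
// });
```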
`629-634`: TODO: provide a concrete fallback when checkpoint is missing

The TODO notes "We're screwed" and throws an error, which will bubble up and retry the job indefinitely unless caught elsewhere. If this situation is expected to be unrecoverable, consider:
- Explicitly failing the run in the DB with an explanatory status.
- Surfacing a clear alert/metric so the condition doesn’t go unnoticed.
- Adding unit / integration tests to cover this edge case.
Happy to help draft the failure-handling code if useful.
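To make the suggestion concrete, a rough shape could look like the following — every helper name here is hypothetical, not part of the run engine's API:

```ts
// Sketch only: failRun and metrics are hypothetical dependencies, injected so the
// fallback is explicit instead of throwing into an endless retry loop.
type FailRun = (runId: string, code: string, message: string) => Promise<void>;
type Metrics = { increment: (name: string) => void };

async function handleMissingCheckpoint(runId: string, failRun: FailRun, metrics: Metrics) {
  // 1. Fail the run in the DB with an explanatory status instead of retrying forever.
  await failRun(runId, "MISSING_CHECKPOINT", "Run was blocked on a checkpoint that no longer exists");

  // 2. Emit a metric so the condition can be alerted on rather than going unnoticed.
  metrics.increment("run_engine.missing_checkpoint");
}
```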
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
internal-packages/run-engine/src/engine/systems/waitpointSystem.ts (10 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (5)
- GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - pnpm)
- GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - npm)
- GitHub Check: typecheck / typecheck
- GitHub Check: units / 🧪 Unit Tests
- GitHub Check: Analyze (javascript-typescript)
When we block a run with a waitpoint we only block the run if the waitpoint is `PENDING`. If it's already completed then we don't do anything.

Before this PR, when we completed a waitpoint we were getting the blocked runs before marking the waitpoint as `COMPLETED`. This meant it was possible to get an outdated list of blocked runs if the timing was slightly wrong.

This change makes it so we only get the blocked runs after the waitpoint is `COMPLETED` (at the point where it's no longer possible to block a run with it). Also added more logging so we can find what's happening more easily.