release concurrency system reliability improvements #2081
Conversation
…fill tokens for snapshots that are no longer the latest snapshot on a run (e.g. the run has moved to a new snapshot state)
Walkthrough

This update introduces a new "releasings" mechanism for concurrency token management in the run engine, allowing tokens to be tracked, validated, and conditionally refilled based on active releasers. The release concurrency system is refactored to support this, including changes to Redis Lua commands, queue management, and a new sweeper process for stale releasers. The run engine configuration and environment variables are extended to include releasings max age and poll interval parameters. The run engine heartbeat system adds support for a "SUSPENDED" state with corresponding handling. UI components and data loaders are updated to support two engine versions with distinct concurrency data presentations. Test coverage is expanded to validate the new concurrency token refill behavior.

Sequence Diagram(s)

Release Concurrency Token Refill (V2 Engine)

```mermaid
sequenceDiagram
    participant RunEngine
    participant ReleaseConcurrencySystem
    participant ReleaseConcurrencyTokenBucketQueue
    participant Redis
    participant ReleasingsSweeper

    RunEngine->>ReleaseConcurrencySystem: refillTokensForSnapshot(snapshotId)
    ReleaseConcurrencySystem->>ReleaseConcurrencyTokenBucketQueue: refillTokenIfInReleasings(descriptor, releaserId)
    ReleaseConcurrencyTokenBucketQueue->>Redis: Check releasings set for releaserId
    alt releaserId in releasings
        ReleaseConcurrencyTokenBucketQueue->>Redis: Remove releaserId from releasings, refill token
        ReleaseConcurrencyTokenBucketQueue-->>ReleaseConcurrencySystem: true
    else
        ReleaseConcurrencyTokenBucketQueue-->>ReleaseConcurrencySystem: false
    end

    Note over ReleasingsSweeper: Periodically
    ReleasingsSweeper->>ReleaseConcurrencyTokenBucketQueue: getReleasings(maxAge)
    loop For each stale releaserId
        ReleasingsSweeper->>ReleaseConcurrencySystem: validateSnapshotShouldRefillToken(releaserId)
        alt shouldRefill
            ReleasingsSweeper->>ReleaseConcurrencyTokenBucketQueue: refillTokenIfInReleasings(...)
        else
            ReleasingsSweeper->>ReleaseConcurrencyTokenBucketQueue: removeReleaserIdFromReleasings(...)
        end
    end
```
Actionable comments posted: 4
🔭 Outside diff range comments (1)
internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts (1)
1-1303: 💡 Verification agent 🧩 Analysis chain
Verify comprehensive system integration
The changes look good overall and address the reliability improvements mentioned in the PR objectives. The code integrates well with the broader concurrency system enhancements.
🏁 Script executed:
```bash
#!/bin/bash

# Check if the ReleaseConcurrencySystem is properly initialized in the RunEngine
rg -A 15 "new RunEngine" -g "*.ts" | grep -A 20 "releaseConcurrencySystem" || echo "ReleaseConcurrencySystem initialization not found in RunEngine"

# Verify that other related systems also use the refillTokensForSnapshot method
rg "refillTokensForSnapshot" -g "*.ts" | grep -v "runAttemptSystem\|checkpointSystem" || echo "No other systems found using refillTokensForSnapshot"
```

Length of output: 1035
Add ReleaseConcurrencySystem to RunEngine initialization
The `RunAttemptSystem` now depends on `releaseConcurrencySystem`, but I couldn't find it being passed in when the `RunEngine` is constructed. Without this, calls to `refillTokensForSnapshot` will fail at runtime. Please update the `RunEngine` setup (e.g. in `internal-packages/run-engine/src/engine/index.ts` or wherever you instantiate all subsystems) to include:

```ts
import { ReleaseConcurrencySystem } from "./systems/releaseConcurrencySystem.js";

const releaseConcurrencySystem = new ReleaseConcurrencySystem(/* dependencies */);

// …
const runAttemptSystem = new RunAttemptSystem({
  resources,
  executionSnapshotSystem,
  batchSystem,
  waitpointSystem,
  delayedRunSystem,
  releaseConcurrencySystem, // ← add this
  retryWarmStartThresholdMs,
  machines,
});
```

Also ensure that the same `releaseConcurrencySystem` instance is passed into any other subsystems (e.g. `WaitpointSystem`, `DequeueSystem`) that invoke `refillTokensForSnapshot`.

🧰 Tools
🪛 Biome (1.9.4)
[error] 350-350: Change to an optional chain.
Unsafe fix: Change to an optional chain.
(lint/complexity/useOptionalChain)
♻️ Duplicate comments (1)
internal-packages/run-engine/src/engine/releaseConcurrencyTokenBucketQueue.ts (1)
1051-1064: Constructor parameter order causes silent logic errors

The constructor defines `(queue, validateReleaserId, pollInterval, maxAge, …)`. Because the call site passes `maxAge` then `pollInterval`, the class fields `this.pollInterval` and `this.maxAge` hold the wrong values (see above).

If you adopt the fix in the previous comment you won't need to change the constructor, but if you prefer the current call-site order, swap the parameters here instead; the sketch below shows why the type checker cannot catch this.
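For illustration, a minimal sketch of how this kind of positional-argument swap slips past the type checker when both parameters are plain numbers (the class and call-site shapes here are assumptions based on the review text, not the actual code):

```ts
// Hypothetical shapes based on the review text, not the actual code.
class ReleaseConcurrencyReleasingsSweeper {
  constructor(
    private queue: unknown,
    private validateReleaserId: (id: string) => Promise<unknown>,
    private pollInterval: number, // 3rd parameter
    private maxAge: number // 4th parameter
  ) {}
}

declare const queue: unknown;
declare const validate: (id: string) => Promise<unknown>;
const maxAge = 30 * 60 * 1000; // intended max age: 30 min
const pollInterval = 60 * 1000; // intended poll interval: 1 min

// Both arguments are numbers, so passing them in the wrong order compiles
// cleanly: pollInterval becomes 30 min and maxAge becomes 1 min.
new ReleaseConcurrencyReleasingsSweeper(queue, validate, maxAge, pollInterval);
```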
🧹 Nitpick comments (6)
apps/webapp/app/components/admin/debugRun.tsx (1)
350-400: Added V2 engine debug view with simplified concurrency information.

The new V2 engine view provides a cleaner interface focused on the most important concurrency information. However, there's a minor issue with the React key property.

Add a unique key to the mapped elements to avoid React warnings:

```diff
- {keys.map((key) => (
-   <Property.Item>
+ {keys.map((key, index) => (
+   <Property.Item key={key.label || `key-${index}`}>
```

🧰 Tools
🪛 Biome (1.9.4)
[error] 391-391: Missing key property for this element in iterable.
The order of the items may change, and having a key can help React identify which item was moved.
Check the React documentation.

(lint/correctness/useJsxKeyInIterable)
internal-packages/run-engine/src/engine/tests/releaseConcurrencyTokenBucketQueue.test.ts (3)
389-392: Lowering `maxDelay` to 200 ms tightens the timing window – tests may turn flaky.

On slower CI runners (or when the host is under load) the back-off loop could legitimately exceed 200 ms, failing the assertion. Consider adding a slightly larger safety margin or deriving the expected delay from the configured `factor` instead of a hard constant.
717-734: Race-condition risk – 100 ms may be insufficient for the async consumer loop.

The new `refillTokenIfInReleasings` tests rely on `await setTimeout(100)` to wait for the queue consumer to pick up the token. A busy host or GC pause could postpone the consumer and make the test non-deterministic.

Recommend using a polling helper such as `waitFor(() => executedRuns.length === 2)` with a generous timeout instead of sleeping a fixed 100 ms; a sketch of such a helper follows.
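A minimal polling helper along these lines (hypothetical: `waitFor` is not assumed to already exist in the test utilities):

```ts
import { setTimeout } from "node:timers/promises";

// Polls a condition until it holds or the timeout elapses.
async function waitFor(
  condition: () => boolean,
  { timeoutMs = 5_000, intervalMs = 20 } = {}
): Promise<void> {
  const deadline = Date.now() + timeoutMs;
  while (!condition()) {
    if (Date.now() > deadline) {
      throw new Error(`waitFor timed out after ${timeoutMs}ms`);
    }
    await setTimeout(intervalMs);
  }
}

// Usage in the test instead of a fixed sleep:
// await waitFor(() => executedRuns.length === 2);
```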
803-818: Edge-case: a second refill on a different `releaserId` is expected to fail, but the test does not assert token count beforehand.

If the queue implementation changes its semantics the test might still pass (false returned) while silently over-filling the bucket. Add an explicit assertion before the second refill to capture this unwanted state, e.g.:

```ts
expect((await queue.getReleaseQueueMetrics({ name: "test-queue" })).currentTokens).toBe(1);
```

internal-packages/run-engine/src/engine/systems/releaseConcurrencySystem.ts (1)
77-115: `validateSnapshotShouldRefillToken` performs two separate DB calls – combine for performance.

`getLatestExecutionSnapshot` triggers a second query that could be merged into the first via `orderBy` + `take(1)`, cutting round-trip latency by 50%. Worth considering if this validator runs frequently (e.g. sweeper loop); see the sketch below.
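For illustration, a single-query shape along the lines the comment suggests (the Prisma model and field names here are assumptions, not taken from the actual schema):

```ts
import { PrismaClient } from "@prisma/client";

const prisma = new PrismaClient();
declare const runId: string;

// Hypothetical merged query: fetch the run and its latest snapshot in one
// round trip instead of a separate getLatestExecutionSnapshot call.
const runWithLatestSnapshot = await prisma.taskRun.findUnique({
  where: { id: runId },
  include: {
    executionSnapshots: {
      orderBy: { createdAt: "desc" },
      take: 1, // replaces the second query
    },
  },
});

const latestSnapshot = runWithLatestSnapshot?.executionSnapshots[0];
```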
internal-packages/run-engine/src/engine/releaseConcurrencyTokenBucketQueue.ts (1)
456-458: Global `"releasings"` key risks collisions; consider namespacing

`#releasingsKey()` returns a literal `"releasings"` which, when combined with the Redis `keyPrefix`, is shared by every queue handled by this client. If multiple logical environments (e.g., prod vs. staging) share the same Redis cluster but use different prefixes, you're safe; however, if distinct token-bucket queues are created by code that sets another prefix or none at all, collisions will occur.

```diff
- #releasingsKey() {
-   return "releasings";
- }
+ #releasingsKey() {
+   return `${this.keyPrefix}releasings`;
+ }
```

(or append any identifier unique to the queue/system).
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
Cache: Disabled due to data retention organization setting
Knowledge Base: Disabled due to data retention organization setting
📒 Files selected for processing (12)
- apps/webapp/app/components/admin/debugRun.tsx (2 hunks)
- apps/webapp/app/env.server.ts (1 hunks)
- apps/webapp/app/routes/resources.taskruns.$runParam.debug.ts (4 hunks)
- apps/webapp/app/v3/runEngine.server.ts (1 hunks)
- internal-packages/run-engine/src/engine/index.ts (2 hunks)
- internal-packages/run-engine/src/engine/releaseConcurrencyTokenBucketQueue.ts (20 hunks)
- internal-packages/run-engine/src/engine/systems/checkpointSystem.ts (2 hunks)
- internal-packages/run-engine/src/engine/systems/releaseConcurrencySystem.ts (5 hunks)
- internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts (3 hunks)
- internal-packages/run-engine/src/engine/tests/releaseConcurrency.test.ts (1 hunks)
- internal-packages/run-engine/src/engine/tests/releaseConcurrencyTokenBucketQueue.test.ts (6 hunks)
- internal-packages/run-engine/src/engine/types.ts (1 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (4)
internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts (2)
- internal-packages/run-engine/src/engine/systems/waitpointSystem.ts (1)
  - WaitpointSystem (26-703)
- internal-packages/run-engine/src/engine/systems/releaseConcurrencySystem.ts (1)
  - ReleaseConcurrencySystem (32-348)

apps/webapp/app/routes/resources.taskruns.$runParam.debug.ts (3)
- apps/webapp/app/v3/marqs/index.server.ts (2)
  - marqs (2152-2152)
  - envConcurrencyLimitKey (1531-1542)
- apps/webapp/app/v3/runEngine.server.ts (1)
  - engine (9-9)
- apps/webapp/app/v3/marqs/marqsKeyProducer.ts (4)
  - queueCurrentConcurrencyKey (128-136)
  - envCurrentConcurrencyKey (162-167)
  - queueConcurrencyLimitKey (40-42)
  - envConcurrencyLimitKey (46-51)

internal-packages/run-engine/src/engine/tests/releaseConcurrencyTokenBucketQueue.test.ts (1)
- internal-packages/testcontainers/src/index.ts (1)
  - redisTest (158-158)

internal-packages/run-engine/src/engine/systems/releaseConcurrencySystem.ts (2)
- internal-packages/run-engine/src/engine/releaseConcurrencyTokenBucketQueue.ts (3)
  - ReleaseConcurrencyQueueOptions (23-47)
  - ReleaseConcurrencyTokenBucketQueue (56-944)
  - options (478-490)
- internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts (1)
  - getLatestExecutionSnapshot (95-113)
🪛 Biome (1.9.4)
apps/webapp/app/components/admin/debugRun.tsx
[error] 391-391: Missing key property for this element in iterable.
The order of the items may change, and having a key can help React identify which item was moved.
Check the React documentation.
(lint/correctness/useJsxKeyInIterable)
⏰ Context from checks skipped due to timeout of 90000ms (7)
- GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - pnpm)
- GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - npm)
- GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - pnpm)
- GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - npm)
- GitHub Check: typecheck / typecheck
- GitHub Check: units / 🧪 Unit Tests
- GitHub Check: Analyze (javascript-typescript)
🔇 Additional comments (16)
internal-packages/run-engine/src/engine/types.ts (1)
50-51: Configuration properties added for releasings management

These new optional properties will control the lifespan and polling frequency of the "releasings" mechanism, which is used to track active releasers holding concurrency tokens. This supports the improved token refill reliability mentioned in the PR objectives.
apps/webapp/app/v3/runEngine.server.ts (1)
84-85: Added configuration for releasings management

These new properties connect the environment variables to the engine configuration, enabling the new release concurrency reliability features.
internal-packages/run-engine/src/engine/systems/checkpointSystem.ts (2)
198-205: Improved token refill logic with debug logging

The method has been updated to use the new `refillTokensForSnapshot` API which provides more reliable token refill based on snapshot IDs rather than environments. The added debug logging will help with troubleshooting concurrency issues.

236-242: Consistent token refill pattern with debug logging

Similar to the previous change, this updates the token refill logic to use snapshot IDs and adds debugging information, creating a consistent pattern throughout the codebase.
internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts (5)
36-36: Added import for ReleaseConcurrencySystem

This import enables integration with the release concurrency system.

44-44: Added ReleaseConcurrencySystem to system options

Allowing the release concurrency system to be injected as a dependency.

55-55: Added private member for ReleaseConcurrencySystem

This stores the reference to the injected system.

63-63: Initialized ReleaseConcurrencySystem from options

Standard initialization pattern consistent with other systems in this class.

1044-1045: Key improvement: Refill tokens on run cancellation

This addresses a critical part of the PR objectives: refilling tokens when a run is cancelled. The system now explicitly refills tokens from the snapshot when acknowledging messages during cancellation, which prevents token leakage in the EXECUTING_WITH_WAITPOINTS state.

This change, combined with the new releasings tracking mechanism, ensures that concurrency tokens are properly managed throughout the run lifecycle, improving system reliability.
apps/webapp/app/env.server.ts (1)
608-612: Environment variables added for release concurrency releasings system

Two new env variables have been added to support the release concurrency mechanism:

- `RUN_ENGINE_RELEASE_CONCURRENCY_RELEASINGS_MAX_AGE`: Controls how long a run can hold a concurrency token (30min default)
- `RUN_ENGINE_RELEASE_CONCURRENCY_RELEASINGS_POLL_INTERVAL`: Controls how often the system checks for expired tokens (1min default)

These parameters will allow tuning the behavior of the token cleanup mechanism; a wiring sketch follows.
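For illustration, one way these variables could be read into the engine config (a sketch: only the variable names come from the review; the parsing, defaults, and option names are assumptions):

```ts
// Hypothetical wiring; only the env var names are taken from the review.
const releaseConcurrencyConfig = {
  releasingsMaxAge: parseInt(
    process.env.RUN_ENGINE_RELEASE_CONCURRENCY_RELEASINGS_MAX_AGE ?? `${30 * 60 * 1000}`, // 30 min
    10
  ),
  releasingsPollInterval: parseInt(
    process.env.RUN_ENGINE_RELEASE_CONCURRENCY_RELEASINGS_POLL_INTERVAL ?? `${60 * 1000}`, // 1 min
    10
  ),
};
```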
internal-packages/run-engine/src/engine/index.ts (2)
204-206: Enhanced ReleaseConcurrencySystem with new configuration options

The new parameters facilitate the token sweeping mechanism, allowing for better control over how long tokens can be held and how frequently they're validated.

285-285: Injected releaseConcurrencySystem into RunAttemptSystem

This enables the run attempt system to directly handle token refilling when runs get canceled while in EXECUTING_WITH_WAITPOINTS state.
internal-packages/run-engine/src/engine/tests/releaseConcurrency.test.ts (1)
1508-1687: Comprehensive test for the new token sweeper system

This test verifies that the release concurrency sweeper system correctly:

- Identifies runs that have held tokens for too long (beyond releasingsMaxAge)
- Refills tokens when a run transitions to PENDING_CANCEL state
- Ensures tokens are properly tracked and validated

Nice job adding comprehensive test coverage for this important reliability improvement.
apps/webapp/app/components/admin/debugRun.tsx (1)
69-75: Added engine version based conditional rendering

Good approach to supporting both V1 and V2 engine versions with appropriate UI for each.
apps/webapp/app/routes/resources.taskruns.$runParam.debug.ts (1)
104-109: ✅ Verification successful

Possible double-prefix of Redis keys – verify with `engine.runQueue.keys.*`.

`withPrefix()` unconditionally adds `engine:runqueue:` to every key returned by `engine.runQueue.keys.*`. If those helper functions already include the same prefix, the resulting key will be `engine:runqueue:engine:runqueue:...`, causing the UI to point at the wrong hash/set.

Requesting a quick check (or running the script below) will avoid silent debugging headaches.

🏁 Script executed:

```bash
#!/bin/bash

# Grep for a hard-coded "engine:runqueue:" in key helpers.
fd --extension ts runEngine | xargs rg --line-number 'engine:runqueue:'
```

Length of output: 71
🏁 Script executed:

```bash
#!/bin/bash
set -e

# Locate all TS files referencing "runQueue"
files=$(rg -l "runQueue" --glob "*.ts")

if [[ -z "$files" ]]; then
  echo "No files found containing 'runQueue'"
  exit 0
fi

echo "Found runQueue definitions in:"
echo "$files"
echo

# For each file, show the "keys" block (or related usage) with context
for file in $files; do
  echo "---- $file ----"
  rg -n -C3 "keys\s*=" "$file" || echo "  (no direct 'keys =' in this file)"
  echo
done
```

Length of output: 20646
🏁 Script executed:

```bash
#!/bin/bash
set -e

# Locate implementation of RunQueueFullKeyProducer
echo "Searching for RunQueueFullKeyProducer class definition..."
rg -n "class RunQueueFullKeyProducer" -R internal-packages/run-engine/src/engine

# Locate currentConcurrencyKey method implementation
echo
echo "Searching for currentConcurrencyKey method..."
rg -n "currentConcurrencyKey" -R internal-packages/run-engine/src/engine

# Show surrounding lines for context (±3) if found
if rg -q "currentConcurrencyKey" -R internal-packages/run-engine/src/engine; then
  rg -n -C3 "currentConcurrencyKey" -R internal-packages/run-engine/src/engine
else
  echo "No currentConcurrencyKey implementation found."
fi
```

Length of output: 621
🏁 Script executed:

```bash
#!/bin/bash
set -e

# Locate the RunQueueFullKeyProducer class and its file
echo "=== RunQueueFullKeyProducer definition ==="
rg -n -C3 "class RunQueueFullKeyProducer" internal-packages/run-engine/src/engine

echo
echo "=== currentConcurrencyKey implementation ==="
rg -n -C3 "currentConcurrencyKey" internal-packages/run-engine/src/engine
```

Length of output: 187
🏁 Script executed:

```bash
#!/bin/bash
set -e

# Search for RunQueueFullKeyProducer class definition across the entire repository
echo "=== Searching for RunQueueFullKeyProducer ==="
rg -n "class RunQueueFullKeyProducer" .

# Search for currentConcurrencyKey implementation across the entire repository
echo
echo "=== Searching for currentConcurrencyKey ==="
rg -n "currentConcurrencyKey" .
```

Length of output: 3808
🏁 Script executed:

```bash
#!/bin/bash
set -e

# Display the RunQueueFullKeyProducer class definition with context
rg -n -C5 "class RunQueueFullKeyProducer" internal-packages/run-engine/src/run-queue/keyProducer.ts

# Display the implementation of currentConcurrencyKey and currentConcurrencyKeyFromQueue
rg -n -C5 "currentConcurrencyKey" internal-packages/run-engine/src/run-queue/keyProducer.ts
```

Length of output: 1210
🏁 Script executed:

```bash
#!/bin/bash
set -e

# Display the implementation of queueKey within RunQueueFullKeyProducer
rg -n -C5 "queueKey" internal-packages/run-engine/src/run-queue/keyProducer.ts
```

Length of output: 1527
No double-prefix detected in Redis keys
I've verified that none of the `RunQueueFullKeyProducer` methods emit the hard-coded `"engine:runqueue:"` prefix. In `internal-packages/run-engine/src/run-queue/keyProducer.ts`:

- `queueKey(...)` builds a colon-separated key (org, project, env, queue, concurrencyKey) with no global prefix.
- `currentConcurrencyKeyFromQueue(queue)` appends only `:currentConcurrency`.
- `currentConcurrencyKey(env, queue, concurrencyKey)` likewise joins the bare `queueKey(...)` with `:current_concurrency`.

Therefore, the `withPrefix()` wrapper in `apps/webapp/app/routes/resources.taskruns.$runParam.debug.ts` will prepend `"engine:runqueue:"` exactly once. No change needed.

internal-packages/run-engine/src/engine/releaseConcurrencyTokenBucketQueue.ts (1)
217-225: Return value of `returnTokenOnly` is ignored – lost-token risk

The Lua script short-circuits with `"false"` when the `releaserId` isn't in the `releasings` set (lines 873-877). `returnToken()` discards this boolean, so callers believe the token was returned even when nothing happened, potentially leaking concurrency slots.

Minimal fix:

```diff
- await this.redis.returnTokenOnly(
+ const succeeded = await this.redis.returnTokenOnly(
    this.masterQueuesKey,
    this.#bucketKey(releaseQueue),
    this.#queueKey(releaseQueue),
    this.#metadataKey(releaseQueue),
    this.#releasingsKey(),
    releaseQueue,
    releaserId
  );
+
+ if (succeeded !== "true") {
+   this.logger.warn("Attempted to return a token that was not tracked in releasings", {
+     releaseQueueDescriptor,
+     releaserId,
+   });
+ }
```
Actionable comments posted: 0
🧹 Nitpick comments (2)
internal-packages/run-engine/src/engine/releaseConcurrencyTokenBucketQueue.ts (2)
1081-1101: Consider improving error handling in sweep method.

The sweep method processes each releaser ID sequentially. Consider adding try/catch blocks around the validation and token refill operations to ensure that an error processing one releaser doesn't prevent the processing of others.

```diff
  private async sweep() {
    const releasings = await this.queue.getReleasings(this.maxAge);

    this.logger.debug("Sweeping releasings:", { releasings });

    for (const releaserId of releasings) {
+     try {
        const result = await this.validateReleaserId(releaserId);

        this.logger.debug("Validated releaserId:", { releaserId, result });

        if (!result) {
          // We need to remove the releaserId from the releasings set
          await this.queue.removeReleaserIdFromReleasings(releaserId);
          continue;
        }

        if (result.shouldRefill) {
          await this.queue.refillTokenIfInReleasings(result.releaseQueue, result.releaserId);
        }
+     } catch (error) {
+       this.logger.error("Error processing releaser in sweep:", { releaserId, error });
+     }
    }
  }
```
76-76: Default logger level set to "debug".

The default logger level is now set to "debug", which provides detailed logging of the concurrency system behavior. This is helpful for diagnosing issues but might generate substantial log volume in production. Consider making this configurable based on environment, e.g. as sketched below.
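One possible shape for that (a sketch only: the `Logger` constructor signature and option names are assumptions, not the real API):

```ts
// Hypothetical shapes; the real Logger class and options may differ.
type LogLevel = "debug" | "info" | "warn" | "error";

declare class Logger {
  constructor(name: string, level: LogLevel);
}

declare const options: { logLevel?: LogLevel; logger?: Logger };

// Fall back to "info" in production unless a level is explicitly supplied.
const logLevel: LogLevel =
  options.logLevel ?? (process.env.NODE_ENV === "production" ? "info" : "debug");

const logger = options.logger ?? new Logger("ReleaseConcurrencyTokenBucketQueue", logLevel);
```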
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
Cache: Disabled due to data retention organization setting
Knowledge Base: Disabled due to data retention organization setting
📒 Files selected for processing (1)
- internal-packages/run-engine/src/engine/releaseConcurrencyTokenBucketQueue.ts (20 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (7)
- GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - pnpm)
- GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - npm)
- GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - pnpm)
- GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - npm)
- GitHub Check: typecheck / typecheck
- GitHub Check: units / 🧪 Unit Tests
- GitHub Check: Analyze (javascript-typescript)
🔇 Additional comments (5)
internal-packages/run-engine/src/engine/releaseConcurrencyTokenBucketQueue.ts (5)
478-490: The constructor arguments are now correctly ordered.

The implementation of `#startReleasingsSweeper()` now correctly passes arguments to the `ReleaseConcurrencyReleasingsSweeper` constructor with `pollInterval` as the third argument and `maxAge` as the fourth argument, fixing the issue identified in a previous review.
17-21: Well-designed releasings tracking mechanism.

The addition of the `ReleaseConcurrencyValidatorResult` type, the new validation-related options, and the helper methods for working with the releasings set create a robust framework for tracking token holders and validating their status over time.

Also applies to: 29-33, 456-458, 655-666
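For readers skimming the review, a rough sketch of the contract such a validator result could carry, inferred from how the sweep loop consumes it (the exact type is not shown in this review, so treat the shape as an assumption):

```ts
// Inferred shape: the sweep loop treats a falsy result as "releaser no longer
// valid" and otherwise branches on shouldRefill.
type ReleaseConcurrencyQueueDescriptor = string; // assumed; could be a richer object

type ReleaseConcurrencyValidatorResult =
  | {
      releaseQueue: ReleaseConcurrencyQueueDescriptor;
      releaserId: string;
      shouldRefill: boolean;
    }
  | undefined;
```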
1048-1079: Well-implemented sweeper with proper error handling.

The `ReleaseConcurrencyReleasingsSweeper` implementation includes robust error handling for the overall sweeping process, with proper AbortError handling and continuous operation despite errors in individual sweep operations.
292-330: Effective token refill mechanism using releasings set.

The `refillTokenIfInReleasings` method and its corresponding Lua script effectively implement the logic to conditionally refill tokens only when the releaser ID is present in the releasings set, which is a key part of the reliability improvements for the release concurrency system.

Also applies to: 899-942
861-896: Improved token return mechanism with validation.

The updated `returnTokenOnly` Lua script now properly checks if the releaser ID exists in the releasings set before returning a token, preventing spurious token returns and ensuring accurate token counts.
…mpt to continue the run if unblocked
Actionable comments posted: 0
🔭 Outside diff range comments (2)
internal-packages/run-engine/src/engine/systems/waitpointSystem.ts (2)
553-561: ⚠️ Potential issue

Return value "skipped" is lost – outer function always resolves to `"unblocked"`

`"skipped"` is returned inside the `runLock.lock` callback but the outer function ignores that value. Callers will therefore mis-interpret a finished run as `"unblocked"`.

```diff
- await this.$.runLock.lock("continueRunIfUnblocked", [runId], 5000, async () => {
+ const lockResult = await this.$.runLock.lock(
+   "continueRunIfUnblocked",
+   [runId],
+   5000,
+   async () => {
  ...
-     return "skipped";
+     return "skipped";
    }
- );
+ );
```

and after the lock finishes:

```diff
- return "unblocked";
+ return (lockResult as "blocked" | "unblocked" | "skipped") ?? "unblocked";
```

This preserves the three-state contract.
596-598: 🛠️ Refactor suggestion

Refill should use `newSnapshot`, not stale `snapshot`

`releaseConcurrencySystem.refillTokensForSnapshot(snapshot)` is executed after `newSnapshot` is created, yet passes the old snapshot. If token calculation relies on the latest execution status or metadata, this will act on outdated data.

```diff
- await this.releaseConcurrencySystem.refillTokensForSnapshot(snapshot);
+ await this.releaseConcurrencySystem.refillTokensForSnapshot(newSnapshot);
```
🧹 Nitpick comments (2)
internal-packages/run-engine/src/engine/tests/heartbeats.test.ts (1)
485-487: Test timeout could be shortened to keep CI fast

The SUSPENDED test waits `1000 ms * 3.5 ≈ 3.5 s`. With multiple suites this inflates pipeline time. Using a 100–200 ms heartbeat and proportionally smaller waits keeps semantics identical while shaving seconds off every run.

Example:

```diff
- const heartbeatTimeout = 1000;
+ const heartbeatTimeout = 150;
  ...
- await setTimeout(heartbeatTimeout * 1.5); // 225 ms
  ...
- await setTimeout(heartbeatTimeout * 2); // 300 ms
```

internal-packages/run-engine/src/engine/systems/waitpointSystem.ts (1)
671-686: Early `"blocked"` return skips cleanup

When blocking wait-points still exist the function returns `"blocked"` before it reaches the section that deletes `TaskRunWaitpoint` rows. That's fine, but for consistency the deletion should also be skipped for `"skipped"` cases; otherwise finished runs will still incur an extra DELETE.

Consider wrapping the delete in:

```ts
if (status === "unblocked") {
  await prisma.taskRunWaitpoint.deleteMany(...);
}
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
Cache: Disabled due to data retention organization setting
Knowledge Base: Disabled due to data retention organization setting
📒 Files selected for processing (7)
- apps/webapp/app/env.server.ts (2 hunks)
- apps/webapp/app/v3/runEngine.server.ts (1 hunks)
- internal-packages/run-engine/src/engine/index.ts (4 hunks)
- internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts (3 hunks)
- internal-packages/run-engine/src/engine/systems/waitpointSystem.ts (4 hunks)
- internal-packages/run-engine/src/engine/tests/heartbeats.test.ts (1 hunks)
- internal-packages/run-engine/src/engine/types.ts (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (4)
- apps/webapp/app/v3/runEngine.server.ts
- apps/webapp/app/env.server.ts
- internal-packages/run-engine/src/engine/types.ts
- internal-packages/run-engine/src/engine/index.ts
⏰ Context from checks skipped due to timeout of 90000ms (7)
- GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - pnpm)
- GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - npm)
- GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - pnpm)
- GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - npm)
- GitHub Check: typecheck / typecheck
- GitHub Check: units / 🧪 Unit Tests
- GitHub Check: Analyze (javascript-typescript)
🔇 Additional comments (3)
internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts (3)
367-375: More precise worker-id check looks good

The added guard (`latestSnapshot.workerId && …`) avoids noisy debug logs when `workerId` is `undefined`. Nice low-noise improvement.
443-445: New SUSPENDED heartbeat slot wired correctly

`#getHeartbeatIntervalMs` now returns `heartbeatTimeouts.SUSPENDED`. Make sure the `HeartbeatTimeouts` type and configuration always include this field to prevent `undefined` intervals in prod configs.
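As a reference point, a sketch of the kind of exhaustive shape that would enforce this at the type level (only `SUSPENDED` and `EXECUTING_WITH_WAITPOINTS` are named in this review; the other field names are assumptions):

```ts
// Hypothetical: making every slot required means a config that omits
// SUSPENDED fails to type-check instead of yielding an undefined interval
// at runtime.
type HeartbeatTimeouts = {
  PENDING_EXECUTING: number; // assumed state name
  EXECUTING: number; // assumed state name
  EXECUTING_WITH_WAITPOINTS: number;
  SUSPENDED: number; // the new slot
};
```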
411-424: ❓ Verification inconclusive

`restartHeartbeatForRun` can throw on duplicate job IDs

`worker.enqueue()` is called with the hard-coded ID `heartbeatSnapshot.${runId}`. If a heartbeat job with the same ID is still present (e.g. the previous one hasn't fired yet) most queue implementations will either:

- Reject the insert with a unique-constraint error, or
- Silently ignore it, leaving the old schedule in place.

Either outcome defeats the intent of "restart". Prefer rescheduling (or deleting + enqueueing) to guarantee exactly one future heartbeat:

```diff
- await this.$.worker.enqueue({
+ await this.$.worker.reschedule(
+   `heartbeatSnapshot.${runId}`,
+   new Date(Date.now() + intervalMs),
+   {
+     job: "heartbeatSnapshot",
+     payload: { snapshotId: latestSnapshot.id, runId },
+   }
+ );
```

(or use whatever atomic "upsert" your queue lib exposes).
🏁 Script executed:

```bash
#!/usr/bin/env bash
set -e

# Locate the ExecutionSnapshotSystem source file
file=$(rg -l "class ExecutionSnapshotSystem")
echo "ExecutionSnapshotSystem file: $file"

# Print imports, constructor, and dependency definitions for context
sed -n '1,200p' "$file"
```

Length of output: 6631
🏁 Script executed:

```bash
#!/usr/bin/env bash
set -e

# Find and display the SystemResources type definition
sys_file=$(rg -l "export type SystemResources" -g "*.ts")
echo "SystemResources defined in: $sys_file"
sed -n '1,200p' "$sys_file"

# Locate the ExecutionSnapshotSystem class constructor to see how 'worker' is defined
exec_file="internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts"
echo "Constructor context in ExecutionSnapshotSystem:"
rg -n "constructor" -A5 -B5 "$exec_file"
```

Length of output: 1698
🏁 Script executed:

```bash
#!/usr/bin/env bash
set -e

# Locate the EngineWorker interface/type
engine_worker_file=$(rg -l "EngineWorker" -g "internal-packages/run-engine/src/engine/types.*")
echo "EngineWorker defined in: $engine_worker_file"
sed -n '1,200p' "$engine_worker_file"

# Display the full ExecutionSnapshotSystem class around restartHeartbeatForRun
exec_file="internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts"
rg -n "restartHeartbeatForRun" -A10 -B10 "$exec_file"
```

Length of output: 5273
Use atomic reschedule to avoid duplicate heartbeat jobs
In `restartHeartbeatForRun` (internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts lines 411–422), calling

```ts
await this.$.worker.enqueue({ id: `heartbeatSnapshot.${runId}`, … });
```

can either throw on duplicate IDs or silently no-op, defeating "restart." Prefer an upsert-style reschedule:

```diff
- await this.$.worker.enqueue({
-   id: `heartbeatSnapshot.${runId}`,
-   job: "heartbeatSnapshot",
-   payload: { snapshotId: latestSnapshot.id, runId },
-   availableAt: new Date(Date.now() + intervalMs),
- });
+ await this.$.worker.reschedule(
+   `heartbeatSnapshot.${runId}`,
+   new Date(Date.now() + intervalMs)
+ );
```

If your Worker API requires passing job & payload on reschedule, include them as a third parameter:

```diff
- await this.$.worker.enqueue({ … });
+ await this.$.worker.reschedule(
+   `heartbeatSnapshot.${runId}`,
+   new Date(Date.now() + intervalMs),
+   {
+     job: "heartbeatSnapshot",
+     payload: { snapshotId: latestSnapshot.id, runId },
+   }
+ );
```

Finally, confirm that `reschedule` will create the job if it doesn't exist; otherwise consider deleting the old entry + enqueue.