
release concurrency system reliability improvements #2081


Merged
4 commits merged on May 21, 2025

Conversation

@ericallam (Member) commented May 20, 2025

  • Make refilling tokens to the release concurrency token bucket more reliable by accounting for the specific snapshots that release concurrency. An additional sweeper process runs periodically to clean up stale snapshots and refill token buckets that have been incorrectly depleted.
  • Refill tokens when a run is cancelled while in the EXECUTING_WITH_WAITPOINTS state.

ericallam added 2 commits May 20, 2025 12:30
…fill tokens for snapshots that are no longer the latest snapshot on a run (e.g. the run has moved to a new snapshot state)

changeset-bot bot commented May 20, 2025

⚠️ No Changeset found

Latest commit: 9754bed

Merging this PR will not cause a version bump for any packages. If these changes should not result in a new version, you're good to go. If these changes should result in a version bump, you need to add a changeset.


coderabbitai bot (Contributor) commented May 20, 2025

Walkthrough

This update introduces a new "releasings" mechanism for concurrency token management in the run engine, allowing tokens to be tracked, validated, and conditionally refilled based on active releasers. The release concurrency system is refactored to support this, including changes to Redis Lua commands, queue management, and a new sweeper process for stale releasers. The run engine configuration and environment variables are extended to include releasings max age and poll interval parameters. The run engine heartbeat system adds support for a "SUSPENDED" state with corresponding handling. UI components and data loaders are updated to support two engine versions with distinct concurrency data presentations. Test coverage is expanded to validate the new concurrency token refill behavior.

Changes

File(s) Change Summary
apps/webapp/app/components/admin/debugRun.tsx Refactored DebugRunData to accept a single props object; split rendering between DebugRunDataEngineV1 and new DebugRunDataEngineV2 for different engine versions.
apps/webapp/app/env.server.ts Added environment variables RUN_ENGINE_TIMEOUT_SUSPENDED, RUN_ENGINE_RELEASE_CONCURRENCY_RELEASINGS_MAX_AGE, and RUN_ENGINE_RELEASE_CONCURRENCY_RELEASINGS_POLL_INTERVAL to the environment schema.
apps/webapp/app/routes/resources.taskruns.$runParam.debug.ts Loader function updated to branch logic for engine V1/V2, fetches concurrency data and keys accordingly, and returns structured data for UI.
apps/webapp/app/v3/runEngine.server.ts Added heartbeat timeout for suspended state and releasings max age and poll interval to run engine release concurrency configuration.
internal-packages/run-engine/src/engine/index.ts Modified RunEngine constructor to add heartbeat timeout for suspended state; updated ReleaseConcurrencySystem initialization to include new options, removed legacy executor/keys/maxTokens, and passed releaseConcurrencySystem to RunAttemptSystem; implemented handling for suspended runs in stalled snapshot handler.
internal-packages/run-engine/src/engine/releaseConcurrencyTokenBucketQueue.ts Introduced "releasings" Redis sorted set; extended Lua commands to manage releasings; added sweeper class for stale releasers with validation and token refills; added methods to manage releasings; enhanced logging and configuration options.
internal-packages/run-engine/src/engine/systems/checkpointSystem.ts Refactored to call refillTokensForSnapshot with snapshot IDs instead of environment; added debug logging; removed previous checkpointCreatedOnEnvironment usage.
internal-packages/run-engine/src/engine/systems/releaseConcurrencySystem.ts Extended options; implemented new methods for validation and token refilling; removed checkpointCreatedOnEnvironment; updated queue initialization with internal executor, keys, maxTokens, and validation logic.
internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts Injected ReleaseConcurrencySystem; added token refill on run cancellation acknowledgment.
internal-packages/run-engine/src/engine/tests/releaseConcurrency.test.ts Added test validating token bucket refill after new snapshot created by the release concurrency sweeper system.
internal-packages/run-engine/src/engine/tests/releaseConcurrencyTokenBucketQueue.test.ts Updated tests to use new refillTokenIfInReleasings method and semantics; adjusted expectations and test flows accordingly.
internal-packages/run-engine/src/engine/types.ts Added releasingsMaxAge and releasingsPollInterval to RunEngineOptions.releaseConcurrency; added SUSPENDED to HeartbeatTimeouts.
internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts Added restartHeartbeatForRun method; extended heartbeat interval logic to handle "SUSPENDED" status; refined debug logging condition.
internal-packages/run-engine/src/engine/systems/waitpointSystem.ts Updated continueRunIfUnblocked method to return explicit status strings "blocked", "unblocked", or "skipped" indicating run state after continuation attempt.
internal-packages/run-engine/src/engine/tests/heartbeats.test.ts Added test case for handling of suspended runs, verifying heartbeat timeouts and state transitions involving waitpoints and suspended execution status.

Sequence Diagram(s)

Release Concurrency Token Refill (V2 Engine)

sequenceDiagram
    participant RunEngine
    participant ReleaseConcurrencySystem
    participant ReleaseConcurrencyTokenBucketQueue
    participant Redis
    participant ReleasingsSweeper

    RunEngine->>ReleaseConcurrencySystem: refillTokensForSnapshot(snapshotId)
    ReleaseConcurrencySystem->>ReleaseConcurrencyTokenBucketQueue: refillTokenIfInReleasings(descriptor, releaserId)
    ReleaseConcurrencyTokenBucketQueue->>Redis: Check releasings set for releaserId
    alt releaserId in releasings
        ReleaseConcurrencyTokenBucketQueue->>Redis: Remove releaserId from releasings, refill token
        ReleaseConcurrencyTokenBucketQueue-->>ReleaseConcurrencySystem: true
    else
        ReleaseConcurrencyTokenBucketQueue-->>ReleaseConcurrencySystem: false
    end

    Note over ReleasingsSweeper: Periodically
    ReleasingsSweeper->>ReleaseConcurrencyTokenBucketQueue: getReleasings(maxAge)
    loop For each stale releaserId
        ReleasingsSweeper->>ReleaseConcurrencySystem: validateSnapshotShouldRefillToken(releaserId)
        alt shouldRefill
            ReleasingsSweeper->>ReleaseConcurrencyTokenBucketQueue: refillTokenIfInReleasings(...)
        else
            ReleasingsSweeper->>ReleaseConcurrencyTokenBucketQueue: removeReleaserIdFromReleasings(...)
        end
    end
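The sweeper's periodic pass in the diagram above can be sketched as a single function (a sketch only: the `SweeperQueue` interface and `sweepOnce` name are hypothetical stand-ins for the classes in releaseConcurrencyTokenBucketQueue.ts):

```typescript
// Hypothetical slice of the queue API used by the sweeper.
interface SweeperQueue {
  getReleasings(maxAge: number): Promise<string[]>;
  refillTokenIfInReleasings(releaseQueue: string, releaserId: string): Promise<boolean>;
  removeReleaserIdFromReleasings(releaserId: string): Promise<void>;
}

type ValidationResult =
  | { shouldRefill: boolean; releaseQueue: string; releaserId: string }
  | null;

// One sweep: find stale releasers, validate each, then refill or drop.
async function sweepOnce(
  queue: SweeperQueue,
  validate: (releaserId: string) => Promise<ValidationResult>,
  maxAge: number
): Promise<void> {
  const stale = await queue.getReleasings(maxAge);
  for (const releaserId of stale) {
    const result = await validate(releaserId);
    if (!result) {
      // Releaser no longer maps to a valid snapshot: drop the entry.
      await queue.removeReleaserIdFromReleasings(releaserId);
      continue;
    }
    if (result.shouldRefill) {
      await queue.refillTokenIfInReleasings(result.releaseQueue, result.releaserId);
    }
  }
}
```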

Poem

🐇
In the warren of code, new keys abound,
Releasers tracked, their tokens found.
A sweeper hops through Redis night,
Refilling buckets, setting things right.
With engines old and engines new,
This bunny cheers—concurrency grew!


@coderabbitai (Contributor) left a comment


Actionable comments posted: 4

🔭 Outside diff range comments (1)
internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts (1)

1-1303: 💡 Verification agent

🧩 Analysis chain

Verify comprehensive system integration

The changes look good overall and address the reliability improvements mentioned in the PR objectives. The code integrates well with the broader concurrency system enhancements.


🏁 Script executed:

#!/bin/bash
# Check if the ReleaseConcurrencySystem is properly initialized in the RunEngine
rg -A 15 "new RunEngine" -g "*.ts" | grep -A 20 "releaseConcurrencySystem" || echo "ReleaseConcurrencySystem initialization not found in RunEngine"

# Verify that other related systems also use the refillTokensForSnapshot method
rg "refillTokensForSnapshot" -g "*.ts" | grep -v "runAttemptSystem\|checkpointSystem" || echo "No other systems found using refillTokensForSnapshot"

Length of output: 1035


Add ReleaseConcurrencySystem to RunEngine initialization

The RunAttemptSystem now depends on releaseConcurrencySystem, but I couldn’t find it being passed in when the RunEngine is constructed. Without this, calls to refillTokensForSnapshot will fail at runtime. Please update the RunEngine setup (e.g. in internal-packages/run-engine/src/engine/index.ts or wherever you instantiate all subsystems) to include:

import { ReleaseConcurrencySystem } from "./systems/releaseConcurrencySystem.js";

const releaseConcurrencySystem = new ReleaseConcurrencySystem(/* dependencies */);
// …
const runAttemptSystem = new RunAttemptSystem({
  resources,
  executionSnapshotSystem,
  batchSystem,
  waitpointSystem,
  delayedRunSystem,
  releaseConcurrencySystem,   // ← add this
  retryWarmStartThresholdMs,
  machines,
});

Also ensure that the same releaseConcurrencySystem instance is passed into any other subsystems (e.g. WaitpointSystem, DequeueSystem) that invoke refillTokensForSnapshot.

🧰 Tools
🪛 Biome (1.9.4)

[error] 350-350: Change to an optional chain.

Unsafe fix: Change to an optional chain.

(lint/complexity/useOptionalChain)

♻️ Duplicate comments (1)
internal-packages/run-engine/src/engine/releaseConcurrencyTokenBucketQueue.ts (1)

1051-1064: Constructor parameter order causes silent logic errors

The constructor defines (queue, validateReleaserId, pollInterval, maxAge, …).
Because the call site passes maxAge then pollInterval, the class fields this.pollInterval and this.maxAge hold the wrong values (see above).
If you adopt the fix in the previous comment you won’t need to change the constructor, but if you prefer the current call-site order, swap the parameters here instead.
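The hazard here is that two numeric positional parameters compile in either order, so a swapped call site is invisible to the type checker. One way to make the mistake impossible (an illustrative sketch, not the actual class) is an options object:

```typescript
// Two number-typed positional parameters cannot be distinguished by the
// compiler; named options make the argument order irrelevant.
class Sweeper {
  constructor(private opts: { pollInterval: number; maxAge: number }) {}
  get pollInterval(): number { return this.opts.pollInterval; }
  get maxAge(): number { return this.opts.maxAge; }
}

// The call site can list the fields in any order without breaking anything.
const sweeper = new Sweeper({ maxAge: 1_800_000, pollInterval: 60_000 });
```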

🧹 Nitpick comments (6)
apps/webapp/app/components/admin/debugRun.tsx (1)

350-400: Added V2 engine debug view with simplified concurrency information.

The new V2 engine view provides a cleaner interface focused on the most important concurrency information. However, there's a minor issue with the React key property.

Add a unique key to the mapped elements to avoid React warnings:

-      {keys.map((key) => (
-        <Property.Item>
+      {keys.map((key, index) => (
+        <Property.Item key={key.label || `key-${index}`}>
🧰 Tools
🪛 Biome (1.9.4)

[error] 391-391: Missing key property for this element in iterable.

The order of the items may change, and having a key can help React identify which item was moved.
Check the React documentation.

(lint/correctness/useJsxKeyInIterable)

internal-packages/run-engine/src/engine/tests/releaseConcurrencyTokenBucketQueue.test.ts (3)

389-392: Lowering maxDelay to 200 ms tightens the timing window – tests may turn flaky.

On slower CI runners (or when the host is under load) the back-off loop could legitimately exceed 200 ms, failing the assertion.
Consider adding a slightly larger safety margin or deriving the expected delay from the configured factor instead of a hard constant.


717-734: Race-condition risk – 100 ms may be insufficient for the async consumer loop.

The new refillTokenIfInReleasings tests rely on await setTimeout(100) to wait for the queue consumer to pick up the token.
A busy host or GC pause could postpone the consumer and make the test non-deterministic.

Recommend using a polling helper such as waitFor(() => executedRuns.length === 2) with a generous timeout instead of sleeping a fixed 100 ms.
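A polling helper like the suggested `waitFor` (the name is hypothetical; any test-utility equivalent works) might look like this:

```typescript
// Poll a condition until it holds or a timeout elapses, instead of
// sleeping for a fixed interval and hoping the consumer has caught up.
async function waitFor(
  condition: () => boolean,
  { timeoutMs = 5_000, intervalMs = 10 }: { timeoutMs?: number; intervalMs?: number } = {}
): Promise<void> {
  const deadline = Date.now() + timeoutMs;
  while (!condition()) {
    if (Date.now() > deadline) {
      throw new Error(`waitFor timed out after ${timeoutMs}ms`);
    }
    await new Promise((resolve) => setTimeout(resolve, intervalMs));
  }
}
```

In the test this would replace the fixed sleep with something like `await waitFor(() => executedRuns.length === 2)`, which passes as soon as the consumer runs but tolerates a slow CI host up to the timeout.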


803-818: Edge-case: a second refill on a different releaserId is expected to fail, but the test does not assert token count beforehand.

If the queue implementation changes its semantics the test might still pass (false returned) while silently over-filling the bucket.
Add an explicit assertion before the second refill to capture this unwanted state, e.g.:

expect((await queue.getReleaseQueueMetrics({ name: "test-queue" })).currentTokens).toBe(1);
internal-packages/run-engine/src/engine/systems/releaseConcurrencySystem.ts (1)

77-115: validateSnapshotShouldRefillToken performs two separate DB calls – combine for performance.

getLatestExecutionSnapshot triggers a second query that could be merged into the first via orderBy + take(1), cutting round-trip latency by 50 %.
Worth considering if this validator runs frequently (e.g. sweeper loop).
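The single-query shape might look like the following sketch, assuming a Prisma-style client (the model name, field names, and comparison logic are illustrative, not the actual schema or validator):

```typescript
// Fetch the run's latest snapshot in one query and compare IDs, instead of
// fetching the releasing snapshot and the latest snapshot separately.
async function validateSnapshotShouldRefillToken(
  prisma: any, // hypothetical Prisma-style client
  snapshotId: string,
  runId: string
): Promise<boolean> {
  const [latest] = await prisma.taskRunExecutionSnapshot.findMany({
    where: { runId },
    orderBy: { createdAt: "desc" },
    take: 1,
  });
  // Refill only if the releasing snapshot is no longer the latest one.
  return latest != null && latest.id !== snapshotId;
}
```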

internal-packages/run-engine/src/engine/releaseConcurrencyTokenBucketQueue.ts (1)

456-458: Global "releasings" key risks collisions; consider namespacing

#releasingsKey() returns a literal "releasings" which, when combined with Redis keyPrefix, is shared by every queue handled by this client.
If multiple logical environments (e.g., prod vs. staging) share the same Redis cluster but use different prefixes, you’re safe; however, if distinct token-bucket queues are created by code that sets another prefix or none at all, collisions will occur.

-  #releasingsKey() {
-    return "releasings";
-  }
+  #releasingsKey() {
+    return `${this.keyPrefix}releasings`;
+  }

(or append any identifier unique to the queue/system).

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
Cache: Disabled due to data retention organization setting
Knowledge Base: Disabled due to data retention organization setting

📥 Commits

Reviewing files that changed from the base of the PR and between e08e493 and 2bd533d.

📒 Files selected for processing (12)
  • apps/webapp/app/components/admin/debugRun.tsx (2 hunks)
  • apps/webapp/app/env.server.ts (1 hunks)
  • apps/webapp/app/routes/resources.taskruns.$runParam.debug.ts (4 hunks)
  • apps/webapp/app/v3/runEngine.server.ts (1 hunks)
  • internal-packages/run-engine/src/engine/index.ts (2 hunks)
  • internal-packages/run-engine/src/engine/releaseConcurrencyTokenBucketQueue.ts (20 hunks)
  • internal-packages/run-engine/src/engine/systems/checkpointSystem.ts (2 hunks)
  • internal-packages/run-engine/src/engine/systems/releaseConcurrencySystem.ts (5 hunks)
  • internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts (3 hunks)
  • internal-packages/run-engine/src/engine/tests/releaseConcurrency.test.ts (1 hunks)
  • internal-packages/run-engine/src/engine/tests/releaseConcurrencyTokenBucketQueue.test.ts (6 hunks)
  • internal-packages/run-engine/src/engine/types.ts (1 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (4)
internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts (2)
internal-packages/run-engine/src/engine/systems/waitpointSystem.ts (1)
  • WaitpointSystem (26-703)
internal-packages/run-engine/src/engine/systems/releaseConcurrencySystem.ts (1)
  • ReleaseConcurrencySystem (32-348)
apps/webapp/app/routes/resources.taskruns.$runParam.debug.ts (3)
apps/webapp/app/v3/marqs/index.server.ts (2)
  • marqs (2152-2152)
  • envConcurrencyLimitKey (1531-1542)
apps/webapp/app/v3/runEngine.server.ts (1)
  • engine (9-9)
apps/webapp/app/v3/marqs/marqsKeyProducer.ts (4)
  • queueCurrentConcurrencyKey (128-136)
  • envCurrentConcurrencyKey (162-167)
  • queueConcurrencyLimitKey (40-42)
  • envConcurrencyLimitKey (46-51)
internal-packages/run-engine/src/engine/tests/releaseConcurrencyTokenBucketQueue.test.ts (1)
internal-packages/testcontainers/src/index.ts (1)
  • redisTest (158-158)
internal-packages/run-engine/src/engine/systems/releaseConcurrencySystem.ts (2)
internal-packages/run-engine/src/engine/releaseConcurrencyTokenBucketQueue.ts (3)
  • ReleaseConcurrencyQueueOptions (23-47)
  • ReleaseConcurrencyTokenBucketQueue (56-944)
  • options (478-490)
internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts (1)
  • getLatestExecutionSnapshot (95-113)
🪛 Biome (1.9.4)
apps/webapp/app/components/admin/debugRun.tsx

[error] 391-391: Missing key property for this element in iterable.

The order of the items may change, and having a key can help React identify which item was moved.
Check the React documentation.

(lint/correctness/useJsxKeyInIterable)

⏰ Context from checks skipped due to timeout of 90000ms (7)
  • GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - pnpm)
  • GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - npm)
  • GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - pnpm)
  • GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - npm)
  • GitHub Check: typecheck / typecheck
  • GitHub Check: units / 🧪 Unit Tests
  • GitHub Check: Analyze (javascript-typescript)
🔇 Additional comments (16)
internal-packages/run-engine/src/engine/types.ts (1)

50-51: Configuration properties added for releasings management

These new optional properties will control the lifespan and polling frequency of the "releasings" mechanism, which is used to track active releasers holding concurrency tokens. This supports the improved token refill reliability mentioned in the PR objectives.

apps/webapp/app/v3/runEngine.server.ts (1)

84-85: Added configuration for releasings management

These new properties connect the environment variables to the engine configuration, enabling the new release concurrency reliability features.

internal-packages/run-engine/src/engine/systems/checkpointSystem.ts (2)

198-205: Improved token refill logic with debug logging

The method has been updated to use the new refillTokensForSnapshot API which provides more reliable token refill based on snapshot IDs rather than environments. The added debug logging will help with troubleshooting concurrency issues.


236-242: Consistent token refill pattern with debug logging

Similar to the previous change, this updates the token refill logic to use snapshot IDs and adds debugging information, creating a consistent pattern throughout the codebase.

internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts (5)

36-36: Added import for ReleaseConcurrencySystem

This import enables integration with the release concurrency system.


44-44: Added ReleaseConcurrencySystem to system options

Allowing the release concurrency system to be injected as a dependency.


55-55: Added private member for ReleaseConcurrencySystem

This stores the reference to the injected system.


63-63: Initialized ReleaseConcurrencySystem from options

Standard initialization pattern consistent with other systems in this class.


1044-1045: Key improvement: Refill tokens on run cancellation

This addresses a critical part of the PR objectives - refilling tokens when a run is cancelled. The system now explicitly refills tokens from the snapshot when acknowledging messages during cancellation, which prevents token leakage in the EXECUTING_WITH_WAITPOINTS state.

This change, combined with the new releasings tracking mechanism, ensures that concurrency tokens are properly managed throughout the run lifecycle, improving system reliability.

apps/webapp/app/env.server.ts (1)

608-612: Environment variables added for release concurrency releasings system.

Two new env variables have been added to support the release concurrency mechanism:

  • RUN_ENGINE_RELEASE_CONCURRENCY_RELEASINGS_MAX_AGE: Controls how long a run can hold a concurrency token (30min default)
  • RUN_ENGINE_RELEASE_CONCURRENCY_RELEASINGS_POLL_INTERVAL: Controls how often the system checks for expired tokens (1min default)

These parameters will allow tuning the behavior of the token cleanup mechanism.

internal-packages/run-engine/src/engine/index.ts (2)

204-206: Enhanced ReleaseConcurrencySystem with new configuration options.

The new parameters facilitate the token sweeping mechanism, allowing for better control over how long tokens can be held and how frequently they're validated.


285-285: Injected releaseConcurrencySystem into RunAttemptSystem.

This enables the run attempt system to directly handle token refilling when runs get canceled while in EXECUTING_WITH_WAITPOINTS state.

internal-packages/run-engine/src/engine/tests/releaseConcurrency.test.ts (1)

1508-1687: Comprehensive test for the new token sweeper system.

This test verifies that the release concurrency sweeper system correctly:

  1. Identifies runs that have held tokens for too long (beyond releasingsMaxAge)
  2. Refills tokens when a run transitions to PENDING_CANCEL state
  3. Ensures tokens are properly tracked and validated

Nice job adding comprehensive test coverage for this important reliability improvement.

apps/webapp/app/components/admin/debugRun.tsx (1)

69-75: Added engine version based conditional rendering.

Good approach to supporting both V1 and V2 engine versions with appropriate UI for each.

apps/webapp/app/routes/resources.taskruns.$runParam.debug.ts (1)

104-109:

✅ Verification successful

Possible double-prefix of Redis keys – verify with engine.runQueue.keys.*.

withPrefix() unconditionally adds engine:runqueue: to every key returned by engine.runQueue.keys.*.
If those helper functions already include the same prefix, the resulting key will be engine:runqueue:engine:runqueue:..., causing the UI to point at the wrong hash/set.

A quick check (or running the script below) will avoid silent debugging headaches.


🏁 Script executed:

#!/bin/bash
# Grep for a hard-coded "engine:runqueue:" in key helpers.
fd --extension ts runEngine | xargs rg --line-number 'engine:runqueue:'

Length of output: 71


🏁 Script executed:

#!/bin/bash
set -e

# Locate all TS files referencing "runQueue"
files=$(rg -l "runQueue" --glob "*.ts")
if [[ -z "$files" ]]; then
  echo "No files found containing 'runQueue'"
  exit 0
fi

echo "Found runQueue definitions in:"
echo "$files"
echo

# For each file, show the "keys" block (or related usage) with context
for file in $files; do
  echo "---- $file ----"
  rg -n -C3 "keys\s*=" "$file" || echo "  (no direct 'keys =' in this file)"
  echo
done

Length of output: 20646


🏁 Script executed:

#!/bin/bash
set -e

# Locate implementation of RunQueueFullKeyProducer
echo "Searching for RunQueueFullKeyProducer class definition..."
rg -n "class RunQueueFullKeyProducer" -R internal-packages/run-engine/src/engine

# Locate currentConcurrencyKey method implementation
echo
echo "Searching for currentConcurrencyKey method..."
rg -n "currentConcurrencyKey" -R internal-packages/run-engine/src/engine

# Show surrounding lines for context (±3) if found
if rg -q "currentConcurrencyKey" -R internal-packages/run-engine/src/engine; then
  rg -n -C3 "currentConcurrencyKey" -R internal-packages/run-engine/src/engine
else
  echo "No currentConcurrencyKey implementation found."
fi

Length of output: 621


🏁 Script executed:

#!/bin/bash
set -e

# Locate the RunQueueFullKeyProducer class and its file
echo "=== RunQueueFullKeyProducer definition ==="
rg -n -C3 "class RunQueueFullKeyProducer" internal-packages/run-engine/src/engine

echo
echo "=== currentConcurrencyKey implementation ==="
rg -n -C3 "currentConcurrencyKey" internal-packages/run-engine/src/engine

Length of output: 187


🏁 Script executed:

#!/bin/bash
set -e

# Search for RunQueueFullKeyProducer class definition across the entire repository
echo "=== Searching for RunQueueFullKeyProducer ==="
rg -n "class RunQueueFullKeyProducer" .

# Search for currentConcurrencyKey implementation across the entire repository
echo
echo "=== Searching for currentConcurrencyKey ==="
rg -n "currentConcurrencyKey" .

Length of output: 3808


🏁 Script executed:

#!/bin/bash
set -e

# Display the RunQueueFullKeyProducer class definition with context
rg -n -C5 "class RunQueueFullKeyProducer" internal-packages/run-engine/src/run-queue/keyProducer.ts

# Display the implementation of currentConcurrencyKey and currentConcurrencyKeyFromQueue
rg -n -C5 "currentConcurrencyKey" internal-packages/run-engine/src/run-queue/keyProducer.ts

Length of output: 1210


🏁 Script executed:

#!/bin/bash
set -e

# Display the implementation of queueKey within RunQueueFullKeyProducer
rg -n -C5 "queueKey" internal-packages/run-engine/src/run-queue/keyProducer.ts

Length of output: 1527


No double-prefix detected in Redis keys

I’ve verified that none of the RunQueueFullKeyProducer methods emit the hard-coded "engine:runqueue:" prefix. In internal-packages/run-engine/src/run-queue/keyProducer.ts:

  • queueKey(...) builds a colon-separated key (org, project, env, queue, concurrencyKey) with no global prefix.
  • currentConcurrencyKeyFromQueue(queue) appends only :currentConcurrency.
  • currentConcurrencyKey(env, queue, concurrencyKey) likewise joins the bare queueKey(...) with :current_concurrency.

Therefore, the withPrefix() wrapper in
apps/webapp/app/routes/resources.taskruns.$runParam.debug.ts
will prepend "engine:runqueue:" exactly once. No change needed.

internal-packages/run-engine/src/engine/releaseConcurrencyTokenBucketQueue.ts (1)

217-225: Return value of returnTokenOnly is ignored – lost-token risk

The Lua script short-circuits with "false" when the releaserId isn’t in the releasings set (lines 873-877).
returnToken() discards this boolean, so callers believe the token was returned even when nothing happened, potentially leaking concurrency slots.

Minimal fix:

-    await this.redis.returnTokenOnly(
+    const succeeded = await this.redis.returnTokenOnly(
       this.masterQueuesKey,
       this.#bucketKey(releaseQueue),
       this.#queueKey(releaseQueue),
       this.#metadataKey(releaseQueue),
       this.#releasingsKey(),
       releaseQueue,
       releaserId
     );
+
+    if (succeeded !== "true") {
+      this.logger.warn("Attempted to return a token that was not tracked in releasings", {
+        releaseQueueDescriptor,
+        releaserId,
+      });
+    }


@coderabbitai (Contributor) left a comment


Actionable comments posted: 0

🧹 Nitpick comments (2)
internal-packages/run-engine/src/engine/releaseConcurrencyTokenBucketQueue.ts (2)

1081-1101: Consider improving error handling in sweep method.

The sweep method processes each releaser ID sequentially. Consider adding try/catch blocks around the validation and token refill operations to ensure that an error processing one releaser doesn't prevent the processing of others.

  private async sweep() {
    const releasings = await this.queue.getReleasings(this.maxAge);

    this.logger.debug("Sweeping releasings:", { releasings });

    for (const releaserId of releasings) {
+     try {
        const result = await this.validateReleaserId(releaserId);

        this.logger.debug("Validated releaserId:", { releaserId, result });

        if (!result) {
          // We need to remove the releaserId from the releasings set
          await this.queue.removeReleaserIdFromReleasings(releaserId);
          continue;
        }

        if (result.shouldRefill) {
          await this.queue.refillTokenIfInReleasings(result.releaseQueue, result.releaserId);
        }
+     } catch (error) {
+       this.logger.error("Error processing releaser in sweep:", { releaserId, error });
+     }
    }
  }

76-76: Default logger level set to "debug".

The default logger level is now set to "debug" which provides detailed logging of the concurrency system behavior. This is helpful for diagnosing issues but might generate substantial log volume in production. Consider making this configurable based on environment.
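One way to make the level configurable, sketched here with a hypothetical environment variable name (`RELEASE_CONCURRENCY_LOG_LEVEL` is an assumption, not an existing variable in this PR):

```typescript
type LogLevel = "debug" | "info" | "warn" | "error";

// Derive the default logger level from the environment instead of
// hard-coding "debug": verbose in development, quieter in production.
function defaultLogLevel(env: Record<string, string | undefined>): LogLevel {
  const configured = env.RELEASE_CONCURRENCY_LOG_LEVEL;
  if (configured === "debug" || configured === "info" || configured === "warn" || configured === "error") {
    return configured;
  }
  return env.NODE_ENV === "production" ? "info" : "debug";
}
```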

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
Cache: Disabled due to data retention organization setting
Knowledge Base: Disabled due to data retention organization setting

📥 Commits

Reviewing files that changed from the base of the PR and between 2bd533d and 7602efa.

📒 Files selected for processing (1)
  • internal-packages/run-engine/src/engine/releaseConcurrencyTokenBucketQueue.ts (20 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (7)
  • GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - pnpm)
  • GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - npm)
  • GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - pnpm)
  • GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - npm)
  • GitHub Check: typecheck / typecheck
  • GitHub Check: units / 🧪 Unit Tests
  • GitHub Check: Analyze (javascript-typescript)
🔇 Additional comments (5)
internal-packages/run-engine/src/engine/releaseConcurrencyTokenBucketQueue.ts (5)

478-490: The constructor arguments are now correctly ordered.

The implementation of #startReleasingsSweeper() now correctly passes arguments to the ReleaseConcurrencyReleasingsSweeper constructor with pollInterval as the third argument and maxAge as the fourth argument, fixing the issue identified in a previous review.


17-21: Well-designed releasings tracking mechanism.

The addition of the ReleaseConcurrencyValidatorResult type, the new validation-related options, and the helper methods for working with the releasings set create a robust framework for tracking token holders and validating their status over time.

Also applies to: 29-33, 456-458, 655-666
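The bookkeeping this describes can be sketched as a small in-memory model (the real implementation keeps this state in Redis via Lua scripts; the class and method names here are illustrative assumptions, not the actual API): each releaser ID is recorded with a timestamp when it releases concurrency, and the sweeper asks for entries older than the configured max age.

```typescript
// Hypothetical in-memory model of the "releasings" set described above.
// Names (ReleasingsTracker, getReleasings, removeReleaserId) are illustrative only.
class ReleasingsTracker {
  private releasings = new Map<string, number>(); // releaserId -> timestamp (ms)

  // Record that this releaser released concurrency and holds an outstanding refill claim.
  add(releaserId: string, now: number = Date.now()): void {
    this.releasings.set(releaserId, now);
  }

  // Return releaser IDs older than maxAgeMs; these are the sweeper's candidates.
  getReleasings(maxAgeMs: number, now: number = Date.now()): string[] {
    return [...this.releasings.entries()]
      .filter(([, ts]) => now - ts > maxAgeMs)
      .map(([id]) => id);
  }

  // Drop a releaser whose snapshot is no longer the latest (validation failed).
  removeReleaserId(releaserId: string): boolean {
    return this.releasings.delete(releaserId);
  }

  has(releaserId: string): boolean {
    return this.releasings.has(releaserId);
  }
}
```

With this shape, validation failures simply remove the stale ID, while valid-but-stale holders go on to the refill path.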


1048-1079: Well-implemented sweeper with proper error handling.

The ReleaseConcurrencyReleasingsSweeper implementation includes robust error handling for the overall sweeping process, with proper AbortError handling and continuous operation despite errors in individual sweep operations.


292-330: Effective token refill mechanism using releasings set.

The refillTokenIfInReleasings method and its corresponding Lua script effectively implement the logic to conditionally refill tokens only when the releaser ID is present in the releasings set, which is a key part of the reliability improvements for the release concurrency system.

Also applies to: 899-942
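The conditional-refill idea can be sketched in plain TypeScript (the real code performs this atomically inside a Redis Lua script; the class and method bodies here are assumptions for illustration): a token goes back into the bucket only if the releaser ID is still present in the releasings set, and the ID is deleted in the same step, so a duplicate refill attempt becomes a no-op.

```typescript
// Illustrative model of refillTokenIfInReleasings: a refill succeeds at most once
// per releaser, because membership in the releasings set is consumed with it.
class TokenBucket {
  constructor(
    private tokens: number,
    private readonly maxTokens: number,
    private readonly releasings: Set<string>
  ) {}

  // Called when a run releases concurrency: take a token and record the claim.
  consume(releaserId: string): void {
    this.tokens = Math.max(0, this.tokens - 1);
    this.releasings.add(releaserId);
  }

  // Refill one token only if this releaser still holds a claim.
  refillTokenIfInReleasings(releaserId: string): boolean {
    if (!this.releasings.has(releaserId)) return false; // spurious or duplicate refill
    this.releasings.delete(releaserId);
    this.tokens = Math.min(this.maxTokens, this.tokens + 1);
    return true;
  }

  get available(): number {
    return this.tokens;
  }
}
```

The same guard explains the returnTokenOnly change reviewed below: a releaser ID that was never recorded, or already swept, cannot inflate the token count.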


861-896: Improved token return mechanism with validation.

The updated returnTokenOnly Lua script now properly checks if the releaser ID exists in the releasings set before returning a token, preventing spurious token returns and ensuring accurate token counts.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

🔭 Outside diff range comments (2)
internal-packages/run-engine/src/engine/systems/waitpointSystem.ts (2)

553-561: ⚠️ Potential issue

Return value “skipped” is lost – outer function always resolves to "unblocked"

"skipped" is returned inside the runLock.lock callback but the outer function ignores that value.
Callers will therefore mis-interpret a finished run as "unblocked".

- await this.$.runLock.lock("continueRunIfUnblocked", [runId], 5000, async () => {
+ const lockResult = await this.$.runLock.lock(
+   "continueRunIfUnblocked",
+   [runId],
+   5000,
+   async () => {
      ...
-     return "skipped";
+     return "skipped";
   }
- );
+ );

and after the lock finishes:

- return "unblocked";
+ return (lockResult as "blocked" | "unblocked" | "skipped") ?? "unblocked";

This preserves the three-state contract.


596-598: 🛠️ Refactor suggestion

Refill should use newSnapshot, not stale snapshot

releaseConcurrencySystem.refillTokensForSnapshot(snapshot) is executed after newSnapshot is created, yet passes the old snapshot.
If token calculation relies on the latest execution status or metadata, this will act on outdated data.

- await this.releaseConcurrencySystem.refillTokensForSnapshot(snapshot);
+ await this.releaseConcurrencySystem.refillTokensForSnapshot(newSnapshot);
🧹 Nitpick comments (2)
internal-packages/run-engine/src/engine/tests/heartbeats.test.ts (1)

485-487: Test timeout could be shortened to keep CI fast

The SUSPENDED test waits 1000 ms × 3.5 ≈ 3.5 s.
With multiple suites this inflates pipeline time. Using a 100–200 ms heartbeat and proportionally smaller waits keeps semantics identical while shaving seconds off every run.
Example:

- const heartbeatTimeout = 1000;
+ const heartbeatTimeout = 150;
...
- await setTimeout(heartbeatTimeout * 1.5); // 225 ms
...
- await setTimeout(heartbeatTimeout * 2);   // 300 ms
internal-packages/run-engine/src/engine/systems/waitpointSystem.ts (1)

671-686: Early "blocked" return skips cleanup

When blocking wait-points still exist the function returns "blocked" before it reaches the section that deletes TaskRunWaitpoint rows. That’s fine, but for consistency the deletion should also be skipped for "skipped" cases—otherwise finished runs will still incur an extra DELETE.

Consider wrapping the delete in:

if (status === "unblocked") {
  await prisma.taskRunWaitpoint.deleteMany(...);
}
📜 Review details


📥 Commits

Reviewing files that changed from the base of the PR and between 7602efa and 9754bed.

📒 Files selected for processing (7)
  • apps/webapp/app/env.server.ts (2 hunks)
  • apps/webapp/app/v3/runEngine.server.ts (1 hunks)
  • internal-packages/run-engine/src/engine/index.ts (4 hunks)
  • internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts (3 hunks)
  • internal-packages/run-engine/src/engine/systems/waitpointSystem.ts (4 hunks)
  • internal-packages/run-engine/src/engine/tests/heartbeats.test.ts (1 hunks)
  • internal-packages/run-engine/src/engine/types.ts (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (4)
  • apps/webapp/app/v3/runEngine.server.ts
  • apps/webapp/app/env.server.ts
  • internal-packages/run-engine/src/engine/types.ts
  • internal-packages/run-engine/src/engine/index.ts
🔇 Additional comments (3)
internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts (3)

367-375: More precise worker-id check looks good

The added guard (latestSnapshot.workerId && …) avoids noisy debug logs when workerId is undefined.
Nice low-noise improvement.


443-445: New SUSPENDED heartbeat slot wired correctly

#getHeartbeatIntervalMs now returns heartbeatTimeouts.SUSPENDED.
Make sure HeartbeatTimeouts type and configuration always include this field to prevent undefined intervals in prod configs.
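To make the "always include this field" point concrete, here is a sketch of what the timeouts shape and interval lookup might look like (the actual type lives in the run-engine package; every field name other than SUSPENDED is an assumption here):

```typescript
// Hypothetical shape of the heartbeat timeout configuration. Making SUSPENDED a
// required field lets the compiler reject configs that omit it, so the interval
// lookup can never come back undefined at runtime.
type HeartbeatTimeouts = {
  PENDING_EXECUTING: number;
  EXECUTING: number;
  EXECUTING_WITH_WAITPOINTS: number;
  SUSPENDED: number; // newly wired heartbeat slot
};

type HeartbeatStatus = keyof HeartbeatTimeouts;

function getHeartbeatIntervalMs(
  timeouts: HeartbeatTimeouts,
  status: HeartbeatStatus
): number {
  // All statuses are required keys of the type, so this indexing is total.
  return timeouts[status];
}
```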


411-424:

❓ Verification inconclusive

restartHeartbeatForRun can throw on duplicate job IDs

worker.enqueue() is called with the hard-coded ID heartbeatSnapshot.${runId}.
If a heartbeat job with the same ID is still present (e.g. the previous one hasn’t fired yet) most queue implementations will either:

  1. Reject the insert with a unique-constraint error, or
  2. Silently ignore it, leaving the old schedule in place.

Either outcome defeats the intent of “restart”.
Prefer rescheduling (or deleting + enqueueing) to guarantee exactly one future heartbeat:

- await this.$.worker.enqueue({
+ await this.$.worker.reschedule(
+   `heartbeatSnapshot.${runId}`,
+   new Date(Date.now() + intervalMs),
+   {
+     job: "heartbeatSnapshot",
+     payload: { snapshotId: latestSnapshot.id, runId },
+   }
+ );

(or use whatever atomic “upsert” your queue lib exposes).


🏁 Script executed:

#!/usr/bin/env bash
set -e
# Locate the ExecutionSnapshotSystem source file
file=$(rg -l "class ExecutionSnapshotSystem")
echo "ExecutionSnapshotSystem file: $file"
# Print imports, constructor, and dependency definitions for context
sed -n '1,200p' "$file"

Length of output: 6631


🏁 Script executed:

#!/usr/bin/env bash
set -e

# Find and display the SystemResources type definition
sys_file=$(rg -l "export type SystemResources" -g "*.ts")
echo "SystemResources defined in: $sys_file"
sed -n '1,200p' "$sys_file"

# Locate the ExecutionSnapshotSystem class constructor to see how 'worker' is defined
exec_file="internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts"
echo "Constructor context in ExecutionSnapshotSystem:"
rg -n "constructor" -A5 -B5 "$exec_file"

Length of output: 1698


🏁 Script executed:

#!/usr/bin/env bash
set -e

# Locate the EngineWorker interface/type
engine_worker_file=$(rg -l "EngineWorker" -g "internal-packages/run-engine/src/engine/types.*")
echo "EngineWorker defined in: $engine_worker_file"
sed -n '1,200p' "$engine_worker_file"

# Display the full ExecutionSnapshotSystem class around restartHeartbeatForRun
exec_file="internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts"
rg -n "restartHeartbeatForRun" -A10 -B10 "$exec_file"

Length of output: 5273


Use atomic reschedule to avoid duplicate heartbeat jobs

In restartHeartbeatForRun (internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts lines 411–422), calling

await this.$.worker.enqueue({ id: `heartbeatSnapshot.${runId}`,});

can either throw on duplicate IDs or silently no-op, defeating “restart.” Prefer an upsert-style reschedule:

- await this.$.worker.enqueue({
-   id: `heartbeatSnapshot.${runId}`,
-   job: "heartbeatSnapshot",
-   payload: { snapshotId: latestSnapshot.id, runId },
-   availableAt: new Date(Date.now() + intervalMs),
- });
+ await this.$.worker.reschedule(
+   `heartbeatSnapshot.${runId}`,
+   new Date(Date.now() + intervalMs)
+ );

If your Worker API requires passing job & payload on reschedule, include them as a third parameter:

- await this.$.worker.enqueue({ … });
+ await this.$.worker.reschedule(
+   `heartbeatSnapshot.${runId}`,
+   new Date(Date.now() + intervalMs),
+   {
+     job: "heartbeatSnapshot",
+     payload: { snapshotId: latestSnapshot.id, runId },
+   }
+ );

Finally, confirm that reschedule will create the job if it doesn’t exist; otherwise consider deleting the old entry + enqueue.

@ericallam ericallam merged commit 14e081e into main May 21, 2025
12 checks passed
@ericallam ericallam deleted the ea-branch-62 branch May 21, 2025 12:21