valkey integration #97

Merged
nischitpra merged 4 commits into main from np/valkey on Feb 24, 2026

Conversation

nischitpra (Contributor) commented Feb 24, 2026

Summary by CodeRabbit

  • New Features

    • Added TLS support through rustls integration for secure connections.
    • Extended Redis support with cluster and async capabilities for enhanced scalability.
  • Refactoring

    • Migrated Redis connection handling to cluster-based architecture.
    • Updated Redis key naming scheme with hash tag prefixing for improved key distribution.

coderabbitai bot commented Feb 24, 2026

Walkthrough

This pull request migrates the Redis infrastructure from single-node ConnectionManager to cluster-based ClusterConnection, adds cluster-aware key tagging via ENGINE_HASH_TAG, extends Redis features to support clustering, and updates all queue, executor, and transaction registry implementations to use the cluster client with appropriately namespaced keys.
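As a point of reference for the key scheme described above, here is a minimal sketch of hash-tag based key building. The "{engine}" tag value and the queue_key helper are assumptions inferred from the Lua snippets quoted later in this review; the actual constant is defined in twmq.

    // Hypothetical illustration; the real ENGINE_HASH_TAG lives in twmq.
    pub const ENGINE_HASH_TAG: &str = "{engine}";

    // Redis Cluster hashes only the substring between '{' and '}', so every key
    // carrying the same "{engine}" tag maps to one hash slot and can safely be
    // used together in pipelines and Lua scripts.
    fn queue_key(queue_id: &str, suffix: &str) -> String {
        format!("twmq:{ENGINE_HASH_TAG}:{queue_id}:{suffix}")
    }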

Changes

  • Dependency Configuration (Cargo.toml, server/Cargo.toml): Extended redis features to include cluster, cluster-async, tls-rustls, and tokio-rustls-comp. Added a rustls dependency at version 0.23.32, configured with the std and ring crypto provider features.
  • Queue Management Core (twmq/src/lib.rs, twmq/src/queue.rs, twmq/src/multilane.rs): Introduced the public ENGINE_HASH_TAG constant for cluster-aware key distribution. Migrated the Queue and MultilaneQueue structs from ConnectionManager to ClusterConnection. Updated all Redis key builders to prepend ENGINE_HASH_TAG. Modified Lua scripts to use the new key formats and adjusted KEYS/ARGV ordering for cluster compatibility.
  • EOA Executor Storage (executors/src/eoa/store/mod.rs, executors/src/eoa/store/atomic.rs, executors/src/eoa/store/borrowed.rs, executors/src/eoa/store/pending.rs, executors/src/eoa/store/submitted.rs): Updated all SafeRedisTransaction validation signatures to accept ClusterConnection instead of ConnectionManager. Modified the EoaExecutorStore struct and constructor to use ClusterConnection. Added the ENGINE_HASH_TAG prefix to all EOA-related Redis key builders.
  • Executor Worker & Deployment (executors/src/eoa/worker/mod.rs, executors/src/external_bundler/deployment.rs): Changed EoaExecutorJobHandler and RedisDeploymentCache/RedisDeploymentLock from ConnectionManager to ClusterConnection. Added new pipeline-based utility methods to RedisDeploymentLock for batch operations. Updated cache and lock key generation to include ENGINE_HASH_TAG.
  • Transaction Storage (executors/src/solana_executor/storage.rs, executors/src/transaction_registry.rs): Migrated TransactionLock and SolanaTransactionStorage to use ClusterConnection. Updated TransactionRegistry to accept ClusterConnection and prepend ENGINE_HASH_TAG to registry keys.
  • Server Configuration & Router (server/src/main.rs, server/src/execution_router/mod.rs, server/src/queue/manager.rs): Added rustls crypto provider initialization. Transitioned from a single Client to ClusterClient initialization by parsing comma-separated node addresses. Updated ExecutionRouter and QueueManager to use cluster connections. Changed connection retrieval from get_connection_manager to get_async_connection.
  • Integration Test Setup (integration-tests/tests/setup.rs): Updated Redis initialization to parse the cluster node list and create a ClusterClient. Changed the connection retrieval method to get_async_connection.
  • Test Cleanup Utilities (twmq/tests/basic.rs, twmq/tests/basic_hook.rs, twmq/tests/delay.rs, twmq/tests/idempotency_modes.rs, twmq/tests/lease_expiry.rs, twmq/tests/nack.rs, twmq/tests/prune_race_condition.rs, twmq/tests/prune_race_random_ids.rs, twmq/tests/multilane_batch_pop.rs): Updated all test helper functions to use ClusterConnection instead of ConnectionManager. Modified Redis key cleanup patterns to include the ENGINE_HASH_TAG prefix in KEYS queries.
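To make the "rustls crypto provider initialization" item above concrete, a hedged sketch of the typical startup call for rustls 0.23 with the ring feature follows; its exact placement in server/src/main.rs is an assumption.

    // rustls 0.23 requires a process-wide crypto provider before any TLS handshake;
    // with the "ring" feature enabled this is the usual one-time setup at startup.
    fn init_rustls_provider() {
        rustls::crypto::ring::default_provider()
            .install_default()
            .expect("failed to install rustls crypto provider");
    }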

Sequence Diagram(s)

sequenceDiagram
    participant Client
    participant QueueManager
    participant ClusterClient
    participant RedisCluster

    Client->>QueueManager: initialize with Redis URL
    QueueManager->>ClusterClient: parse comma-separated nodes<br/>create ClusterClient
    QueueManager->>RedisCluster: get_async_connection()
    RedisCluster-->>QueueManager: ClusterConnection
    
    QueueManager->>QueueManager: build Queue/Registry<br/>with ClusterConnection
    
    Client->>QueueManager: push/pop job
    QueueManager->>RedisCluster: execute command on {engine}:queue:key
    RedisCluster-->>QueueManager: result
    QueueManager-->>Client: job/result
sequenceDiagram
    participant EOAExecutor
    participant SafeRedisTransaction
    participant ClusterConnection
    participant RedisCluster

    EOAExecutor->>SafeRedisTransaction: initiate transaction
    SafeRedisTransaction->>SafeRedisTransaction: validation(conn: ClusterConnection)
    SafeRedisTransaction->>ClusterConnection: query with {engine}:eoa:key
    ClusterConnection->>RedisCluster: execute cluster command
    RedisCluster-->>ClusterConnection: data
    ClusterConnection-->>SafeRedisTransaction: validation result
    SafeRedisTransaction->>SafeRedisTransaction: execute(conn, validation_data)
    SafeRedisTransaction->>ClusterConnection: write operations
    ClusterConnection->>RedisCluster: pipeline/multi commands
    RedisCluster-->>ClusterConnection: confirmation
    ClusterConnection-->>SafeRedisTransaction: result
    SafeRedisTransaction-->>EOAExecutor: transaction complete

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~50 minutes

🚥 Pre-merge checks | ✅ 1 | ❌ 2

❌ Failed checks (1 warning, 1 inconclusive)

  • Docstring Coverage (⚠️ Warning): Docstring coverage is 20.00%, which is insufficient; the required threshold is 80.00%. Resolution: write docstrings for the functions missing them to satisfy the coverage threshold.
  • Title check (❓ Inconclusive): The title 'valkey integration' is vague and generic, using a non-descriptive term that doesn't clearly convey the specific changes made in the changeset. Resolution: provide a more descriptive title that captures the main technical change, such as 'Migrate Redis to Valkey Cluster with hash tag prefixing' or similar.

✅ Passed checks (1 passed)

  • Description Check (✅ Passed): Check skipped; CodeRabbit's high-level summary is enabled.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
  • 📝 Generate docstrings (stacked PR)
  • 📝 Generate docstrings (commit on current branch)
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch np/valkey

Comment @coderabbitai help to get the list of available commands and usage tips.

nischitpra merged commit 6bf1f2f into main on Feb 24, 2026
5 of 8 checks passed
nischitpra deleted the np/valkey branch on February 24, 2026 at 05:25
coderabbitai bot left a comment

Actionable comments posted: 5

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (7)
executors/src/external_bundler/deployment.rs (2)

179-185: ⚠️ Potential issue | 🟠 Major

Lock has no TTL — risk of permanent deadlock on crash.

The comment says "SET NX EX" but only set_nx is used — no expiry is set on the lock key. If the lock holder crashes before calling release_lock, this key persists forever and no other process can acquire it.

Consider using SET key value NX EX <ttl> via a raw command or the set_options API to atomically set the lock with a TTL:

Proposed fix sketch
-        // Use SET NX EX for atomic acquire
-        let result: Option<String> =
-            conn.set_nx(&key, &lock_data_str)
-                .await
-                .map_err(|e| EngineError::InternalError {
-                    message: format!("Lock acquire failed: {e}"),
-                })?;
+        // Atomic SET NX EX: acquire lock with a 5-minute safety TTL
+        let result: Option<String> = redis::cmd("SET")
+            .arg(&key)
+            .arg(&lock_data_str)
+            .arg("NX")
+            .arg("EX")
+            .arg(300u64)
+            .query_async(&mut conn)
+            .await
+            .map_err(|e| EngineError::InternalError {
+                message: format!("Lock acquire failed: {e}"),
+            })?;

Note: when using SET ... NX, the reply is either "OK" (Some) or nil (None), so Option<String> is the correct return type — which also fixes the set_nx return-type issue flagged above.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@executors/src/external_bundler/deployment.rs` around lines 179 - 185, The
lock acquisition currently uses conn.set_nx(&key, &lock_data_str) without an
expiry, risking permanent deadlock; change the acquire path to perform an atomic
SET with NX and EX (e.g., use conn.set_options or a raw SET command) to set a
TTL when creating the lock key (still returning Option<String> as the SET reply
is "OK" or nil). Ensure the same key and lock_data_str are used and keep
release_lock behavior unchanged so the lock is removed when finished; pick an
appropriate ttl constant and document it near the acquire logic.

179-205: ⚠️ Potential issue | 🔴 Critical

Critical: set_nx with Option<String> return type makes the lock non-exclusive, and no TTL is set on acquisition.

The SETNX command returns an integer (1 = set, 0 = not set). Decoding this as Option<String> yields Some("1") or Some("0") — never None — because the String::FromRedisValue implementation converts integers to their string representation. Consequently, the Some(_) arm at line 188 always matches, and every call returns AcquireLockResult::Acquired regardless of whether the lock already exists. Additionally, the comment at line 179 states "Use SET NX EX for atomic acquire," but the code uses bare set_nx() without any TTL; if a process crashes after acquiring the lock, it will be held indefinitely.

Fix: Change the return type to bool and add an expiration time:

Proposed fix
-        // Use SET NX EX for atomic acquire
-        let result: Option<String> =
-            conn.set_nx(&key, &lock_data_str)
-                .await
-                .map_err(|e| EngineError::InternalError {
-                    message: format!("Lock acquire failed: {e}"),
-                })?;
-
-        match result {
-            Some(_) => Ok(AcquireLockResult::Acquired),
-            None => {
+        // Use SET NX EX for atomic acquire with expiration
+        let was_set: bool =
+            conn.set_nx(&key, &lock_data_str)
+                .await
+                .map_err(|e| EngineError::InternalError {
+                    message: format!("Lock acquire failed: {e}"),
+                })?;
+
+        if was_set {
+            // Set lock expiration to prevent indefinite locks on crashes
+            conn.expire(&key, 3600)  // 1 hour TTL
+                .await
+                .map_err(|e| EngineError::InternalError {
+                    message: format!("Failed to set lock expiration: {e}"),
+                })?;
+            Ok(AcquireLockResult::Acquired)
+        } else {
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@executors/src/external_bundler/deployment.rs` around lines 179 - 205, The
redis SETNX usage is wrong: conn.set_nx(&key, &lock_data_str) is being decoded
as Option<String> so it always hits Some(_) and always returns
AcquireLockResult::Acquired and it also never sets a TTL; change the call to use
a boolean-returning set with an expiration (atomic SET ... NX EX) so acquisition
succeeds only when the key was created and the key has a TTL; specifically
replace the conn.set_nx(...) call with an atomic set that returns a bool (or use
the client API that maps to Redis "SET" with NX and EX) and then interpret true
=> AcquireLockResult::Acquired, false => fetch existing value (same
existing_data path) and return
AcquireLockResult::AlreadyLocked(existing_lock_id); ensure you pass the TTL
(expiration seconds) when writing lock_data_str and keep the existing
get/serde_json logic for the failure branch.
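A minimal sketch of the atomic SET NX EX variant suggested above, using redis-rs's SetOptions API. The 300-second TTL, function name, and signature are illustrative, not taken from this PR.

    use twmq::redis::cluster_async::ClusterConnection;
    use twmq::redis::{AsyncCommands, ExistenceCheck, SetExpiry, SetOptions};

    async fn try_acquire_lock(
        conn: &mut ClusterConnection,
        key: &str,
        lock_data: &str,
    ) -> twmq::redis::RedisResult<bool> {
        let opts = SetOptions::default()
            .conditional_set(ExistenceCheck::NX)
            .with_expiration(SetExpiry::EX(300));
        // With NX the reply is OK when the key was created and nil otherwise,
        // which decodes cleanly into true/false here.
        conn.set_options(key, lock_data, opts).await
    }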
twmq/tests/basic_hook.rs (1)

68-68: ⚠️ Potential issue | 🔴 Critical

Pipeline failure: tests fail because REDIS_URL points to a non-cluster Redis instance.

The pipeline error confirms this:

"This instance has cluster support disabled"

The ClusterClient requires connecting to a Redis instance running in cluster mode. The hardcoded redis://127.0.0.1:6379/ points to a standalone Redis, which rejects cluster commands.

Options:

  1. Use a Redis Cluster in CI (e.g., via docker-compose with cluster-enabled nodes or a test container).
  2. Keep a non-cluster code path for tests (e.g., feature-gated).
  3. Use a single-node Redis Cluster setup (possible with cluster-enabled yes and appropriate config).
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@twmq/tests/basic_hook.rs` at line 68, The test fails because the hardcoded
REDIS_URL in tests/basic_hook.rs points at a standalone Redis while
ClusterClient requires cluster mode; update the test to either (A) point
REDIS_URL to a cluster-enabled instance (e.g., single-node cluster endpoint your
CI provides) so ClusterClient can connect, or (B) add a test branch/feature gate
that uses the non-cluster client path instead of ClusterClient when cluster
support is unavailable. Locate and modify the constant REDIS_URL and the code
that constructs/uses ClusterClient in the test so it either reads a CI-provided
cluster URL or conditionally falls back to the non-cluster client.
twmq/src/multilane.rs (1)

1296-1347: ⚠️ Potential issue | 🔴 Critical

Critical: WATCH/MULTI/EXEC is incompatible with ClusterConnection.

ClusterConnection in redis-rs does not support WATCH and does not guarantee connection affinity — each .query_async() call may be dispatched to a different underlying TCP connection in the cluster pool. The WATCH → read → MULTI/EXEC pattern (lines 1302–1322) requires all commands to execute on the same connection. When commands are routed to different connections, the WATCH state is silently lost and optimistic locking provides no protection.

This pattern appears in:

  • complete_job (lines 1296–1347)
  • complete_job_queue_error (lines 1405+)
  • executors/src/eoa/store/atomic.rs (multiple locations)

Consequence: Lease-based protection for job completion is effectively disabled, enabling duplicate completions and data corruption under concurrent access.

Fix options:

  1. Replace WATCH/MULTI/EXEC with a Lua script (EVAL/EVALSHA) to atomically check the lease and perform operations.
  2. Use a direct (non-cluster) Redis connection for transaction sequences, if feasible in your architecture.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@twmq/src/multilane.rs` around lines 1296 - 1347, The WATCH/MULTI/EXEC
sequence in complete_job (and similarly in complete_job_queue_error and
locations in executors/src/eoa/store/atomic.rs) is unsafe with
ClusterConnection; replace the optimistic-transaction flow that uses
lease_key_name, hook_pipeline, and the subsequent post_*_completion calls with a
single EVAL/EVALSHA Lua script that (1) atomically checks the lease key and
lease token, (2) applies the same Redis mutations encoded by hook_pipeline, and
(3) returns a status so the Rust side can call
post_success_completion/post_nack_completion/post_fail_completion only after the
script confirms success; implement script loading (EVALSHA fallback to EVAL) and
reuse the same script for all these places to ensure atomicity across the
cluster.
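A rough sketch of the Lua-script alternative described in fix option 1, assuming a simplified key layout; the real completion path would also have to fold in the hook_pipeline writes and the post-completion bookkeeping.

    use twmq::redis::Script;
    use twmq::redis::cluster_async::ClusterConnection;

    // Returns 1 if the lease was still held and the job was completed, 0 otherwise.
    async fn complete_if_lease_held(
        conn: &mut ClusterConnection,
        lease_key: &str,
        active_hash: &str,
        job_id: &str,
    ) -> twmq::redis::RedisResult<i64> {
        let script = Script::new(
            r#"
            -- KEYS[1] = lease key, KEYS[2] = active hash; ARGV[1] = job id
            if redis.call('EXISTS', KEYS[1]) == 0 then
                return 0
            end
            redis.call('HDEL', KEYS[2], ARGV[1])
            redis.call('DEL', KEYS[1])
            return 1
            "#,
        );
        // invoke_async uses EVALSHA with an EVAL fallback; both keys share the
        // {engine} hash tag, so the script executes on a single node.
        script
            .key(lease_key)
            .key(active_hash)
            .arg(job_id)
            .invoke_async(conn)
            .await
    }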
twmq/tests/basic.rs (1)

14-70: ⚠️ Potential issue | 🟠 Major

Queue::new() requires cluster-capable Redis; CI will fail with standard Redis

The Queue::new() implementation uses redis::cluster::ClusterClient::new(), which requires a cluster-enabled Redis or Valkey instance. The CI workflow currently runs standard redis:7-alpine (non-cluster), which cannot satisfy this requirement and will fail with "cluster support disabled" errors. Either:

  • Update CI to run a cluster-capable Redis/Valkey service, or
  • Modify Queue::new() to support both cluster and non-cluster Redis connections
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@twmq/tests/basic.rs` around lines 14 - 70, Queue::new currently always uses
redis::cluster::ClusterClient which fails on non-cluster Redis in CI; change
Queue::new to try creating a cluster connection first and on failure fall back
to a standard redis::Client connection (or detect the URL and choose
accordingly), and update the Queue struct's redis field to hold a type that can
represent either connection (e.g. an enum wrapper or a boxed trait like
redis::aio::ConnectionManager) so cleanup_redis_keys and other call sites
(referencing Queue::new, redis::cluster::ClusterClient, ClusterConnection, and
any Queue::redis usages) can operate against both cluster and non-cluster
connections without panicking. Ensure the fallback preserves async query
compatibility for KEYS/DEL and that error handling logs the chosen backend.
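One way to realize the "enum wrapper" option mentioned above is sketched below; the type name and the delegation of redis's async ConnectionLike trait are assumptions, not code from the PR.

    use twmq::redis::aio::{ConnectionLike, ConnectionManager};
    use twmq::redis::cluster_async::ClusterConnection;
    use twmq::redis::{Cmd, Pipeline, RedisFuture, Value};

    // Holds either a cluster or a single-node connection so CI and tests can run
    // against plain Redis while production uses a cluster.
    #[derive(Clone)]
    pub enum AnyRedisConn {
        Cluster(ClusterConnection),
        Single(ConnectionManager),
    }

    impl ConnectionLike for AnyRedisConn {
        fn req_packed_command<'a>(&'a mut self, cmd: &'a Cmd) -> RedisFuture<'a, Value> {
            match self {
                AnyRedisConn::Cluster(c) => c.req_packed_command(cmd),
                AnyRedisConn::Single(c) => c.req_packed_command(cmd),
            }
        }

        fn req_packed_commands<'a>(
            &'a mut self,
            cmd: &'a Pipeline,
            offset: usize,
            count: usize,
        ) -> RedisFuture<'a, Vec<Value>> {
            match self {
                AnyRedisConn::Cluster(c) => c.req_packed_commands(cmd, offset, count),
                AnyRedisConn::Single(c) => c.req_packed_commands(cmd, offset, count),
            }
        }

        fn get_db(&self) -> i64 {
            match self {
                AnyRedisConn::Cluster(c) => c.get_db(),
                AnyRedisConn::Single(c) => c.get_db(),
            }
        }
    }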
twmq/src/lib.rs (2)

1256-1308: ⚠️ Potential issue | 🔴 Critical

WATCH/MULTI/EXEC is not supported on ClusterConnection — this will fail at runtime.

The redis-rs 0.31.0 crate's ClusterConnection explicitly does not support Redis transactions. The cluster connection type reports supports_pipelining() -> false and its pipeline variant is documented as "does not support transactions." Lines 1262–1266 issue a raw WATCH command, then lines 1280–1286 execute an atomic pipeline (MULTI/EXEC), which will error at runtime on a cluster connection.

This same pattern appears in complete_job_queue_error at lines 1369–1372 and 1386–1387.

Since all keys share the {engine} hash slot, consider:

  1. Using a dedicated single-node connection to the specific cluster node owning that slot, rather than ClusterConnection.
  2. Using a Lua script to achieve atomic check-and-update (Lua scripts are atomic on a single node).
  3. Using Redlock or another distributed lock mechanism instead of WATCH-based optimistic concurrency.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@twmq/src/lib.rs` around lines 1256 - 1308, The code uses WATCH/MULTI/EXEC
against a ClusterConnection (see usage of lease_key, cloned redis connection,
and atomic_pipeline.query_async) which is unsupported and will fail at runtime;
replace the WATCH-based transaction in this completion loop (and the similar
pattern in complete_job_queue_error) with an atomic alternative: either acquire
a single-node connection for the {engine} slot and run the existing
WATCH/MULTI/EXEC there, or (preferred) convert the pipeline + lease-exists check
into a single EVAL/EVALSHA Lua script that checks existence of lease_key and
performs the same updates atomically, then call the script (using the same redis
client interface) instead of atomic_pipeline.query_async; ensure you reference
lease_key, hook_pipeline operations, and the post-completion branches
(post_success_completion, post_nack_completion, post_fail_completion) so the Lua
script performs the exact writes previously in the pipeline or the single-node
connection targets the node owning the {engine} slot.

603-752: ⚠️ Potential issue | 🟠 Major

Lua scripts dynamically construct keys not declared in KEYS[] — cluster compliance risk.

The pop_batch_jobs Lua script (and similarly the push, trim scripts) constructs keys dynamically within Lua:

local job_meta_hash_name = 'twmq:{engine}:' .. queue_id .. ':job:' .. job_id .. ':meta'
local lease_key = 'twmq:{engine}:' .. queue_id .. ':job:' .. job_id .. ':lease:' .. lease_token

These keys are not passed via KEYS[]. The Redis Cluster specification states that all keys accessed by a script must be provided as KEYS arguments so the cluster can verify they belong to the same hash slot. While this works today because all keys share {engine}, some Valkey/Redis configurations enforce strict key checking (allow-cross-slot-keys no or similar), and proxy layers (e.g., Envoy, Twemproxy) may reject such scripts.

Additionally, the string {engine} is hardcoded in the Lua rather than derived from the ENGINE_HASH_TAG constant, creating a maintenance coupling. If the constant ever changes, these scripts will silently use a different hash slot.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@twmq/src/lib.rs` around lines 603 - 752, The Lua script pop_batch_jobs
constructs keys like job_meta_hash_name and lease_key inside the script (and
similarly in other scripts) instead of receiving them via KEYS[], which violates
Redis Cluster key checking and hardcodes '{engine}'; fix by changing the code
that registers/executed the redis::Script for pop_batch_jobs (and the push/trim
scripts) to pass every Redis key the script touches through KEYS[] (e.g.,
delayed_zset_name, pending_list_name, active_hash_name, job_data_hash_name,
pending_cancellation_set, failed_list_name, success_list_name plus per-job meta
and lease keys for popped job ids) and update the Lua body to reference those
KEYS[] entries rather than constructing strings; also replace the literal
'{engine}' with the ENGINE_HASH_TAG constant value when building the KEYS[]
array so the script uses the same hash tag as the rest of the code.
♻️ Duplicate comments (4)
twmq/tests/idempotency_modes.rs (1)

7-72: Duplicate: cluster-enabled Redis requirement
Covered by the comment in twmq/tests/basic.rs about needing a cluster-capable Redis endpoint for tests.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@twmq/tests/idempotency_modes.rs` around lines 7 - 72, This comment duplicates
the cluster-enabled Redis requirement already documented in twmq/tests/basic.rs;
remove the redundant note from twmq/tests/idempotency_modes.rs (or replace it
with a single-line reference) so there's a single source of truth; look for
references in this file around the cleanup_redis_keys helper and the test setup
in idempotency_modes.rs and either delete the duplicate comment or add a concise
pointer to the existing documentation in basic.rs.
twmq/tests/prune_race_random_ids.rs (1)

7-127: Duplicate: cluster-enabled Redis requirement
Covered by the comment in twmq/tests/basic.rs about needing a cluster-capable Redis endpoint for tests.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@twmq/tests/prune_race_random_ids.rs` around lines 7 - 127, The comment about
requiring a cluster-enabled Redis endpoint is duplicated (already in
twmq/tests/basic.rs); remove the redundant note from this test file and either
reference the existing comment or consolidate the requirement into a single
shared place; update or remove the block containing REDIS_URL/ClusterConnection
setup or the top-of-file comment so only the authoritative cluster-enabled Redis
requirement remains (look for REDIS_URL, ClusterConnection, and
cleanup_redis_keys in this file to locate the duplicated guidance).
twmq/tests/nack.rs (1)

13-31: Duplicate: cluster-enabled Redis requirement
Covered by the comment in twmq/tests/basic.rs about needing a cluster-capable Redis endpoint for tests.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@twmq/tests/nack.rs` around lines 13 - 31, The comment about requiring a
cluster-capable Redis endpoint is duplicated here; remove the redundant note in
this test and either reference the existing guidance in the central/basic test
or move the single authoritative note into the shared test fixtures (e.g., where
REDIS_URL or cleanup_redis_keys is defined) so tests like cleanup_redis_keys and
constants such as REDIS_URL rely on one documented cluster requirement location.
twmq/src/lib.rs (1)

963-1010: Hardcoded {engine} in trim Lua scripts — same issue as pop_batch_jobs.

Both post_success_completion and post_fail_completion trim scripts construct job_meta_hash and errors_list_name with hardcoded 'twmq:{engine}:' strings. Same cluster compliance and maintenance concerns as flagged in the pop_batch_jobs script.

Also applies to: 1144-1200

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@twmq/src/lib.rs` around lines 963 - 1010, The Lua trim_script (and the
similar scripts used by post_success_completion and post_fail_completion)
currently hardcode 'twmq:{engine}:' into job_meta_hash and errors_list_name;
update these scripts to accept the engine prefix as an input (e.g., add an ARGV
or KEYS parameter like engine_prefix or engine_name) and concatenate that
parameter instead of the literal '{engine}' when building job_meta_hash and
errors_list_name (locate occurrences in trim_script, and in the
post_success_completion/post_fail_completion scripts, and replace the hardcoded
string with the passed-in parameter variable).
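A small sketch of the parameterized-prefix approach suggested above; the script body and names are illustrative and not the actual trim script from twmq.

    use twmq::redis::Script;

    fn trim_errors_script() -> Script {
        Script::new(
            r#"
            -- KEYS[1] = errors list; ARGV[1] = key prefix, e.g. 'twmq:{engine}:my_queue'
            local prefix = ARGV[1]
            local trimmed = redis.call('LRANGE', KEYS[1], 100, -1)
            for _, job_id in ipairs(trimmed) do
                redis.call('DEL', prefix .. ':job:' .. job_id .. ':meta')
            end
            redis.call('LTRIM', KEYS[1], 0, 99)
            return #trimmed
            "#,
        )
    }

    // Call sites would build ARGV[1] from the shared constant instead of a literal:
    // trim_errors_script()
    //     .key(&errors_list_name)
    //     .arg(format!("twmq:{}:{}", twmq::ENGINE_HASH_TAG, queue_id))
    //     .invoke_async(&mut conn)
    //     .await?;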
🧹 Nitpick comments (7)
executors/src/external_bundler/deployment.rs (1)

85-90: Duplicate cache_key implementation across RedisDeploymentCache and RedisDeploymentLock.

This method is identical to RedisDeploymentCache::cache_key (line 46). Consider extracting a shared helper (e.g., a free function fn deployment_cache_key(chain_id: u64, account_address: &Address) -> String) to avoid divergence.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@executors/src/external_bundler/deployment.rs` around lines 85 - 90, The
duplicate cache_key implementation in RedisDeploymentLock::cache_key and
RedisDeploymentCache::cache_key should be extracted to a shared helper: add a
free function like fn deployment_cache_key(chain_id: u64, account_address:
&Address) -> String that returns the formatted key using twmq::ENGINE_HASH_TAG
and the same "{CACHE_PREFIX}" token, then have both
RedisDeploymentLock::cache_key and RedisDeploymentCache::cache_key call that
helper (replace their bodies with a call to deployment_cache_key) to avoid
duplication and keep formatting consistent.
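A sketch of the shared helper proposed above. The CACHE_PREFIX value, the alloy Address type, and the exact key layout are assumptions.

    use alloy::primitives::Address;

    const CACHE_PREFIX: &str = "deployment_cache"; // placeholder value

    // Single source of truth for the deployment cache/lock key format.
    fn deployment_cache_key(chain_id: u64, account_address: &Address) -> String {
        format!(
            "{}:{CACHE_PREFIX}:{chain_id}:{account_address}",
            twmq::ENGINE_HASH_TAG
        )
    }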
twmq/src/queue.rs (1)

191-203: URL parsing assumes comma-separated cluster nodes — document this contract.

The redis_url parameter now expects a comma-separated list of node addresses (e.g., redis://node1:6379,redis://node2:6380), but the redis_url method on Line 107 accepts a generic S: Into<String> with no doc hint. A single-node URL still works (no commas → one node), but callers won't know this changed.

Consider adding a brief doc comment on redis_url to clarify the expected format.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@twmq/src/queue.rs` around lines 191 - 203, The Redis URL handling now treats
a URL string as a comma-separated list of cluster node addresses (see
RedisSource::Url parsing), but the public builder method redis_url(S:
Into<String>) lacks documentation about this contract; update the doc comment on
the redis_url method to state that the parameter should be a comma-separated
list of Redis node URLs (e.g., "redis://node1:6379,redis://node2:6380"), note
that a single URL (no commas) is supported, and mention any accepted URL formats
and trimming behavior so callers know how to supply cluster endpoints.
twmq/src/multilane.rs (1)

406-408: Lua scripts construct keys dynamically instead of declaring them in KEYS[] — works due to hash tag but is fragile.

Multiple Lua scripts (e.g., cancel_job, pop_batch_jobs, post_success_completion, post_fail_completion) build Redis keys via string concatenation inside the script body (e.g., Line 406: 'twmq_multilane:{engine}:' .. queue_id .. ':lane:' .. lane_id .. ':pending'). These dynamically constructed keys are not passed via KEYS[].

This works because all keys share the {engine} hash tag and thus map to the same cluster slot. However, some Redis Cluster proxies and managed services (e.g., AWS ElastiCache in cluster mode) enforce that all accessed keys must be declared in KEYS[] and will reject scripts that access undeclared keys.

If you only target Valkey/Redis OSS clusters, this is acceptable. If managed services are in scope, consider passing all keys via KEYS[] or restructuring to avoid dynamic key construction in Lua.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@twmq/src/multilane.rs` around lines 406 - 408, The Lua scripts (cancel_job,
pop_batch_jobs, post_success_completion, post_fail_completion) currently build
keys like local lane_pending_list / lane_delayed_zset / lane_active_hash via
string concatenation inside the script; change these scripts to accept all Redis
keys via KEYS[] and replace uses of the concatenated variables with KEYS[n] (and
update the caller that loads/calls these scripts to pass the correct key list in
order), ensuring each lane/queue key is provided in KEYS[] so no dynamic key
construction occurs in the script body and cluster proxies (e.g., ElastiCache)
accept the script.
twmq/tests/basic_hook.rs (1)

24-41: KEYS command is not cluster-safe for multi-node setups.

redis::cmd("KEYS") only scans the node the command is routed to, not the entire cluster. In a multi-node cluster, this cleanup will miss keys on other nodes. Since all keys share the {engine} hash tag (same slot → same node), this happens to work, but it's a subtle assumption worth documenting.

Also, KEYS is O(N) over the entire keyspace and blocks the node — SCAN is preferred even in tests.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@twmq/tests/basic_hook.rs` around lines 24 - 41, The test helper
cleanup_redis_keys uses redis::cmd("KEYS") which is not cluster-safe and is
blocking; replace it with an async, cluster-safe incremental scan using SCAN (or
CLUSTER KEYSLOT+SCAN if needed) over the connection so all nodes are covered and
the command is non-blocking. In practice in the cleanup_redis_keys function,
iterate with redis SCAN on the ClusterConnection using the keys_pattern
(twmq::ENGINE_HASH_TAG + queue_name) to collect matching keys in pages and then
DEL them (or pipeline deletes) only after gathering them; ensure the scan loop
handles cursor=0 termination and uses query_async on the same connection rather
than KEYS so the helper works correctly in multi-node setups and avoids
blocking.
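A hedged sketch of the SCAN-based cleanup described above. The key pattern and unwrap-style error handling mirror the test helpers, but the names are illustrative; note that on a multi-node cluster a plain SCAN still reaches only one node, which is tolerable here because all keys share the {engine} slot.

    use twmq::redis::AsyncCommands;
    use twmq::redis::cluster_async::ClusterConnection;

    async fn cleanup_redis_keys(conn: &ClusterConnection, queue_name: &str) {
        let mut conn = conn.clone();
        let pattern = format!("twmq:{{engine}}:{queue_name}*");
        let mut cursor: u64 = 0;
        loop {
            // Incremental SCAN instead of the blocking KEYS command.
            let (next, keys): (u64, Vec<String>) = twmq::redis::cmd("SCAN")
                .arg(cursor)
                .arg("MATCH")
                .arg(&pattern)
                .arg("COUNT")
                .arg(100)
                .query_async(&mut conn)
                .await
                .unwrap();
            if !keys.is_empty() {
                // All matches share the {engine} hash tag, so a multi-key DEL is slot-safe.
                let _: usize = conn.del(&keys).await.unwrap();
            }
            cursor = next;
            if cursor == 0 {
                break;
            }
        }
    }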
server/src/main.rs (1)

78-85: Duplicate node-parsing logic — consider extracting a shared helper.

The same comma-split-trim-filter-collect pattern for building initial_nodes appears verbatim in integration-tests/tests/setup.rs (lines 171–177) and in twmq/src/multilane.rs (lines 48–57). Extracting it once — e.g., as a free function fn parse_redis_nodes(url: &str) -> Vec<&str> in twmq or a shared config module — reduces drift and makes it easy to add validation (e.g., reject an empty node list).

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@server/src/main.rs` around lines 78 - 85, Extract the repeated
comma-split-trim-filter-collect logic into a single helper (e.g. fn
parse_redis_nodes(url: &str) -> Vec<String>) in a shared crate/module (twmq or a
config util), then replace the inline code in main.rs (where initial_nodes is
built) and the duplicates in integration-tests/tests/setup.rs and
twmq/src/multilane.rs to call parse_redis_nodes(&config.redis.url). Make the
helper return owned Strings (Vec<String>) to avoid lifetime issues, and add
simple validation inside it to return an error or panic when the resulting node
list is empty so callers like twmq::redis::cluster::ClusterClient::new get a
non-empty list.
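The suggested helper might look like the following sketch; the name, owned-String return type, and empty-list check follow the review's wording rather than existing code.

    pub fn parse_redis_nodes(url: &str) -> anyhow::Result<Vec<String>> {
        let nodes: Vec<String> = url
            .split(',')
            .map(str::trim)
            .filter(|node| !node.is_empty())
            .map(|node| node.to_string())
            .collect();
        anyhow::ensure!(!nodes.is_empty(), "redis url yielded no cluster nodes");
        Ok(nodes)
    }

    // Usage at the call sites listed above:
    // let client = twmq::redis::cluster::ClusterClient::new(parse_redis_nodes(&config.redis.url)?)?;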
executors/src/transaction_registry.rs (1)

33-36: Plan migration for the new registry key format
The registry key now includes ENGINE_HASH_TAG, so existing entries under the prior key will no longer be read. If you need continuity during upgrades, consider a short dual-read window or a migration/backfill step.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@executors/src/transaction_registry.rs` around lines 33 - 36, registry_key now
embeds twmq::ENGINE_HASH_TAG (in function registry_key using self.namespace),
which will break reads of existing registry entries; implement a migration
strategy: update registry_key to perform a dual-read during a transition window
(try new key first then fallback to the old key format without ENGINE_HASH_TAG)
or add a one-time migration/backfill routine that scans for old keys and
re-writes them using the new key format (reference registry_key, self.namespace,
and twmq::ENGINE_HASH_TAG to locate where to change read/write logic).
executors/src/eoa/store/mod.rs (1)

123-336: Consider a helper to reduce repetition across 12 key-builder methods.

Every method follows the identical match &self.namespace { Some(ns) => format!("{ns}:{}:...", twmq::ENGINE_HASH_TAG, ...), None => format!("{}:...", twmq::ENGINE_HASH_TAG, ...) } pattern. A single private helper like fn prefix(&self) -> String would collapse the duplication.

Example helper
+    fn key_prefix(&self) -> String {
+        match &self.namespace {
+            Some(ns) => format!("{ns}:{}", twmq::ENGINE_HASH_TAG),
+            None => twmq::ENGINE_HASH_TAG.to_string(),
+        }
+    }
+
     pub fn eoa_lock_key_name(&self) -> String {
-        match &self.namespace {
-            Some(ns) => format!(
-                "{ns}:{}:eoa_executor:lock:{}:{}",
-                twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa
-            ),
-            None => format!(
-                "{}:eoa_executor:lock:{}:{}",
-                twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa
-            ),
-        }
+        format!("{}:eoa_executor:lock:{}:{}", self.key_prefix(), self.chain_id, self.eoa)
     }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@executors/src/eoa/store/mod.rs` around lines 123 - 336, Many key-builder
methods (eoa_lock_key_name, transaction_data_key_name,
transaction_attempts_list_name, pending_transactions_zset_name,
submitted_transactions_zset_name, transaction_hash_to_id_key_name,
borrowed_transactions_hashmap_name, recycled_nonces_zset_name,
optimistic_transaction_count_key_name, last_transaction_count_key_name,
eoa_health_key_name, manual_reset_key_name) repeat the same namespace
match/format logic; add a private helper fn prefix(&self) -> String that returns
either "{ns}:{ENGINE_HASH_TAG}" or "{ENGINE_HASH_TAG}" and refactor each method
to call this prefix and then append the remaining suffix (e.g.,
":eoa_executor:lock:{}:{}" or formatted transaction-specific suffix), removing
the duplicated match blocks.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:

In `@twmq/tests/delay.rs`:
- Around line 134-135: Tests currently construct a redis::cluster::ClusterClient
and call get_async_connection().unwrap(), which panics against a non-cluster
Redis; instead either enable single-node cluster mode in CI (set cluster-enabled
yes, cluster-node-timeout 5000, cluster-config-file nodes.conf) or change the
tests to reuse the queue's existing ClusterConnection (avoid creating a new
ClusterClient in tests and use the Queue/MultilaneQueue provided
ClusterConnection instance), remove unnecessary Arc<ClusterConnection> wrappers
since ClusterConnection is Clone, and replace unwrap()/expect() calls around
ClusterClient/get_async_connection with proper error handling when acquiring the
reused connection.

In `@twmq/tests/multilane_batch_pop.rs`:
- Line 93: The test currently builds keys_pattern using ENGINE_HASH_TAG and
self.queue_id and then uses KEYS + DEL which in cluster mode only hits a single
node; update the cleanup to iterate with SCAN (per-node) routed to the proper
slot by using RoutingInfo::SingleNode for the hash-tagged slot so all nodes are
scanned and matching keys are deleted, or if you prefer a minimal change add a
clear comment in the test near the keys_pattern/KEYS usage noting that cleanup
is best-effort in cluster mode and that MultilaneQueue::new(...) creates a
ClusterClient so the test requires a cluster Redis instance; reference
keys_pattern, ENGINE_HASH_TAG, self.queue_id, MultilaneQueue::new,
RoutingInfo::SingleNode, SCAN, and avoid using KEYS + DEL in cluster tests.


ℹ️ Review info

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Disabled knowledge base sources:

  • Linear integration is disabled

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between e570bc8 and 9ab4ec7.

📒 Files selected for processing (27)
  • Cargo.toml
  • executors/src/eoa/store/atomic.rs
  • executors/src/eoa/store/borrowed.rs
  • executors/src/eoa/store/mod.rs
  • executors/src/eoa/store/pending.rs
  • executors/src/eoa/store/submitted.rs
  • executors/src/eoa/worker/mod.rs
  • executors/src/external_bundler/deployment.rs
  • executors/src/solana_executor/storage.rs
  • executors/src/transaction_registry.rs
  • integration-tests/tests/setup.rs
  • server/Cargo.toml
  • server/src/execution_router/mod.rs
  • server/src/main.rs
  • server/src/queue/manager.rs
  • twmq/src/lib.rs
  • twmq/src/multilane.rs
  • twmq/src/queue.rs
  • twmq/tests/basic.rs
  • twmq/tests/basic_hook.rs
  • twmq/tests/delay.rs
  • twmq/tests/idempotency_modes.rs
  • twmq/tests/lease_expiry.rs
  • twmq/tests/multilane_batch_pop.rs
  • twmq/tests/nack.rs
  • twmq/tests/prune_race_condition.rs
  • twmq/tests/prune_race_random_ids.rs

Comment on lines +7 to +8
use twmq::redis::{AsyncCommands, Pipeline};
use twmq::redis::cluster_async::ClusterConnection;

⚠️ Potential issue | 🔴 Critical

Same critical WATCH incompatibility as in multilane.rs — affects EOA lock safety.

The SafeRedisTransaction trait and its consumers (with_lock_check at Lines 220–261, execute_with_watch_and_retry at Lines 336–392) rely on WATCH/MULTI/EXEC for optimistic locking. As noted in the multilane.rs review, ClusterConnection does not support WATCH because it doesn't guarantee connection affinity.

This is especially critical here because it guards the EOA lock mechanism — concurrent workers could both believe they hold the lock, leading to double-spending or nonce collisions.

Also applies to: 45-51

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@executors/src/eoa/store/atomic.rs` around lines 7 - 8, The code uses
ClusterConnection with WATCH/MULTI/EXEC via the SafeRedisTransaction trait, but
ClusterConnection doesn't support WATCH (connection-affinity issue) which breaks
the EOA lock safety; change the implementation to replace WATCH-based optimistic
transactions with an atomic Lua script (EVAL) or a Redis server-side CAS routine
for the lock flow. Specifically, update the SafeRedisTransaction consumers —
with_lock_check and execute_with_watch_and_retry — to call a single EVAL that
checks the lock owner and nonce and then sets/updates or aborts atomically (so
both the check and the write occur server-side), remove reliance on
WATCH/MULTI/EXEC, and stop using ClusterConnection for WATCH paths (keep
ClusterConnection but invoke EVAL via its async commands). Ensure the Lua script
returns clear success/failure codes that with_lock_check and
execute_with_watch_and_retry interpret to retry/fail as before.

Comment on lines 123 to 134
     pub fn eoa_lock_key_name(&self) -> String {
         match &self.namespace {
-            Some(ns) => format!("{ns}:eoa_executor:lock:{}:{}", self.chain_id, self.eoa),
-            None => format!("eoa_executor:lock:{}:{}", self.chain_id, self.eoa),
+            Some(ns) => format!(
+                "{ns}:{}:eoa_executor:lock:{}:{}",
+                twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa
+            ),
+            None => format!(
+                "{}:eoa_executor:lock:{}:{}",
+                twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa
+            ),
         }
     }

⚠️ Potential issue | 🟡 Minor

Namespace containing { or } would break the hash-tag routing.

Redis Cluster computes the hash slot from the substring between the first { and the next } in the key. If namespace ever contains { or }, the effective hash tag would no longer be engine, causing keys to scatter across slots and breaking atomicity of pipelines and Lua scripts.

Consider validating the namespace at construction time (in EoaExecutorStoreKeys::new) or placing the hash tag before the namespace in the key format (e.g., {engine}:{ns}:eoa_executor:...).

Suggested defensive validation in the constructor
 impl EoaExecutorStoreKeys {
     pub fn new(eoa: Address, chain_id: u64, namespace: Option<String>) -> Self {
+        if let Some(ref ns) = namespace {
+            assert!(
+                !ns.contains('{') && !ns.contains('}'),
+                "Namespace must not contain '{{' or '}}' as it would break Redis Cluster hash tag routing"
+            );
+        }
         Self {
             eoa,
             chain_id,
             namespace,
         }
     }
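
For the tag-first alternative, a minimal sketch, assuming ENGINE_HASH_TAG already expands to a brace-wrapped tag such as {engine} (only the key layout changes; everything else is the existing method):

pub fn eoa_lock_key_name(&self) -> String {
    // Hash tag first: whatever the namespace contains, the first {…} pair
    // Redis Cluster sees is ENGINE_HASH_TAG, so slot routing is unaffected.
    match &self.namespace {
        Some(ns) => format!(
            "{}:{ns}:eoa_executor:lock:{}:{}",
            twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa
        ),
        None => format!(
            "{}:eoa_executor:lock:{}:{}",
            twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa
        ),
    }
}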
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@executors/src/eoa/store/mod.rs` around lines 123 - 134, The key construction
in eoa_lock_key_name() can have namespace containing '{' or '}', which would
change Redis Cluster hash-tag parsing and break slot affinity; update the logic
in EoaExecutorStoreKeys (e.g., in EoaExecutorStoreKeys::new) to either
validate/escape namespace (reject or strip '{' and '}') or move the
twmq::ENGINE_HASH_TAG hash tag to the front of the key format (e.g., wrap
ENGINE_HASH_TAG in braces before namespace) and then update eoa_lock_key_name()
and other key-formatting methods to use the safe/tag-first format; ensure any
validation error surfaces at construction time so no keys are created with
illegal characters.

Comment on lines +178 to +179
let redis_client = twmq::redis::cluster::ClusterClient::new(initial_nodes)
    .context("Failed to connect to Valkey Cluster")?;

⚠️ Potential issue | 🟡 Minor

Misleading error context — this is a config parse error, not a connection failure.

ClusterClient::new() only validates and parses the node URLs; it doesn't open a socket. Actual cluster connection happens at get_async_connection() (line 263). If an invalid URL is provided, the current message "Failed to connect to Valkey Cluster" would misdiagnose the problem.

🔧 Suggested fix
-        let redis_client = twmq::redis::cluster::ClusterClient::new(initial_nodes)
-            .context("Failed to connect to Valkey Cluster")?;
+        let redis_client = twmq::redis::cluster::ClusterClient::new(initial_nodes)
+            .context("Failed to build Valkey Cluster client (invalid node URL?)")?;
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@integration-tests/tests/setup.rs` around lines 178 - 179, The error context
on ClusterClient::new is misleading — it parses/validates node URLs rather than
opening a connection; update the context string for
twmq::redis::cluster::ClusterClient::new(initial_nodes) to indicate a config/URL
parse/validation failure (e.g., "Failed to parse/validate Valkey cluster node
URLs") and leave connection errors to be reported when calling
get_async_connection(); ensure any related error messages reference
parsing/validation rather than connection attempts for clarity.

Comment on lines +134 to +135
let redis_client = redis::cluster::ClusterClient::new(vec![REDIS_URL]).unwrap();
let redis_conn = Arc::new(redis_client.get_async_connection().await.unwrap());

⚠️ Potential issue | 🔴 Critical

Directly causes CI pipeline failures — ClusterClient fails against non-cluster Redis.

Both .unwrap() calls panic because get_async_connection() performs a CLUSTER NODES handshake against the CI Redis instance, which runs with cluster-enabled no, yielding ResponseError: This instance has cluster support disabled. This is exactly the error in the pipeline failures at lines 135 and 311.

The same root issue affects all other twmq tests (Queue::new, MultilaneQueue::new) that internally construct a ClusterClient — those surface as .expect("…") panics rather than .unwrap() panics, so they appear differently in CI output.

Two remediation paths:

  1. Enable single-shard cluster mode in CI — add cluster-enabled yes + cluster-node-timeout 5000 + cluster-config-file nodes.conf to the Redis config used in the test workflow, and assign all 16384 hash slots to the node (e.g. CLUSTER ADDSLOTSRANGE 0 16383) so it actually serves keys. A single-node cluster set up this way is fully functional and sufficient for tests.

  2. Reuse the queue's existing ClusterConnection for direct Redis access, eliminating the duplicate ClusterClient construction entirely:

♻️ Option 2 — reuse queue's connection
-    // Create Redis connection for the execution context
-    let redis_client = redis::cluster::ClusterClient::new(vec![REDIS_URL]).unwrap();
-    let redis_conn = Arc::new(redis_client.get_async_connection().await.unwrap());
+    // Reuse the connection the queue already holds
+    let redis_conn = queue.redis.clone();

Then update the later uses:

-    cleanup_redis_keys(&redis_conn, &queue_name).await;
+    cleanup_redis_keys(&queue.redis, &queue_name).await;

-    let mut redis_conn_direct = redis_conn.as_ref().clone();
+    let mut redis_conn_direct = queue.redis.clone();

Additionally, the Arc<ClusterConnection> wrapper is unnecessary: ClusterConnection is already Clone via its internal shared pool, so Arc<> adds reference-counting overhead for no benefit.

Also applies to: 310-311

🧰 Tools
🪛 GitHub Actions: twmq Tests

[error] 135-135: Called Result::unwrap() on an Err value: An error was signalled by the server - ResponseError: This instance has cluster support disabled

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@twmq/tests/delay.rs` around lines 134 - 135, Tests currently construct a
redis::cluster::ClusterClient and call get_async_connection().unwrap(), which
panics against a non-cluster Redis; instead either enable single-node cluster
mode in CI (set cluster-enabled yes, cluster-node-timeout 5000,
cluster-config-file nodes.conf) or change the tests to reuse the queue's
existing ClusterConnection (avoid creating a new ClusterClient in tests and use
the Queue/MultilaneQueue provided ClusterConnection instance), remove
unnecessary Arc<ClusterConnection> wrappers since ClusterConnection is Clone,
and replace unwrap()/expect() calls around ClusterClient/get_async_connection
with proper error handling when acquiring the reused connection.

 async fn cleanup(&self) {
     let mut conn = self.queue.redis.clone();
-    let keys_pattern = format!("twmq_multilane:{}:*", self.queue_id);
+    let keys_pattern = format!("twmq_multilane:{}:{}:*", twmq::ENGINE_HASH_TAG, self.queue_id);

⚠️ Potential issue | 🟡 Minor

KEYS in cluster mode only scans a single node — cleanup may silently miss keys in a multi-node cluster.

The KEYS command in a cluster environment returns only the keys served by the node the request is routed to, not all keys across the cluster. Since ENGINE_HASH_TAG is used as a hash tag to co-locate all related keys on the same slot, cleanup will work if redis-rs routes the command to the correct node — but KEYS <pattern> has no key argument for deterministic slot routing, so it may land on the wrong node in a multi-node cluster.

The redis-rs docs explicitly note that for commands like SCAN that need per-node execution, you must route them explicitly to each node using RoutingInfo. The same applies here.

Consider replacing KEYS + DEL with a SCAN-based iteration using RoutingInfo::SingleNode targeting the hash tag's slot, or at minimum accept that cleanup is best-effort and add a comment.

Additionally, MultilaneQueue::new(REDIS_URL, …) internally creates a ClusterClient, so these tests require a cluster-mode Redis — the same root cause as the pipeline failures in delay.rs.

Also applies to: 168-169
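
As a middle ground, a cursor-based SCAN loop (sketch below) at least avoids the blocking behavior of KEYS; without explicit RoutingInfo it still only covers whichever node redis-rs routes the command to, so in a multi-node cluster it remains best-effort. The helper name is hypothetical; the pattern is the test's own.

use twmq::redis::cluster_async::ClusterConnection;

// Hypothetical test helper: replace KEYS + DEL with an incremental SCAN.
// Because every key carries the same {engine} hash tag, all matches live in
// one slot, so a multi-key DEL is legal; the SCAN itself is still routed to
// a single node, so cleanup stays best-effort in a multi-node cluster.
async fn cleanup_best_effort(conn: &mut ClusterConnection, pattern: &str) {
    let mut cursor: u64 = 0;
    loop {
        let (next, keys): (u64, Vec<String>) = twmq::redis::cmd("SCAN")
            .arg(cursor)
            .arg("MATCH")
            .arg(pattern)
            .arg("COUNT")
            .arg(100)
            .query_async(conn)
            .await
            .expect("SCAN failed");
        if !keys.is_empty() {
            let _: () = twmq::redis::cmd("DEL")
                .arg(&keys)
                .query_async(conn)
                .await
                .expect("DEL failed");
        }
        cursor = next;
        if cursor == 0 {
            break;
        }
    }
}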

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@twmq/tests/multilane_batch_pop.rs` at line 93, The test currently builds
keys_pattern using ENGINE_HASH_TAG and self.queue_id and then uses KEYS + DEL
which in cluster mode only hits a single node; update the cleanup to iterate
with SCAN (per-node) routed to the proper slot by using RoutingInfo::SingleNode
for the hash-tagged slot so all nodes are scanned and matching keys are deleted,
or if you prefer a minimal change add a clear comment in the test near the
keys_pattern/KEYS usage noting that cleanup is best-effort in cluster mode and
that MultilaneQueue::new(...) creates a ClusterClient so the test requires a
cluster Redis instance; reference keys_pattern, ENGINE_HASH_TAG, self.queue_id,
MultilaneQueue::new, RoutingInfo::SingleNode, SCAN, and avoid using KEYS + DEL
in cluster tests.
