Conversation
Walkthrough

This pull request migrates the Redis infrastructure from the single-node ConnectionManager to the cluster-based ClusterConnection, adds cluster-aware key tagging via ENGINE_HASH_TAG, extends Redis features to support clustering, and updates all queue, executor, and transaction registry implementations to use the cluster client with appropriately namespaced keys.

Changes
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Client
    participant QueueManager
    participant ClusterClient
    participant RedisCluster
    Client->>QueueManager: initialize with Redis URL
    QueueManager->>ClusterClient: parse comma-separated nodes<br/>create ClusterClient
    QueueManager->>RedisCluster: get_async_connection()
    RedisCluster-->>QueueManager: ClusterConnection
    QueueManager->>QueueManager: build Queue/Registry<br/>with ClusterConnection
    Client->>QueueManager: push/pop job
    QueueManager->>RedisCluster: execute command on {engine}:queue:key
    RedisCluster-->>QueueManager: result
    QueueManager-->>Client: job/result
```
```mermaid
sequenceDiagram
    participant EOAExecutor
    participant SafeRedisTransaction
    participant ClusterConnection
    participant RedisCluster
    EOAExecutor->>SafeRedisTransaction: initiate transaction
    SafeRedisTransaction->>SafeRedisTransaction: validation(conn: ClusterConnection)
    SafeRedisTransaction->>ClusterConnection: query with {engine}:eoa:key
    ClusterConnection->>RedisCluster: execute cluster command
    RedisCluster-->>ClusterConnection: data
    ClusterConnection-->>SafeRedisTransaction: validation result
    SafeRedisTransaction->>SafeRedisTransaction: execute(conn, validation_data)
    SafeRedisTransaction->>ClusterConnection: write operations
    ClusterConnection->>RedisCluster: pipeline/multi commands
    RedisCluster-->>ClusterConnection: confirmation
    ClusterConnection-->>SafeRedisTransaction: result
    SafeRedisTransaction-->>EOAExecutor: transaction complete
```
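The "parse comma-separated nodes" step in the first diagram can be sketched as a small pure helper. This is an illustrative sketch, not the PR's actual code: the PR inlines this split/trim/filter logic at each call site, and `parse_redis_nodes` is a hypothetical name.

```rust
/// Split a comma-separated Redis URL list into individual node URLs,
/// trimming whitespace and dropping empty entries.
/// Hypothetical helper; the PR under review inlines this logic instead.
fn parse_redis_nodes(url: &str) -> Vec<String> {
    url.split(',')
        .map(str::trim)
        .filter(|s| !s.is_empty())
        .map(String::from)
        .collect()
}

fn main() {
    let nodes = parse_redis_nodes("redis://node1:6379, redis://node2:6380,");
    // Trailing commas and whitespace are tolerated.
    assert_eq!(nodes, vec!["redis://node1:6379", "redis://node2:6380"]);
    // A single URL without commas still yields one node, so the
    // single-node case keeps working.
    assert_eq!(parse_redis_nodes("redis://127.0.0.1:6379").len(), 1);
    println!("{nodes:?}");
}
```

The resulting list is what `ClusterClient::new` receives as its initial node set.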
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~50 minutes

🚥 Pre-merge checks: ✅ 1 passed | ❌ 2 failed

❌ Failed checks (1 warning, 1 inconclusive)
✅ Passed checks (1 passed)
Actionable comments posted: 5
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (7)
executors/src/external_bundler/deployment.rs (2)
179-185: ⚠️ Potential issue | 🟠 Major

Lock has no TTL — risk of permanent deadlock on crash.

The comment says "SET NX EX" but only `set_nx` is used — no expiry is set on the lock key. If the lock holder crashes before calling `release_lock`, this key persists forever and no other process can acquire it. Consider using `SET key value NX EX <ttl>` via a raw command or the `set_options` API to atomically set the lock with a TTL:

Proposed fix sketch

```diff
-        // Use SET NX EX for atomic acquire
-        let result: Option<String> =
-            conn.set_nx(&key, &lock_data_str)
-                .await
-                .map_err(|e| EngineError::InternalError {
-                    message: format!("Lock acquire failed: {e}"),
-                })?;
+        // Atomic SET NX EX: acquire lock with a 5-minute safety TTL
+        let result: Option<String> = redis::cmd("SET")
+            .arg(&key)
+            .arg(&lock_data_str)
+            .arg("NX")
+            .arg("EX")
+            .arg(300u64)
+            .query_async(&mut conn)
+            .await
+            .map_err(|e| EngineError::InternalError {
+                message: format!("Lock acquire failed: {e}"),
+            })?;
```

Note: when using `SET ... NX`, the reply is either `"OK"` (`Some`) or nil (`None`), so `Option<String>` is the correct return type — which also fixes the `set_nx` return-type issue flagged above.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@executors/src/external_bundler/deployment.rs` around lines 179 - 185, The lock acquisition currently uses conn.set_nx(&key, &lock_data_str) without an expiry, risking permanent deadlock; change the acquire path to perform an atomic SET with NX and EX (e.g., use conn.set_options or a raw SET command) to set a TTL when creating the lock key (still returning Option<String> as the SET reply is "OK" or nil). Ensure the same key and lock_data_str are used and keep release_lock behavior unchanged so the lock is removed when finished; pick an appropriate ttl constant and document it near the acquire logic.
179-205: ⚠️ Potential issue | 🔴 Critical

Critical: `set_nx` with `Option<String>` return type makes the lock non-exclusive, and no TTL is set on acquisition.

The `SETNX` command returns an integer (1 = set, 0 = not set). Decoding this as `Option<String>` yields `Some("1")` or `Some("0")` — never `None` — because the `String::FromRedisValue` implementation converts integers to their string representation. Consequently, the `Some(_)` arm at line 188 always matches, and every call returns `AcquireLockResult::Acquired` regardless of whether the lock already exists. Additionally, the comment at line 179 states "Use SET NX EX for atomic acquire," but the code uses bare `set_nx()` without any TTL; if a process crashes after acquiring the lock, it will be held indefinitely.

Fix: Change the return type to `bool` and add an expiration time:

Proposed fix

```diff
-        // Use SET NX EX for atomic acquire
-        let result: Option<String> =
-            conn.set_nx(&key, &lock_data_str)
-                .await
-                .map_err(|e| EngineError::InternalError {
-                    message: format!("Lock acquire failed: {e}"),
-                })?;
-
-        match result {
-            Some(_) => Ok(AcquireLockResult::Acquired),
-            None => {
+        // Use SET NX EX for atomic acquire with expiration
+        let was_set: bool =
+            conn.set_nx(&key, &lock_data_str)
+                .await
+                .map_err(|e| EngineError::InternalError {
+                    message: format!("Lock acquire failed: {e}"),
+                })?;
+
+        if was_set {
+            // Set lock expiration to prevent indefinite locks on crashes
+            conn.expire(&key, 3600) // 1 hour TTL
+                .await
+                .map_err(|e| EngineError::InternalError {
+                    message: format!("Failed to set lock expiration: {e}"),
+                })?;
+            Ok(AcquireLockResult::Acquired)
+        } else {
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@executors/src/external_bundler/deployment.rs` around lines 179 - 205, The redis SETNX usage is wrong: conn.set_nx(&key, &lock_data_str) is being decoded as Option<String> so it always hits Some(_) and always returns AcquireLockResult::Acquired and it also never sets a TTL; change the call to use a boolean-returning set with an expiration (atomic SET ... NX EX) so acquisition succeeds only when the key was created and the key has a TTL; specifically replace the conn.set_nx(...) call with an atomic set that returns a bool (or use the client API that maps to Redis "SET" with NX and EX) and then interpret true => AcquireLockResult::Acquired, false => fetch existing value (same existing_data path) and return AcquireLockResult::AlreadyLocked(existing_lock_id); ensure you pass the TTL (expiration seconds) when writing lock_data_str and keep the existing get/serde_json logic for the failure branch.

twmq/tests/basic_hook.rs (1)
68-68: ⚠️ Potential issue | 🔴 Critical

Pipeline failure: tests fail because `REDIS_URL` points to a non-cluster Redis instance.

The pipeline error confirms this:

"This instance has cluster support disabled"

The `ClusterClient` requires connecting to a Redis instance running in cluster mode. The hardcoded `redis://127.0.0.1:6379/` points to a standalone Redis, which rejects cluster commands.

Options:

- Use a Redis Cluster in CI (e.g., via `docker-compose` with cluster-enabled nodes or a test container).
- Keep a non-cluster code path for tests (e.g., feature-gated).
- Use a single-node Redis Cluster setup (possible with `cluster-enabled yes` and appropriate config).

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@twmq/tests/basic_hook.rs` at line 68, The test fails because the hardcoded REDIS_URL in tests/basic_hook.rs points at a standalone Redis while ClusterClient requires cluster mode; update the test to either (A) point REDIS_URL to a cluster-enabled instance (e.g., single-node cluster endpoint your CI provides) so ClusterClient can connect, or (B) add a test branch/feature gate that uses the non-cluster client path instead of ClusterClient when cluster support is unavailable. Locate and modify the constant REDIS_URL and the code that constructs/uses ClusterClient in the test so it either reads a CI-provided cluster URL or conditionally falls back to the non-cluster client.

twmq/src/multilane.rs (1)
1296-1347: ⚠️ Potential issue | 🔴 Critical

Critical: `WATCH`/`MULTI`/`EXEC` is incompatible with `ClusterConnection`.

`ClusterConnection` in redis-rs does not support `WATCH` and does not guarantee connection affinity — each `.query_async()` call may be dispatched to a different underlying TCP connection in the cluster pool. The `WATCH` → read → `MULTI`/`EXEC` pattern (lines 1302–1322) requires all commands to execute on the same connection. When commands are routed to different connections, the WATCH state is silently lost and optimistic locking provides no protection.

This pattern appears in:

- `complete_job` (lines 1296–1347)
- `complete_job_queue_error` (lines 1405+)
- `executors/src/eoa/store/atomic.rs` (multiple locations)

Consequence: Lease-based protection for job completion is effectively disabled, enabling duplicate completions and data corruption under concurrent access.

Fix options:

- Replace `WATCH`/`MULTI`/`EXEC` with a Lua script (EVAL/EVALSHA) to atomically check the lease and perform operations.
- Use a direct (non-cluster) Redis connection for transaction sequences, if feasible in your architecture.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@twmq/src/multilane.rs` around lines 1296 - 1347, The WATCH/MULTI/EXEC sequence in complete_job (and similarly in complete_job_queue_error and locations in executors/src/eoa/store/atomic.rs) is unsafe with ClusterConnection; replace the optimistic-transaction flow that uses lease_key_name, hook_pipeline, and the subsequent post_*_completion calls with a single EVAL/EVALSHA Lua script that (1) atomically checks the lease key and lease token, (2) applies the same Redis mutations encoded by hook_pipeline, and (3) returns a status so the Rust side can call post_success_completion/post_nack_completion/post_fail_completion only after the script confirms success; implement script loading (EVALSHA fallback to EVAL) and reuse the same script for all these places to ensure atomicity across the cluster.

twmq/tests/basic.rs (1)
14-70: ⚠️ Potential issue | 🟠 Major

Queue::new() requires cluster-capable Redis; CI will fail with standard Redis

The `Queue::new()` implementation uses `redis::cluster::ClusterClient::new()`, which requires a cluster-enabled Redis or Valkey instance. The CI workflow currently runs standard `redis:7-alpine` (non-cluster), which cannot satisfy this requirement and will fail with "cluster support disabled" errors. Either:
- Update CI to run a cluster-capable Redis/Valkey service, or
- Modify Queue::new() to support both cluster and non-cluster Redis connections
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@twmq/tests/basic.rs` around lines 14 - 70, Queue::new currently always uses redis::cluster::ClusterClient which fails on non-cluster Redis in CI; change Queue::new to try creating a cluster connection first and on failure fall back to a standard redis::Client connection (or detect the URL and choose accordingly), and update the Queue struct's redis field to hold a type that can represent either connection (e.g. an enum wrapper or a boxed trait like redis::aio::ConnectionManager) so cleanup_redis_keys and other call sites (referencing Queue::new, redis::cluster::ClusterClient, ClusterConnection, and any Queue::redis usages) can operate against both cluster and non-cluster connections without panicking. Ensure the fallback preserves async query compatibility for KEYS/DEL and that error handling logs the chosen backend.

twmq/src/lib.rs (2)
1256-1308: ⚠️ Potential issue | 🔴 Critical

`WATCH`/`MULTI`/`EXEC` is not supported on `ClusterConnection` — this will fail at runtime.

The `redis-rs` 0.31.0 crate's `ClusterConnection` explicitly does not support Redis transactions. The cluster connection type reports `supports_pipelining() -> false` and its pipeline variant is documented as "does not support transactions." Lines 1262–1266 issue a raw `WATCH` command, then lines 1280–1286 execute an atomic pipeline (`MULTI`/`EXEC`), which will error at runtime on a cluster connection.

This same pattern appears in `complete_job_queue_error` at lines 1369–1372 and 1386–1387.

Since all keys share the `{engine}` hash slot, consider:

- Using a dedicated single-node connection to the specific cluster node owning that slot, rather than `ClusterConnection`.
- Using a Lua script to achieve atomic check-and-update (Lua scripts are atomic on a single node).
- Using Redlock or another distributed lock mechanism instead of WATCH-based optimistic concurrency.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@twmq/src/lib.rs` around lines 1256 - 1308, The code uses WATCH/MULTI/EXEC against a ClusterConnection (see usage of lease_key, cloned redis connection, and atomic_pipeline.query_async) which is unsupported and will fail at runtime; replace the WATCH-based transaction in this completion loop (and the similar pattern in complete_job_queue_error) with an atomic alternative: either acquire a single-node connection for the {engine} slot and run the existing WATCH/MULTI/EXEC there, or (preferred) convert the pipeline + lease-exists check into a single EVAL/EVALSHA Lua script that checks existence of lease_key and performs the same updates atomically, then call the script (using the same redis client interface) instead of atomic_pipeline.query_async; ensure you reference lease_key, hook_pipeline operations, and the post-completion branches (post_success_completion, post_nack_completion, post_fail_completion) so the Lua script performs the exact writes previously in the pipeline or the single-node connection targets the node owning the {engine} slot.
603-752: ⚠️ Potential issue | 🟠 Major

Lua scripts dynamically construct keys not declared in `KEYS[]` — cluster compliance risk.

The `pop_batch_jobs` Lua script (and similarly the push, trim scripts) constructs keys dynamically within Lua:

```lua
local job_meta_hash_name = 'twmq:{engine}:' .. queue_id .. ':job:' .. job_id .. ':meta'
local lease_key = 'twmq:{engine}:' .. queue_id .. ':job:' .. job_id .. ':lease:' .. lease_token
```

These keys are not passed via `KEYS[]`. The Redis Cluster specification states that all keys accessed by a script must be provided as `KEYS` arguments so the cluster can verify they belong to the same hash slot. While this works today because all keys share `{engine}`, some Valkey/Redis configurations enforce strict key checking (`allow-cross-slot-keys no` or similar), and proxy layers (e.g., Envoy, Twemproxy) may reject such scripts.

Additionally, the string `{engine}` is hardcoded in the Lua rather than derived from the `ENGINE_HASH_TAG` constant, creating a maintenance coupling. If the constant ever changes, these scripts will silently use a different hash slot.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@twmq/src/lib.rs` around lines 603 - 752, The Lua script pop_batch_jobs constructs keys like job_meta_hash_name and lease_key inside the script (and similarly in other scripts) instead of receiving them via KEYS[], which violates Redis Cluster key checking and hardcodes '{engine}'; fix by changing the code that registers/executed the redis::Script for pop_batch_jobs (and the push/trim scripts) to pass every Redis key the script touches through KEYS[] (e.g., delayed_zset_name, pending_list_name, active_hash_name, job_data_hash_name, pending_cancellation_set, failed_list_name, success_list_name plus per-job meta and lease keys for popped job ids) and update the Lua body to reference those KEYS[] entries rather than constructing strings; also replace the literal '{engine}' with the ENGINE_HASH_TAG constant value when building the KEYS[] array so the script uses the same hash tag as the rest of the code.
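As background for the same-slot claim in the comment above: Redis Cluster hashes only the substring between the first `{` and the next `}` when that substring is non-empty, so every key containing `{engine}` lands on one slot. A minimal, standalone sketch of that tag-extraction rule (illustrative code, not part of the engine codebase):

```rust
/// Return the portion of a key that Redis Cluster actually hashes,
/// per the cluster spec: if the key contains "{...}" with a non-empty
/// body, only that body is hashed; otherwise the whole key is hashed.
fn hash_tag(key: &str) -> &str {
    if let Some(open) = key.find('{') {
        if let Some(len) = key[open + 1..].find('}') {
            if len > 0 {
                return &key[open + 1..open + 1 + len];
            }
        }
    }
    key
}

fn main() {
    // Both dynamically built keys hash on "engine", hence the same slot —
    // which is why the multi-key Lua scripts work at all today.
    assert_eq!(hash_tag("twmq:{engine}:q1:job:42:meta"), "engine");
    assert_eq!(hash_tag("twmq:{engine}:q1:job:42:lease:tok"), "engine");
    // No tag, or an empty "{}", means the whole key is hashed.
    assert_eq!(hash_tag("twmq:queue"), "twmq:queue");
    assert_eq!(hash_tag("twmq:{}x"), "twmq:{}x");
}
```

Same slot is necessary but not sufficient for strict deployments — the keys still have to be declared in `KEYS[]` for proxies and strict key checking to accept the script.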
♻️ Duplicate comments (4)
twmq/tests/idempotency_modes.rs (1)
7-72: Duplicate: cluster-enabled Redis requirement
Covered by the comment in `twmq/tests/basic.rs` about needing a cluster-capable Redis endpoint for tests.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@twmq/tests/idempotency_modes.rs` around lines 7 - 72, This comment duplicates the cluster-enabled Redis requirement already documented in twmq/tests/basic.rs; remove the redundant note from twmq/tests/idempotency_modes.rs (or replace it with a single-line reference) so there's a single source of truth; look for references in this file around the cleanup_redis_keys helper and the test setup in idempotency_modes.rs and either delete the duplicate comment or add a concise pointer to the existing documentation in basic.rs.

twmq/tests/prune_race_random_ids.rs (1)
7-127: Duplicate: cluster-enabled Redis requirement
Covered by the comment in `twmq/tests/basic.rs` about needing a cluster-capable Redis endpoint for tests.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@twmq/tests/prune_race_random_ids.rs` around lines 7 - 127, The comment about requiring a cluster-enabled Redis endpoint is duplicated (already in twmq/tests/basic.rs); remove the redundant note from this test file and either reference the existing comment or consolidate the requirement into a single shared place; update or remove the block containing REDIS_URL/ClusterConnection setup or the top-of-file comment so only the authoritative cluster-enabled Redis requirement remains (look for REDIS_URL, ClusterConnection, and cleanup_redis_keys in this file to locate the duplicated guidance).

twmq/tests/nack.rs (1)
13-31: Duplicate: cluster-enabled Redis requirement
Covered by the comment in `twmq/tests/basic.rs` about needing a cluster-capable Redis endpoint for tests.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@twmq/tests/nack.rs` around lines 13 - 31, The comment about requiring a cluster-capable Redis endpoint is duplicated here; remove the redundant note in this test and either reference the existing guidance in the central/basic test or move the single authoritative note into the shared test fixtures (e.g., where REDIS_URL or cleanup_redis_keys is defined) so tests like cleanup_redis_keys and constants such as REDIS_URL rely on one documented cluster requirement location.

twmq/src/lib.rs (1)
963-1010: Hardcoded `{engine}` in trim Lua scripts — same issue as `pop_batch_jobs`.

Both `post_success_completion` and `post_fail_completion` trim scripts construct `job_meta_hash` and `errors_list_name` with hardcoded `'twmq:{engine}:'` strings. Same cluster compliance and maintenance concerns as flagged in the `pop_batch_jobs` script.

Also applies to: 1144-1200
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@twmq/src/lib.rs` around lines 963 - 1010, The Lua trim_script (and the similar scripts used by post_success_completion and post_fail_completion) currently hardcode 'twmq:{engine}:' into job_meta_hash and errors_list_name; update these scripts to accept the engine prefix as an input (e.g., add an ARGV or KEYS parameter like engine_prefix or engine_name) and concatenate that parameter instead of the literal '{engine}' when building job_meta_hash and errors_list_name (locate occurrences in trim_script, and in the post_success_completion/post_fail_completion scripts, and replace the hardcoded string with the passed-in parameter variable).
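One way to remove the hardcoded tag, along the lines the prompt above suggests: build the Lua source in Rust from the constant, so the scripts can never drift from the rest of the key space. A hedged sketch — it assumes `ENGINE_HASH_TAG` is the literal `"{engine}"`, and the script body here is illustrative, not the real trim script:

```rust
// Assumed value; in the real codebase this would be twmq::ENGINE_HASH_TAG.
const ENGINE_HASH_TAG: &str = "{engine}";

/// Build an illustrative trim-style Lua script with the hash tag spliced in
/// from the Rust constant instead of a hardcoded '{engine}' literal.
fn build_trim_script() -> String {
    format!(
        r#"
local queue_id = ARGV[1]
local job_id = ARGV[2]
local job_meta_hash = 'twmq:{tag}:' .. queue_id .. ':job:' .. job_id .. ':meta'
redis.call('DEL', job_meta_hash)
"#,
        tag = ENGINE_HASH_TAG
    )
}

fn main() {
    let script = build_trim_script();
    // The generated Lua now carries the tag from the constant,
    // so changing ENGINE_HASH_TAG changes every script consistently.
    assert!(script.contains("'twmq:{engine}:'"));
    println!("{script}");
}
```

Passing the tag (or the fully built keys) via `ARGV`/`KEYS` at script-call time, as the prompt also mentions, is an equivalent fix with the added benefit of satisfying strict `KEYS[]` checking.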
🧹 Nitpick comments (7)
executors/src/external_bundler/deployment.rs (1)
85-90: Duplicate `cache_key` implementation across `RedisDeploymentCache` and `RedisDeploymentLock`.

This method is identical to `RedisDeploymentCache::cache_key` (line 46). Consider extracting a shared helper (e.g., a free function `fn deployment_cache_key(chain_id: u64, account_address: &Address) -> String`) to avoid divergence.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@executors/src/external_bundler/deployment.rs` around lines 85 - 90, The duplicate cache_key implementation in RedisDeploymentLock::cache_key and RedisDeploymentCache::cache_key should be extracted to a shared helper: add a free function like fn deployment_cache_key(chain_id: u64, account_address: &Address) -> String that returns the formatted key using twmq::ENGINE_HASH_TAG and the same "{CACHE_PREFIX}" token, then have both RedisDeploymentLock::cache_key and RedisDeploymentCache::cache_key call that helper (replace their bodies with a call to deployment_cache_key) to avoid duplication and keep formatting consistent.

twmq/src/queue.rs (1)
191-203: URL parsing assumes comma-separated cluster nodes — document this contract.

The `redis_url` parameter now expects a comma-separated list of node addresses (e.g., `redis://node1:6379,redis://node2:6380`), but the `redis_url` method on Line 107 accepts a generic `S: Into<String>` with no doc hint. A single-node URL still works (no commas → one node), but callers won't know this changed.

Consider adding a brief doc comment on `redis_url` to clarify the expected format.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@twmq/src/queue.rs` around lines 191 - 203, The Redis URL handling now treats a URL string as a comma-separated list of cluster node addresses (see RedisSource::Url parsing), but the public builder method redis_url(S: Into<String>) lacks documentation about this contract; update the doc comment on the redis_url method to state that the parameter should be a comma-separated list of Redis node URLs (e.g., "redis://node1:6379,redis://node2:6380"), note that a single URL (no commas) is supported, and mention any accepted URL formats and trimming behavior so callers know how to supply cluster endpoints.

twmq/src/multilane.rs (1)
406-408: Lua scripts construct keys dynamically instead of declaring them in KEYS[] — works due to hash tag but is fragile.

Multiple Lua scripts (e.g., `cancel_job`, `pop_batch_jobs`, `post_success_completion`, `post_fail_completion`) build Redis keys via string concatenation inside the script body (e.g., Line 406: `'twmq_multilane:{engine}:' .. queue_id .. ':lane:' .. lane_id .. ':pending'`). These dynamically constructed keys are not passed via `KEYS[]`.

This works because all keys share the `{engine}` hash tag and thus map to the same cluster slot. However, some Redis Cluster proxies and managed services (e.g., AWS ElastiCache in cluster mode) enforce that all accessed keys must be declared in KEYS[] and will reject scripts that access undeclared keys.

If you only target Valkey/Redis OSS clusters, this is acceptable. If managed services are in scope, consider passing all keys via `KEYS[]` or restructuring to avoid dynamic key construction in Lua.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@twmq/src/multilane.rs` around lines 406 - 408, The Lua scripts (cancel_job, pop_batch_jobs, post_success_completion, post_fail_completion) currently build keys like local lane_pending_list / lane_delayed_zset / lane_active_hash via string concatenation inside the script; change these scripts to accept all Redis keys via KEYS[] and replace uses of the concatenated variables with KEYS[n] (and update the caller that loads/calls these scripts to pass the correct key list in order), ensuring each lane/queue key is provided in KEYS[] so no dynamic key construction occurs in the script body and cluster proxies (e.g., ElastiCache) accept the script.

twmq/tests/basic_hook.rs (1)
24-41: `KEYS` command is not cluster-safe for multi-node setups.

`redis::cmd("KEYS")` only scans the node the command is routed to, not the entire cluster. In a multi-node cluster, this cleanup will miss keys on other nodes. Since all keys share the `{engine}` hash tag (same slot → same node), this happens to work, but it's a subtle assumption worth documenting.

Also, `KEYS` is O(N) over the entire keyspace and blocks the node — `SCAN` is preferred even in tests.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@twmq/tests/basic_hook.rs` around lines 24 - 41, The test helper cleanup_redis_keys uses redis::cmd("KEYS") which is not cluster-safe and is blocking; replace it with an async, cluster-safe incremental scan using SCAN (or CLUSTER KEYSLOT+SCAN if needed) over the connection so all nodes are covered and the command is non-blocking. In practice in the cleanup_redis_keys function, iterate with redis SCAN on the ClusterConnection using the keys_pattern (twmq::ENGINE_HASH_TAG + queue_name) to collect matching keys in pages and then DEL them (or pipeline deletes) only after gathering them; ensure the scan loop handles cursor=0 termination and uses query_async on the same connection rather than KEYS so the helper works correctly in multi-node setups and avoids blocking.

server/src/main.rs (1)
78-85: Duplicate node-parsing logic — consider extracting a shared helper.

The same comma-split-trim-filter-collect pattern for building `initial_nodes` appears verbatim in `integration-tests/tests/setup.rs` (lines 171–177) and in `twmq/src/multilane.rs` (lines 48–57). Extracting it once — e.g., as a free function `fn parse_redis_nodes(url: &str) -> Vec<&str>` in `twmq` or a shared config module — reduces drift and makes it easy to add validation (e.g., reject an empty node list).

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@server/src/main.rs` around lines 78 - 85, Extract the repeated comma-split-trim-filter-collect logic into a single helper (e.g. fn parse_redis_nodes(url: &str) -> Vec<String>) in a shared crate/module (twmq or a config util), then replace the inline code in main.rs (where initial_nodes is built) and the duplicates in integration-tests/tests/setup.rs and twmq/src/multilane.rs to call parse_redis_nodes(&config.redis.url). Make the helper return owned Strings (Vec<String>) to avoid lifetime issues, and add simple validation inside it to return an error or panic when the resulting node list is empty so callers like twmq::redis::cluster::ClusterClient::new get a non-empty list.

executors/src/transaction_registry.rs (1)
33-36: Plan migration for the new registry key format
The registry key now includes `ENGINE_HASH_TAG`, so existing entries under the prior key will no longer be read. If you need continuity during upgrades, consider a short dual-read window or a migration/backfill step.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@executors/src/transaction_registry.rs` around lines 33 - 36, registry_key now embeds twmq::ENGINE_HASH_TAG (in function registry_key using self.namespace), which will break reads of existing registry entries; implement a migration strategy: update registry_key to perform a dual-read during a transition window (try new key first then fallback to the old key format without ENGINE_HASH_TAG) or add a one-time migration/backfill routine that scans for old keys and re-writes them using the new key format (reference registry_key, self.namespace, and twmq::ENGINE_HASH_TAG to locate where to change read/write logic).executors/src/eoa/store/mod.rs (1)
123-336: Consider a helper to reduce repetition across 12 key-builder methods.

Every method follows the identical `match &self.namespace { Some(ns) => format!("{ns}:{}:...", twmq::ENGINE_HASH_TAG, ...), None => format!("{}:...", twmq::ENGINE_HASH_TAG, ...) }` pattern. A single private helper like `fn prefix(&self) -> String` would collapse the duplication.

Example helper

```diff
+    fn key_prefix(&self) -> String {
+        match &self.namespace {
+            Some(ns) => format!("{ns}:{}", twmq::ENGINE_HASH_TAG),
+            None => twmq::ENGINE_HASH_TAG.to_string(),
+        }
+    }
+
     pub fn eoa_lock_key_name(&self) -> String {
-        match &self.namespace {
-            Some(ns) => format!(
-                "{ns}:{}:eoa_executor:lock:{}:{}",
-                twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa
-            ),
-            None => format!(
-                "{}:eoa_executor:lock:{}:{}",
-                twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa
-            ),
-        }
+        format!("{}:eoa_executor:lock:{}:{}", self.key_prefix(), self.chain_id, self.eoa)
     }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@executors/src/eoa/store/mod.rs` around lines 123 - 336, Many key-builder methods (eoa_lock_key_name, transaction_data_key_name, transaction_attempts_list_name, pending_transactions_zset_name, submitted_transactions_zset_name, transaction_hash_to_id_key_name, borrowed_transactions_hashmap_name, recycled_nonces_zset_name, optimistic_transaction_count_key_name, last_transaction_count_key_name, eoa_health_key_name, manual_reset_key_name) repeat the same namespace match/format logic; add a private helper fn prefix(&self) -> String that returns either "{ns}:{ENGINE_HASH_TAG}" or "{ENGINE_HASH_TAG}" and refactor each method to call this prefix and then append the remaining suffix (e.g., ":eoa_executor:lock:{}:{}" or formatted transaction-specific suffix), removing the duplicated match blocks.
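As a runnable illustration of the helper suggested in the nitpick above — a standalone sketch that assumes `ENGINE_HASH_TAG` is the literal `"{engine}"` and reduces the store-keys struct to only the fields these keys need (the real struct uses alloy's `Address` for `eoa`):

```rust
// Assumed value of twmq::ENGINE_HASH_TAG for this sketch.
const ENGINE_HASH_TAG: &str = "{engine}";

/// Minimal stand-in for EoaExecutorStoreKeys with just the fields used here.
struct Keys {
    namespace: Option<String>,
    chain_id: u64,
    eoa: String, // String keeps the sketch self-contained
}

impl Keys {
    /// Shared prefix: "{ns}:{engine}" when namespaced, "{engine}" otherwise.
    fn key_prefix(&self) -> String {
        match &self.namespace {
            Some(ns) => format!("{ns}:{ENGINE_HASH_TAG}"),
            None => ENGINE_HASH_TAG.to_string(),
        }
    }

    /// Each of the 12 key builders then collapses to prefix + suffix.
    fn eoa_lock_key_name(&self) -> String {
        format!("{}:eoa_executor:lock:{}:{}", self.key_prefix(), self.chain_id, self.eoa)
    }
}

fn main() {
    let with_ns = Keys { namespace: Some("test".into()), chain_id: 1, eoa: "0xabc".into() };
    let without = Keys { namespace: None, chain_id: 1, eoa: "0xabc".into() };
    assert_eq!(with_ns.eoa_lock_key_name(), "test:{engine}:eoa_executor:lock:1:0xabc");
    assert_eq!(without.eoa_lock_key_name(), "{engine}:eoa_executor:lock:1:0xabc");
}
```

Centralizing the prefix also gives one obvious place to validate that a user-supplied namespace contains no `{` or `}`, which would otherwise change hash-tag parsing.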
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@executors/src/eoa/store/atomic.rs`:
- Around line 7-8: The code uses ClusterConnection with WATCH/MULTI/EXEC via the
SafeRedisTransaction trait, but ClusterConnection doesn't support WATCH
(connection-affinity issue) which breaks the EOA lock safety; change the
implementation to replace WATCH-based optimistic transactions with an atomic Lua
script (EVAL) or a Redis server-side CAS routine for the lock flow.
Specifically, update the SafeRedisTransaction consumers — with_lock_check and
execute_with_watch_and_retry — to call a single EVAL that checks the lock owner
and nonce and then sets/updates or aborts atomically (so both the check and the
write occur server-side), remove reliance on WATCH/MULTI/EXEC, and stop using
ClusterConnection for WATCH paths (keep ClusterConnection but invoke EVAL via
its async commands). Ensure the Lua script returns clear success/failure codes
that with_lock_check and execute_with_watch_and_retry interpret to retry/fail as
before.
In `@executors/src/eoa/store/mod.rs`:
- Around line 123-134: The key construction in eoa_lock_key_name() can have
namespace containing '{' or '}', which would change Redis Cluster hash-tag
parsing and break slot affinity; update the logic in EoaExecutorStoreKeys (e.g.,
in EoaExecutorStoreKeys::new) to either validate/escape namespace (reject or
strip '{' and '}') or move the twmq::ENGINE_HASH_TAG hash tag to the front of
the key format (e.g., wrap ENGINE_HASH_TAG in braces before namespace) and then
update eoa_lock_key_name() and other key-formatting methods to use the
safe/tag-first format; ensure any validation error surfaces at construction time
so no keys are created with illegal characters.
In `@integration-tests/tests/setup.rs`:
- Around line 178-179: The error context on ClusterClient::new is misleading —
it parses/validates node URLs rather than opening a connection; update the
context string for twmq::redis::cluster::ClusterClient::new(initial_nodes) to
indicate a config/URL parse/validation failure (e.g., "Failed to parse/validate
Valkey cluster node URLs") and leave connection errors to be reported when
calling get_async_connection(); ensure any related error messages reference
parsing/validation rather than connection attempts for clarity.
In `@twmq/tests/delay.rs`:
- Around line 134-135: Tests currently construct a redis::cluster::ClusterClient
and call get_async_connection().unwrap(), which panics against a non-cluster
Redis; instead either enable single-node cluster mode in CI (set cluster-enabled
yes, cluster-node-timeout 5000, cluster-config-file nodes.conf) or change the
tests to reuse the queue's existing ClusterConnection (avoid creating a new
ClusterClient in tests and use the Queue/MultilaneQueue provided
ClusterConnection instance), remove unnecessary Arc<ClusterConnection> wrappers
since ClusterConnection is Clone, and replace unwrap()/expect() calls around
ClusterClient/get_async_connection with proper error handling when acquiring the
reused connection.
In `@twmq/tests/multilane_batch_pop.rs`:
- Line 93: The test currently builds keys_pattern using ENGINE_HASH_TAG and
self.queue_id and then uses KEYS + DEL which in cluster mode only hits a single
node; update the cleanup to iterate with SCAN (per-node) routed to the proper
slot by using RoutingInfo::SingleNode for the hash-tagged slot so all nodes are
scanned and matching keys are deleted, or if you prefer a minimal change add a
clear comment in the test near the keys_pattern/KEYS usage noting that cleanup
is best-effort in cluster mode and that MultilaneQueue::new(...) creates a
ClusterClient so the test requires a cluster Redis instance; reference
keys_pattern, ENGINE_HASH_TAG, self.queue_id, MultilaneQueue::new,
RoutingInfo::SingleNode, SCAN, and avoid using KEYS + DEL in cluster tests.
---
Outside diff comments:
In `@executors/src/external_bundler/deployment.rs`:
- Around line 179-185: The lock acquisition currently uses conn.set_nx(&key,
&lock_data_str) without an expiry, risking permanent deadlock; change the
acquire path to perform an atomic SET with NX and EX (e.g., use conn.set_options
or a raw SET command) to set a TTL when creating the lock key (still returning
Option<String> as the SET reply is "OK" or nil). Ensure the same key and
lock_data_str are used and keep release_lock behavior unchanged so the lock is
removed when finished; pick an appropriate ttl constant and document it near the
acquire logic.
- Around line 179-205: The redis SETNX usage is wrong: conn.set_nx(&key,
&lock_data_str) is being decoded as Option<String> so it always hits Some(_) and
always returns AcquireLockResult::Acquired and it also never sets a TTL; change
the call to use a boolean-returning set with an expiration (atomic SET ... NX
EX) so acquisition succeeds only when the key was created and the key has a TTL;
specifically replace the conn.set_nx(...) call with an atomic set that returns a
bool (or use the client API that maps to Redis "SET" with NX and EX) and then
interpret true => AcquireLockResult::Acquired, false => fetch existing value
(same existing_data path) and return
AcquireLockResult::AlreadyLocked(existing_lock_id); ensure you pass the TTL
(expiration seconds) when writing lock_data_str and keep the existing
get/serde_json logic for the failure branch.
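The acquire-path semantics described above (boolean `SET … NX EX` reply, fall back to reading the existing holder) can be modeled without a live server. The sketch below uses an in-memory mock in place of Redis; in the real code the same logic would issue an atomic `SET key value NX EX ttl` (for example via the redis crate's `set_options` with `ExistenceCheck::NX` and `SetExpiry::EX`). The TTL constant and type names here are assumptions for illustration.

```rust
use std::collections::HashMap;

// Minimal stand-in for Redis: SET key value NX EX ttl returns true only when
// the key was created. A real implementation would call the server instead.
struct MockRedis {
    data: HashMap<String, (String, u64)>, // value + ttl seconds
}

#[derive(Debug, PartialEq)]
enum AcquireLockResult {
    Acquired,
    AlreadyLocked(String),
}

impl MockRedis {
    fn set_nx_ex(&mut self, key: &str, value: &str, ttl: u64) -> bool {
        if self.data.contains_key(key) {
            false // NX: key exists, nothing written
        } else {
            self.data.insert(key.to_string(), (value.to_string(), ttl));
            true
        }
    }
    fn get(&self, key: &str) -> Option<String> {
        self.data.get(key).map(|(v, _)| v.clone())
    }
}

// Hypothetical TTL; tune for the workload and document near the acquire logic.
const LOCK_TTL_SECS: u64 = 300;

fn acquire_lock(conn: &mut MockRedis, key: &str, lock_data: &str) -> AcquireLockResult {
    // Interpret the boolean SET reply instead of decoding Option<String>.
    if conn.set_nx_ex(key, lock_data, LOCK_TTL_SECS) {
        AcquireLockResult::Acquired
    } else {
        let existing = conn.get(key).unwrap_or_default();
        AcquireLockResult::AlreadyLocked(existing)
    }
}

fn main() {
    let mut conn = MockRedis { data: HashMap::new() };
    assert_eq!(acquire_lock(&mut conn, "lock:1", "worker-a"), AcquireLockResult::Acquired);
    assert_eq!(
        acquire_lock(&mut conn, "lock:1", "worker-b"),
        AcquireLockResult::AlreadyLocked("worker-a".into())
    );
    println!("ok");
}
```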
In `@twmq/src/lib.rs`:
- Around line 1256-1308: The code uses WATCH/MULTI/EXEC against a
ClusterConnection (see usage of lease_key, cloned redis connection, and
atomic_pipeline.query_async) which is unsupported and will fail at runtime;
replace the WATCH-based transaction in this completion loop (and the similar
pattern in complete_job_queue_error) with an atomic alternative: either acquire
a single-node connection for the {engine} slot and run the existing
WATCH/MULTI/EXEC there, or (preferred) convert the pipeline + lease-exists check
into a single EVAL/EVALSHA Lua script that checks existence of lease_key and
performs the same updates atomically, then call the script (using the same redis
client interface) instead of atomic_pipeline.query_async; ensure you reference
lease_key, hook_pipeline operations, and the post-completion branches
(post_success_completion, post_nack_completion, post_fail_completion) so the Lua
script performs the exact writes previously in the pipeline or the single-node
connection targets the node owning the {engine} slot.
- Around line 603-752: The Lua script pop_batch_jobs constructs keys like
job_meta_hash_name and lease_key inside the script (and similarly in other
scripts) instead of receiving them via KEYS[], which violates Redis Cluster key
checking and hardcodes '{engine}'; fix by changing the code that
registers/executed the redis::Script for pop_batch_jobs (and the push/trim
scripts) to pass every Redis key the script touches through KEYS[] (e.g.,
delayed_zset_name, pending_list_name, active_hash_name, job_data_hash_name,
pending_cancellation_set, failed_list_name, success_list_name plus per-job meta
and lease keys for popped job ids) and update the Lua body to reference those
KEYS[] entries rather than constructing strings; also replace the literal
'{engine}' with the ENGINE_HASH_TAG constant value when building the KEYS[]
array so the script uses the same hash tag as the rest of the code.
In `@twmq/src/multilane.rs`:
- Around line 1296-1347: The WATCH/MULTI/EXEC sequence in complete_job (and
similarly in complete_job_queue_error and locations in
executors/src/eoa/store/atomic.rs) is unsafe with ClusterConnection; replace the
optimistic-transaction flow that uses lease_key_name, hook_pipeline, and the
subsequent post_*_completion calls with a single EVAL/EVALSHA Lua script that
(1) atomically checks the lease key and lease token, (2) applies the same Redis
mutations encoded by hook_pipeline, and (3) returns a status so the Rust side
can call post_success_completion/post_nack_completion/post_fail_completion only
after the script confirms success; implement script loading (EVALSHA fallback to
EVAL) and reuse the same script for all these places to ensure atomicity across
the cluster.
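The shape of such a lease-check script can be sketched as a Rust string constant. The key layout, argument order, and the specific `redis.call` mutations below are hypothetical stand-ins for whatever `hook_pipeline` actually encodes; the point is the structure: every key arrives via `KEYS[]`, the lease token is checked first, and a status code tells the Rust side whether to run the `post_*_completion` branches.

```rust
// Sketch of a lease-checking completion script. All keys arrive via KEYS[]
// (no string-built keys inside the Lua body), and the script returns 1 only
// when the lease token matches, so the caller knows whether to proceed.
const COMPLETE_JOB_SCRIPT: &str = r#"
-- KEYS[1] = lease key, KEYS[2] = active hash, KEYS[3] = destination list
-- ARGV[1] = expected lease token, ARGV[2] = job id
if redis.call('GET', KEYS[1]) ~= ARGV[1] then
    return 0  -- lease lost: caller must not run completion hooks
end
redis.call('DEL', KEYS[1])
redis.call('HDEL', KEYS[2], ARGV[2])
redis.call('LPUSH', KEYS[3], ARGV[2])
return 1
"#;

fn main() {
    // Pure sanity checks: the script hard-codes no '{engine}' hash tag and
    // references keys only through the KEYS[] array.
    assert!(!COMPLETE_JOB_SCRIPT.contains("{engine}"));
    assert!(COMPLETE_JOB_SCRIPT.contains("KEYS[1]"));
    println!("script ok");
}
```

Because all keys share the same hash tag, the whole `KEYS[]` array maps to one slot, which is what makes the script valid on a cluster.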
In `@twmq/tests/basic_hook.rs`:
- Line 68: The test fails because the hardcoded REDIS_URL in tests/basic_hook.rs
points at a standalone Redis while ClusterClient requires cluster mode; update
the test to either (A) point REDIS_URL to a cluster-enabled instance (e.g.,
single-node cluster endpoint your CI provides) so ClusterClient can connect, or
(B) add a test branch/feature gate that uses the non-cluster client path instead
of ClusterClient when cluster support is unavailable. Locate and modify the
constant REDIS_URL and the code that constructs/uses ClusterClient in the test
so it either reads a CI-provided cluster URL or conditionally falls back to the
non-cluster client.
In `@twmq/tests/basic.rs`:
- Around line 14-70: Queue::new currently always uses
redis::cluster::ClusterClient which fails on non-cluster Redis in CI; change
Queue::new to try creating a cluster connection first and on failure fall back
to a standard redis::Client connection (or detect the URL and choose
accordingly), and update the Queue struct's redis field to hold a type that can
represent either connection (e.g. an enum wrapper or a boxed trait like
redis::aio::ConnectionManager) so cleanup_redis_keys and other call sites
(referencing Queue::new, redis::cluster::ClusterClient, ClusterConnection, and
any Queue::redis usages) can operate against both cluster and non-cluster
connections without panicking. Ensure the fallback preserves async query
compatibility for KEYS/DEL and that error handling logs the chosen backend.
---
Duplicate comments:
In `@twmq/src/lib.rs`:
- Around line 963-1010: The Lua trim_script (and the similar scripts used by
post_success_completion and post_fail_completion) currently hardcode
'twmq:{engine}:' into job_meta_hash and errors_list_name; update these scripts
to accept the engine prefix as an input (e.g., add an ARGV or KEYS parameter
like engine_prefix or engine_name) and concatenate that parameter instead of the
literal '{engine}' when building job_meta_hash and errors_list_name (locate
occurrences in trim_script, and in the
post_success_completion/post_fail_completion scripts, and replace the hardcoded
string with the passed-in parameter variable).
In `@twmq/tests/idempotency_modes.rs`:
- Around line 7-72: This comment duplicates the cluster-enabled Redis
requirement already documented in twmq/tests/basic.rs; remove the redundant note
from twmq/tests/idempotency_modes.rs (or replace it with a single-line
reference) so there's a single source of truth; look for references in this file
around the cleanup_redis_keys helper and the test setup in idempotency_modes.rs
and either delete the duplicate comment or add a concise pointer to the existing
documentation in basic.rs.
In `@twmq/tests/nack.rs`:
- Around line 13-31: The comment about requiring a cluster-capable Redis
endpoint is duplicated here; remove the redundant note in this test and either
reference the existing guidance in the central/basic test or move the single
authoritative note into the shared test fixtures (e.g., where REDIS_URL or
cleanup_redis_keys is defined) so tests like cleanup_redis_keys and constants
such as REDIS_URL rely on one documented cluster requirement location.
In `@twmq/tests/prune_race_random_ids.rs`:
- Around line 7-127: The comment about requiring a cluster-enabled Redis
endpoint is duplicated (already in twmq/tests/basic.rs); remove the redundant
note from this test file and either reference the existing comment or
consolidate the requirement into a single shared place; update or remove the
block containing REDIS_URL/ClusterConnection setup or the top-of-file comment so
only the authoritative cluster-enabled Redis requirement remains (look for
REDIS_URL, ClusterConnection, and cleanup_redis_keys in this file to locate the
duplicated guidance).
---
Nitpick comments:
In `@executors/src/eoa/store/mod.rs`:
- Around line 123-336: Many key-builder methods (eoa_lock_key_name,
transaction_data_key_name, transaction_attempts_list_name,
pending_transactions_zset_name, submitted_transactions_zset_name,
transaction_hash_to_id_key_name, borrowed_transactions_hashmap_name,
recycled_nonces_zset_name, optimistic_transaction_count_key_name,
last_transaction_count_key_name, eoa_health_key_name, manual_reset_key_name)
repeat the same namespace match/format logic; add a private helper fn
prefix(&self) -> String that returns either "{ns}:{ENGINE_HASH_TAG}" or
"{ENGINE_HASH_TAG}" and refactor each method to call this prefix and then append
the remaining suffix (e.g., ":eoa_executor:lock:{}:{}" or formatted
transaction-specific suffix), removing the duplicated match blocks.
In `@executors/src/external_bundler/deployment.rs`:
- Around line 85-90: The duplicate cache_key implementation in
RedisDeploymentLock::cache_key and RedisDeploymentCache::cache_key should be
extracted to a shared helper: add a free function like fn
deployment_cache_key(chain_id: u64, account_address: &Address) -> String that
returns the formatted key using twmq::ENGINE_HASH_TAG and the same
"{CACHE_PREFIX}" token, then have both RedisDeploymentLock::cache_key and
RedisDeploymentCache::cache_key call that helper (replace their bodies with a
call to deployment_cache_key) to avoid duplication and keep formatting
consistent.
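A minimal sketch of that shared helper, assuming illustrative values for `ENGINE_HASH_TAG` and `CACHE_PREFIX` (the real constants live in the crate) and using `&str` in place of the `Address` type:

```rust
// Assumed values for illustration only; the crate defines the real constants.
const ENGINE_HASH_TAG: &str = "{engine}";
const CACHE_PREFIX: &str = "deployment_cache";

// Single source of truth for the cache key format. Both
// RedisDeploymentLock::cache_key and RedisDeploymentCache::cache_key would
// delegate here, so the format can never drift between the two.
fn deployment_cache_key(chain_id: u64, account_address: &str) -> String {
    format!("{ENGINE_HASH_TAG}:{CACHE_PREFIX}:{chain_id}:{account_address}")
}

fn main() {
    println!("{}", deployment_cache_key(1, "0xabc"));
}
```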
In `@executors/src/transaction_registry.rs`:
- Around line 33-36: registry_key now embeds twmq::ENGINE_HASH_TAG (in function
registry_key using self.namespace), which will break reads of existing registry
entries; implement a migration strategy: update registry_key to perform a
dual-read during a transition window (try new key first then fallback to the old
key format without ENGINE_HASH_TAG) or add a one-time migration/backfill routine
that scans for old keys and re-writes them using the new key format (reference
registry_key, self.namespace, and twmq::ENGINE_HASH_TAG to locate where to
change read/write logic).
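The dual-read transition can be sketched with two key builders and a fallback lookup. The key formats and the `HashMap` stand-in for Redis are illustrative assumptions, not the crate's actual layout:

```rust
use std::collections::HashMap;

const ENGINE_HASH_TAG: &str = "{engine}"; // assumed value, for illustration

// New-format key, embedding the cluster hash tag.
fn new_registry_key(namespace: Option<&str>) -> String {
    match namespace {
        Some(ns) => format!("{ns}:{ENGINE_HASH_TAG}:tx_registry"),
        None => format!("{ENGINE_HASH_TAG}:tx_registry"),
    }
}

// Legacy key written before ENGINE_HASH_TAG was introduced.
fn old_registry_key(namespace: Option<&str>) -> String {
    match namespace {
        Some(ns) => format!("{ns}:tx_registry"),
        None => "tx_registry".to_string(),
    }
}

// Stand-in for a GET against Redis: try the new key first, then fall back.
fn dual_read(store: &HashMap<String, String>, namespace: Option<&str>) -> Option<String> {
    store
        .get(&new_registry_key(namespace))
        .or_else(|| store.get(&old_registry_key(namespace)))
        .cloned()
}

fn main() {
    let mut store = HashMap::new();
    store.insert(old_registry_key(None), "legacy-entry".to_string());
    // An entry written pre-migration is still readable through the fallback.
    assert_eq!(dual_read(&store, None), Some("legacy-entry".to_string()));
    println!("ok");
}
```

Writes would go only to the new key, so the fallback path naturally shrinks as entries are rewritten.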
In `@server/src/main.rs`:
- Around line 78-85: Extract the repeated comma-split-trim-filter-collect logic
into a single helper (e.g. fn parse_redis_nodes(url: &str) -> Vec<String>) in a
shared crate/module (twmq or a config util), then replace the inline code in
main.rs (where initial_nodes is built) and the duplicates in
integration-tests/tests/setup.rs and twmq/src/multilane.rs to call
parse_redis_nodes(&config.redis.url). Make the helper return owned Strings
(Vec<String>) to avoid lifetime issues, and add simple validation inside it to
return an error or panic when the resulting node list is empty so callers like
twmq::redis::cluster::ClusterClient::new get a non-empty list.
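A sketch of that helper, with the error type and function name as assumptions (the real crate might prefer its own error enum):

```rust
// Shared parsing for comma-separated Redis node lists, returning owned
// Strings and rejecting an empty result up front so ClusterClient::new
// never receives an empty node list.
fn parse_redis_nodes(url: &str) -> Result<Vec<String>, String> {
    let nodes: Vec<String> = url
        .split(',')
        .map(str::trim)
        .filter(|s| !s.is_empty())
        .map(str::to_string)
        .collect();
    if nodes.is_empty() {
        return Err("redis URL yielded no cluster nodes".to_string());
    }
    Ok(nodes)
}

fn main() {
    // Whitespace and trailing commas are tolerated.
    let nodes = parse_redis_nodes("redis://a:6379, redis://b:6380,").unwrap();
    assert_eq!(nodes, vec!["redis://a:6379", "redis://b:6380"]);
    assert!(parse_redis_nodes(" , ").is_err());
    println!("{nodes:?}");
}
```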
In `@twmq/src/multilane.rs`:
- Around line 406-408: The Lua scripts (cancel_job, pop_batch_jobs,
post_success_completion, post_fail_completion) currently build keys like local
lane_pending_list / lane_delayed_zset / lane_active_hash via string
concatenation inside the script; change these scripts to accept all Redis keys
via KEYS[] and replace uses of the concatenated variables with KEYS[n] (and
update the caller that loads/calls these scripts to pass the correct key list in
order), ensuring each lane/queue key is provided in KEYS[] so no dynamic key
construction occurs in the script body and cluster proxies (e.g., ElastiCache)
accept the script.
In `@twmq/src/queue.rs`:
- Around line 191-203: The Redis URL handling now treats a URL string as a
comma-separated list of cluster node addresses (see RedisSource::Url parsing),
but the public builder method redis_url(S: Into<String>) lacks documentation
about this contract; update the doc comment on the redis_url method to state
that the parameter should be a comma-separated list of Redis node URLs (e.g.,
"redis://node1:6379,redis://node2:6380"), note that a single URL (no commas) is
supported, and mention any accepted URL formats and trimming behavior so callers
know how to supply cluster endpoints.
In `@twmq/tests/basic_hook.rs`:
- Around line 24-41: The test helper cleanup_redis_keys uses redis::cmd("KEYS")
which is not cluster-safe and is blocking; replace it with an async,
cluster-safe incremental scan using SCAN (or CLUSTER KEYSLOT+SCAN if needed)
over the connection so all nodes are covered and the command is non-blocking. In
practice in the cleanup_redis_keys function, iterate with redis SCAN on the
ClusterConnection using the keys_pattern (twmq::ENGINE_HASH_TAG + queue_name) to
collect matching keys in pages and then DEL them (or pipeline deletes) only
after gathering them; ensure the scan loop handles cursor=0 termination and uses
query_async on the same connection rather than KEYS so the helper works
correctly in multi-node setups and avoids blocking.
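The cursor loop at the heart of that SCAN-based cleanup can be sketched independently of any connection type. Here a closure stands in for issuing `SCAN cursor MATCH pattern` over the `ClusterConnection`; only the termination logic (iterate until the cursor returns to 0, then delete the gathered keys) is the point:

```rust
// Drains a SCAN-style cursor iteration: pages are fetched until the cursor
// comes back to 0, collecting all keys along the way. `scan_page` stands in
// for a SCAN round-trip returning (next_cursor, keys_in_page).
fn scan_all<F>(mut scan_page: F) -> Vec<String>
where
    F: FnMut(u64) -> (u64, Vec<String>),
{
    let mut keys = Vec::new();
    let mut cursor = 0;
    loop {
        let (next, mut page) = scan_page(cursor);
        keys.append(&mut page);
        if next == 0 {
            break; // cursor back at 0: full iteration finished
        }
        cursor = next;
    }
    keys
}

fn main() {
    // Fake two non-terminal pages followed by the terminating cursor.
    let pages: Vec<(u64, Vec<String>)> = vec![
        (5, vec!["a".into()]),
        (9, vec!["b".into(), "c".into()]),
        (0, vec![]),
    ];
    let mut i = 0;
    let keys = scan_all(|_cursor| {
        let p = pages[i].clone();
        i += 1;
        p
    });
    assert_eq!(keys, vec!["a", "b", "c"]);
    println!("{keys:?}");
}
```

After collecting, the keys can be deleted in batches (pipelined `DEL`), which keeps each round-trip small and avoids the blocking behavior of `KEYS`.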
ℹ️ Review info
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Disabled knowledge base sources:
- Linear integration is disabled
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (27)
- Cargo.toml
- executors/src/eoa/store/atomic.rs
- executors/src/eoa/store/borrowed.rs
- executors/src/eoa/store/mod.rs
- executors/src/eoa/store/pending.rs
- executors/src/eoa/store/submitted.rs
- executors/src/eoa/worker/mod.rs
- executors/src/external_bundler/deployment.rs
- executors/src/solana_executor/storage.rs
- executors/src/transaction_registry.rs
- integration-tests/tests/setup.rs
- server/Cargo.toml
- server/src/execution_router/mod.rs
- server/src/main.rs
- server/src/queue/manager.rs
- twmq/src/lib.rs
- twmq/src/multilane.rs
- twmq/src/queue.rs
- twmq/tests/basic.rs
- twmq/tests/basic_hook.rs
- twmq/tests/delay.rs
- twmq/tests/idempotency_modes.rs
- twmq/tests/lease_expiry.rs
- twmq/tests/multilane_batch_pop.rs
- twmq/tests/nack.rs
- twmq/tests/prune_race_condition.rs
- twmq/tests/prune_race_random_ids.rs
```rust
use twmq::redis::{AsyncCommands, Pipeline};
use twmq::redis::cluster_async::ClusterConnection;
```
Same critical WATCH incompatibility as in multilane.rs — affects EOA lock safety.
The SafeRedisTransaction trait and its consumers (with_lock_check at Lines 220–261, execute_with_watch_and_retry at Lines 336–392) rely on WATCH/MULTI/EXEC for optimistic locking. As noted in the multilane.rs review, ClusterConnection does not support WATCH because it doesn't guarantee connection affinity.
This is especially critical here because it guards the EOA lock mechanism — concurrent workers could both believe they hold the lock, leading to double-spending or nonce collisions.
Also applies to: 45-51
```diff
 pub fn eoa_lock_key_name(&self) -> String {
     match &self.namespace {
-        Some(ns) => format!("{ns}:eoa_executor:lock:{}:{}", self.chain_id, self.eoa),
-        None => format!("eoa_executor:lock:{}:{}", self.chain_id, self.eoa),
+        Some(ns) => format!(
+            "{ns}:{}:eoa_executor:lock:{}:{}",
+            twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa
+        ),
+        None => format!(
+            "{}:eoa_executor:lock:{}:{}",
+            twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa
+        ),
     }
 }
```
Namespace containing { or } would break the hash-tag routing.
Redis Cluster computes the hash slot from the substring between the first { and the next } in the key. If namespace ever contains { or }, the effective hash tag would no longer be engine, causing keys to scatter across slots and breaking atomicity of pipelines and Lua scripts.
Consider validating the namespace at construction time (in EoaExecutorStoreKeys::new) or placing the hash tag before the namespace in the key format (e.g., {engine}:{ns}:eoa_executor:...).
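The slot-selection rule can be made concrete with a small helper that mirrors how Redis Cluster extracts the hash tag (the substring between the first `{` and the next `}`, when non-empty). This is a pure illustration of the routing rule, not code from the crate:

```rust
// Mirrors Redis Cluster's hash-tag rule: if the key contains a non-empty
// substring between the first '{' and the following '}', only that substring
// is hashed for slot assignment; otherwise the whole key is hashed.
fn hash_tag(key: &str) -> &str {
    if let Some(open) = key.find('{') {
        if let Some(close_rel) = key[open + 1..].find('}') {
            if close_rel > 0 {
                return &key[open + 1..open + 1 + close_rel];
            }
        }
    }
    key // no usable tag: the whole key is hashed
}

fn main() {
    // Intended layout: everything hashes on "engine".
    assert_eq!(hash_tag("ns:{engine}:eoa_executor:lock:1:0xabc"), "engine");
    // A brace inside the namespace hijacks the tag and scatters the keys.
    assert_eq!(hash_tag("bad{ns:{engine}:lock"), "ns:{engine");
    println!("ok");
}
```

The second assertion shows the failure mode concretely: one stray `{` in the namespace and the key no longer shares a slot with its siblings.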
Suggested defensive validation in the constructor:

```diff
 impl EoaExecutorStoreKeys {
     pub fn new(eoa: Address, chain_id: u64, namespace: Option<String>) -> Self {
+        if let Some(ref ns) = namespace {
+            assert!(
+                !ns.contains('{') && !ns.contains('}'),
+                "Namespace must not contain '{{' or '}}' as it would break Redis Cluster hash tag routing"
+            );
+        }
         Self {
             eoa,
             chain_id,
             namespace,
         }
     }
 }
```
```rust
let redis_client = twmq::redis::cluster::ClusterClient::new(initial_nodes)
    .context("Failed to connect to Valkey Cluster")?;
```
Misleading error context — this is a config parse error, not a connection failure.
ClusterClient::new() only validates and parses the node URLs; it doesn't open a socket. Actual cluster connection happens at get_async_connection() (line 263). If an invalid URL is provided, the current message "Failed to connect to Valkey Cluster" would misdiagnose the problem.
🔧 Suggested fix

```diff
-let redis_client = twmq::redis::cluster::ClusterClient::new(initial_nodes)
-    .context("Failed to connect to Valkey Cluster")?;
+let redis_client = twmq::redis::cluster::ClusterClient::new(initial_nodes)
+    .context("Failed to build Valkey Cluster client (invalid node URL?)")?;
```
```rust
let redis_client = redis::cluster::ClusterClient::new(vec![REDIS_URL]).unwrap();
let redis_conn = Arc::new(redis_client.get_async_connection().await.unwrap());
```
Directly causes CI pipeline failures — ClusterClient fails against non-cluster Redis.
Both .unwrap() calls panic because get_async_connection() executes CLUSTER NODES handshake against the CI Redis instance, which has cluster-enabled no, yielding ResponseError: This instance has cluster support disabled. This is the exact error in the pipeline failures at lines 135 and 311.
The same root issue affects all other twmq tests (Queue::new, MultilaneQueue::new) that internally construct a ClusterClient — those surface as .expect("…") panics rather than .unwrap() panics, so they appear differently in CI output.
Two remediation paths:

1. Enable single-shard cluster mode in CI — add `cluster-enabled yes`, `cluster-node-timeout 5000`, and `cluster-config-file nodes.conf` to the Redis config used in the test workflow. A single-node cluster is fully functional and sufficient for tests.
2. Reuse the queue's existing `ClusterConnection` for direct Redis access, eliminating the duplicate `ClusterClient` construction entirely.

♻️ Option 2 — reuse queue's connection

```diff
-// Create Redis connection for the execution context
-let redis_client = redis::cluster::ClusterClient::new(vec![REDIS_URL]).unwrap();
-let redis_conn = Arc::new(redis_client.get_async_connection().await.unwrap());
+// Reuse the connection the queue already holds
+let redis_conn = queue.redis.clone();
```

Then update the later uses:

```diff
-cleanup_redis_keys(&redis_conn, &queue_name).await;
+cleanup_redis_keys(&queue.redis, &queue_name).await;
-let mut redis_conn_direct = redis_conn.as_ref().clone();
+let mut redis_conn_direct = queue.redis.clone();
```

Additionally, the `Arc<ClusterConnection>` wrapper is unnecessary: `ClusterConnection` is already `Clone` via its internal shared pool, so `Arc<>` adds reference-counting overhead for no benefit.
Also applies to: 310-311
🧰 Tools
🪛 GitHub Actions: twmq Tests
[error] 135-135: Called Result::unwrap() on an Err value: An error was signalled by the server - ResponseError: This instance has cluster support disabled
```diff
 async fn cleanup(&self) {
     let mut conn = self.queue.redis.clone();
-    let keys_pattern = format!("twmq_multilane:{}:*", self.queue_id);
+    let keys_pattern = format!("twmq_multilane:{}:{}:*", twmq::ENGINE_HASH_TAG, self.queue_id);
```
KEYS in cluster mode only scans a single node — cleanup may silently miss keys in a multi-node cluster.
The KEYS command in a cluster environment returns only the keys served by the node the request is routed to, not all keys across the cluster. Since ENGINE_HASH_TAG is used as a hash tag to co-locate all related keys on the same slot, cleanup will work if redis-rs routes the command to the correct node — but KEYS <pattern> has no key argument for deterministic slot routing, so it may land on the wrong node in a multi-node cluster.
The redis-rs docs explicitly note that for commands like SCAN that need per-node execution, you must route them explicitly to each node using RoutingInfo. The same applies here.
Consider replacing KEYS + DEL with a SCAN-based iteration using RoutingInfo::SingleNode targeting the hash tag's slot, or at minimum accept that cleanup is best-effort and add a comment.
Additionally, MultilaneQueue::new(REDIS_URL, …) internally creates a ClusterClient, so these tests require a cluster-mode Redis — the same root cause as the pipeline failures in delay.rs.
Also applies to: 168-169
Summary by CodeRabbit
New Features
Refactoring