Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,17 @@ config = "0.15.11"
aws-arn = "0.3.1"

# Redis
redis = { version = "0.31.0", features = ["tokio-comp", "connection-manager"] }
redis = { version = "0.31.0", features = ["tokio-comp", "connection-manager", "cluster", "cluster-async", "tls-rustls", "tokio-rustls-comp"] }

# Dev dependencies
criterion = { version = "0.6", features = ["html_reports", "async_tokio"] }
criterion = { version = "0.6", features = ["html_reports", "async_tokio"] }

# Rustls
#
# NOTE: rustls 0.23 requires selecting exactly one process-wide crypto provider
# (features: `ring` or `aws_lc_rs` / `aws-lc-rs`). Some dependency graphs (e.g. via
# redis-rs' rustls integration) can end up with *no* provider enabled, which causes a
# runtime panic when building TLS client/server configs.
#
# We explicitly enable the `ring` provider here to make TLS work reliably.
rustls = { version = "0.23.32", default-features = false, features = ["std", "ring"] }
7 changes: 4 additions & 3 deletions executors/src/eoa/store/atomic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ use alloy::{
consensus::{Signed, TypedTransaction},
primitives::Address,
};
use twmq::redis::{AsyncCommands, Pipeline, aio::ConnectionManager};
use twmq::redis::{AsyncCommands, Pipeline};
use twmq::redis::cluster_async::ClusterConnection;
Comment on lines +7 to +8
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Same critical WATCH incompatibility as in multilane.rs — affects EOA lock safety.

The SafeRedisTransaction trait and its consumers (with_lock_check at Lines 220–261, execute_with_watch_and_retry at Lines 336–392) rely on WATCH/MULTI/EXEC for optimistic locking. As noted in the multilane.rs review, ClusterConnection does not support WATCH because it doesn't guarantee connection affinity.

This is especially critical here because it guards the EOA lock mechanism — concurrent workers could both believe they hold the lock, leading to double-spending or nonce collisions.

Also applies to: 45-51

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@executors/src/eoa/store/atomic.rs` around lines 7 - 8, The code uses
ClusterConnection with WATCH/MULTI/EXEC via the SafeRedisTransaction trait, but
ClusterConnection doesn't support WATCH (connection-affinity issue) which breaks
the EOA lock safety; change the implementation to replace WATCH-based optimistic
transactions with an atomic Lua script (EVAL) or a Redis server-side CAS routine
for the lock flow. Specifically, update the SafeRedisTransaction consumers —
with_lock_check and execute_with_watch_and_retry — to call a single EVAL that
checks the lock owner and nonce and then sets/updates or aborts atomically (so
both the check and the write occur server-side), remove reliance on
WATCH/MULTI/EXEC, and stop using ClusterConnection for WATCH paths (keep
ClusterConnection but invoke EVAL via its async commands). Ensure the Lua script
returns clear success/failure codes that with_lock_check and
execute_with_watch_and_retry interpret to retry/fail as before.


use crate::{
eoa::{
Expand Down Expand Up @@ -43,7 +44,7 @@ pub trait SafeRedisTransaction: Send + Sync {
) -> Self::OperationResult;
fn validation(
&self,
conn: &mut ConnectionManager,
conn: &mut ClusterConnection,
store: &EoaExecutorStore,
) -> impl Future<Output = Result<Self::ValidationData, TransactionStoreError>> + Send;
fn watch_keys(&self) -> Vec<String>;
Expand Down Expand Up @@ -815,7 +816,7 @@ impl SafeRedisTransaction for ResetNoncesTransaction<'_> {

async fn validation(
&self,
_conn: &mut ConnectionManager,
_conn: &mut ClusterConnection,
store: &EoaExecutorStore,
) -> Result<Self::ValidationData, TransactionStoreError> {
let now = chrono::Utc::now().timestamp_millis().max(0) as u64;
Expand Down
5 changes: 3 additions & 2 deletions executors/src/eoa/store/borrowed.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use std::sync::Arc;

use twmq::Queue;
use twmq::redis::{AsyncCommands, Pipeline, aio::ConnectionManager};
use twmq::redis::{AsyncCommands, Pipeline};
use twmq::redis::cluster_async::ClusterConnection;

use crate::eoa::EoaExecutorStore;
use crate::eoa::{
Expand Down Expand Up @@ -71,7 +72,7 @@ impl SafeRedisTransaction for ProcessBorrowedTransactions<'_> {

async fn validation(
&self,
conn: &mut ConnectionManager,
conn: &mut ClusterConnection,
_store: &EoaExecutorStore,
) -> Result<Self::ValidationData, TransactionStoreError> {
// Get all borrowed transaction IDs
Expand Down
117 changes: 80 additions & 37 deletions executors/src/eoa/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ use engine_core::transaction::TransactionTypeData;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::ops::Deref;
use twmq::redis::{AsyncCommands, aio::ConnectionManager};
use twmq::redis::AsyncCommands;
use twmq::redis::cluster_async::ClusterConnection;

mod atomic;
mod borrowed;
Expand Down Expand Up @@ -98,7 +99,7 @@ pub struct TransactionData {

/// Transaction store focused on transaction_id operations and nonce indexing
pub struct EoaExecutorStore {
pub redis: ConnectionManager,
pub redis: ClusterConnection,
pub keys: EoaExecutorStoreKeys,
pub completed_transaction_ttl_seconds: u64,
}
Expand All @@ -121,8 +122,14 @@ impl EoaExecutorStoreKeys {
/// Lock key name for EOA processing
pub fn eoa_lock_key_name(&self) -> String {
match &self.namespace {
Some(ns) => format!("{ns}:eoa_executor:lock:{}:{}", self.chain_id, self.eoa),
None => format!("eoa_executor:lock:{}:{}", self.chain_id, self.eoa),
Some(ns) => format!(
"{ns}:{}:eoa_executor:lock:{}:{}",
twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa
),
None => format!(
"{}:eoa_executor:lock:{}:{}",
twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa
),
}
}
Comment on lines 123 to 134
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Namespace containing { or } would break the hash-tag routing.

Redis Cluster computes the hash slot from the substring between the first { and the next } in the key. If namespace ever contains { or }, the effective hash tag would no longer be engine, causing keys to scatter across slots and breaking atomicity of pipelines and Lua scripts.

Consider validating the namespace at construction time (in EoaExecutorStoreKeys::new) or placing the hash tag before the namespace in the key format (e.g., {engine}:{ns}:eoa_executor:...).

Suggested defensive validation in the constructor
 impl EoaExecutorStoreKeys {
     pub fn new(eoa: Address, chain_id: u64, namespace: Option<String>) -> Self {
+        if let Some(ref ns) = namespace {
+            assert!(
+                !ns.contains('{') && !ns.contains('}'),
+                "Namespace must not contain '{{' or '}}' as it would break Redis Cluster hash tag routing"
+            );
+        }
         Self {
             eoa,
             chain_id,
             namespace,
         }
     }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@executors/src/eoa/store/mod.rs` around lines 123 - 134, The key construction
in eoa_lock_key_name() can have namespace containing '{' or '}', which would
change Redis Cluster hash-tag parsing and break slot affinity; update the logic
in EoaExecutorStoreKeys (e.g., in EoaExecutorStoreKeys::new) to either
validate/escape namespace (reject or strip '{' and '}') or move the
twmq::ENGINE_HASH_TAG hash tag to the front of the key format (e.g., wrap
ENGINE_HASH_TAG in braces before namespace) and then update eoa_lock_key_name()
and other key-formatting methods to use the safe/tag-first format; ensure any
validation error surfaces at construction time so no keys are created with
illegal characters.


Expand All @@ -137,8 +144,14 @@ impl EoaExecutorStoreKeys {
/// - "failure_reason": String failure reason (optional)
pub fn transaction_data_key_name(&self, transaction_id: &str) -> String {
match &self.namespace {
Some(ns) => format!("{ns}:eoa_executor:tx_data:{transaction_id}"),
None => format!("eoa_executor:tx_data:{transaction_id}"),
Some(ns) => format!(
"{ns}:{}:eoa_executor:tx_data:{transaction_id}",
twmq::ENGINE_HASH_TAG
),
None => format!(
"{}:eoa_executor:tx_data:{transaction_id}",
twmq::ENGINE_HASH_TAG
),
}
}

Expand All @@ -148,8 +161,14 @@ impl EoaExecutorStoreKeys {
/// of a TransactionAttempt. This allows efficient append operations.
pub fn transaction_attempts_list_name(&self, transaction_id: &str) -> String {
match &self.namespace {
Some(ns) => format!("{ns}:eoa_executor:tx_attempts:{transaction_id}"),
None => format!("eoa_executor:tx_attempts:{transaction_id}"),
Some(ns) => format!(
"{ns}:{}:eoa_executor:tx_attempts:{transaction_id}",
twmq::ENGINE_HASH_TAG
),
None => format!(
"{}:eoa_executor:tx_attempts:{transaction_id}",
twmq::ENGINE_HASH_TAG
),
}
}

Expand All @@ -159,10 +178,13 @@ impl EoaExecutorStoreKeys {
pub fn pending_transactions_zset_name(&self) -> String {
match &self.namespace {
Some(ns) => format!(
"{ns}:eoa_executor:pending_txs:{}:{}",
self.chain_id, self.eoa
"{ns}:{}:eoa_executor:pending_txs:{}:{}",
twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa
),
None => format!(
"{}:eoa_executor:pending_txs:{}:{}",
twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa
),
None => format!("eoa_executor:pending_txs:{}:{}", self.chain_id, self.eoa),
}
}

Expand All @@ -172,18 +194,27 @@ impl EoaExecutorStoreKeys {
pub fn submitted_transactions_zset_name(&self) -> String {
match &self.namespace {
Some(ns) => format!(
"{ns}:eoa_executor:submitted_txs:{}:{}",
self.chain_id, self.eoa
"{ns}:{}:eoa_executor:submitted_txs:{}:{}",
twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa
),
None => format!(
"{}:eoa_executor:submitted_txs:{}:{}",
twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa
),
None => format!("eoa_executor:submitted_txs:{}:{}", self.chain_id, self.eoa),
}
}

/// Name of the key that maps transaction hash to transaction id
pub fn transaction_hash_to_id_key_name(&self, hash: &str) -> String {
match &self.namespace {
Some(ns) => format!("{ns}:eoa_executor:tx_hash_to_id:{hash}"),
None => format!("eoa_executor:tx_hash_to_id:{hash}"),
Some(ns) => format!(
"{ns}:{}:eoa_executor:tx_hash_to_id:{hash}",
twmq::ENGINE_HASH_TAG
),
None => format!(
"{}:eoa_executor:tx_hash_to_id:{hash}",
twmq::ENGINE_HASH_TAG
),
}
}

Expand All @@ -197,10 +228,13 @@ impl EoaExecutorStoreKeys {
pub fn borrowed_transactions_hashmap_name(&self) -> String {
match &self.namespace {
Some(ns) => format!(
"{ns}:eoa_executor:borrowed_txs:{}:{}",
self.chain_id, self.eoa
"{ns}:{}:eoa_executor:borrowed_txs:{}:{}",
twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa
),
None => format!(
"{}:eoa_executor:borrowed_txs:{}:{}",
twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa
),
None => format!("eoa_executor:borrowed_txs:{}:{}", self.chain_id, self.eoa),
}
}

Expand All @@ -214,12 +248,12 @@ impl EoaExecutorStoreKeys {
pub fn recycled_nonces_zset_name(&self) -> String {
match &self.namespace {
Some(ns) => format!(
"{ns}:eoa_executor:recycled_nonces:{}:{}",
self.chain_id, self.eoa
"{ns}:{}:eoa_executor:recycled_nonces:{}:{}",
twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa
),
None => format!(
"eoa_executor:recycled_nonces:{}:{}",
self.chain_id, self.eoa
"{}:eoa_executor:recycled_nonces:{}:{}",
twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa
),
}
}
Expand All @@ -236,12 +270,12 @@ impl EoaExecutorStoreKeys {
pub fn optimistic_transaction_count_key_name(&self) -> String {
match &self.namespace {
Some(ns) => format!(
"{ns}:eoa_executor:optimistic_nonce:{}:{}",
self.chain_id, self.eoa
"{ns}:{}:eoa_executor:optimistic_nonce:{}:{}",
twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa
),
None => format!(
"eoa_executor:optimistic_nonce:{}:{}",
self.chain_id, self.eoa
"{}:eoa_executor:optimistic_nonce:{}:{}",
twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa
),
}
}
Expand All @@ -256,10 +290,13 @@ impl EoaExecutorStoreKeys {
pub fn last_transaction_count_key_name(&self) -> String {
match &self.namespace {
Some(ns) => format!(
"{ns}:eoa_executor:last_tx_nonce:{}:{}",
self.chain_id, self.eoa
"{ns}:{}:eoa_executor:last_tx_nonce:{}:{}",
twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa
),
None => format!(
"{}:eoa_executor:last_tx_nonce:{}:{}",
twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa
),
None => format!("eoa_executor:last_tx_nonce:{}:{}", self.chain_id, self.eoa),
}
}

Expand All @@ -271,8 +308,14 @@ impl EoaExecutorStoreKeys {
/// - timestamp of the last 5 nonce resets
pub fn eoa_health_key_name(&self) -> String {
match &self.namespace {
Some(ns) => format!("{ns}:eoa_executor:health:{}:{}", self.chain_id, self.eoa),
None => format!("eoa_executor:health:{}:{}", self.chain_id, self.eoa),
Some(ns) => format!(
"{ns}:{}:eoa_executor:health:{}:{}",
twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa
),
None => format!(
"{}:eoa_executor:health:{}:{}",
twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa
),
}
}

Expand All @@ -282,20 +325,20 @@ impl EoaExecutorStoreKeys {
pub fn manual_reset_key_name(&self) -> String {
match &self.namespace {
Some(ns) => format!(
"{ns}:eoa_executor:pending_manual_reset:{}:{}",
self.chain_id, self.eoa
"{ns}:{}:eoa_executor:pending_manual_reset:{}:{}",
twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa
),
None => format!(
"eoa_executor:pending_manual_reset:{}:{}",
self.chain_id, self.eoa
"{}:eoa_executor:pending_manual_reset:{}:{}",
twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa
),
}
}
}

impl EoaExecutorStore {
pub fn new(
redis: ConnectionManager,
redis: ClusterConnection,
namespace: Option<String>,
eoa: Address,
chain_id: u64,
Expand Down
7 changes: 4 additions & 3 deletions executors/src/eoa/store/pending.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use std::collections::HashSet;

use alloy::{consensus::Transaction, primitives::Address};
use twmq::redis::{AsyncCommands, Pipeline, aio::ConnectionManager};
use twmq::redis::{AsyncCommands, Pipeline};
use twmq::redis::cluster_async::ClusterConnection;

use crate::eoa::{
EoaExecutorStore,
Expand Down Expand Up @@ -46,7 +47,7 @@ impl SafeRedisTransaction for MovePendingToBorrowedWithIncrementedNonces<'_> {

async fn validation(
&self,
conn: &mut ConnectionManager,
conn: &mut ClusterConnection,
_store: &EoaExecutorStore,
) -> Result<Self::ValidationData, TransactionStoreError> {
if self.transactions.is_empty() {
Expand Down Expand Up @@ -181,7 +182,7 @@ impl SafeRedisTransaction for MovePendingToBorrowedWithRecycledNonces<'_> {

async fn validation(
&self,
conn: &mut ConnectionManager,
conn: &mut ClusterConnection,
_store: &EoaExecutorStore,
) -> Result<Self::ValidationData, TransactionStoreError> {
if self.transactions.is_empty() {
Expand Down
7 changes: 4 additions & 3 deletions executors/src/eoa/store/submitted.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ use std::{
};

use serde::{Deserialize, Serialize};
use twmq::redis::{AsyncCommands, Pipeline, aio::ConnectionManager};
use twmq::redis::{AsyncCommands, Pipeline};
use twmq::redis::cluster_async::ClusterConnection;

use crate::{
TransactionCounts,
Expand Down Expand Up @@ -279,7 +280,7 @@ impl SafeRedisTransaction for CleanSubmittedTransactions<'_> {

async fn validation(
&self,
conn: &mut ConnectionManager,
conn: &mut ClusterConnection,
store: &EoaExecutorStore,
) -> Result<Self::ValidationData, TransactionStoreError> {
// Fetch transactions up to the latest confirmed nonce for replacements
Expand Down Expand Up @@ -592,7 +593,7 @@ impl SafeRedisTransaction for CleanAndGetRecycledNonces<'_> {

async fn validation(
&self,
conn: &mut ConnectionManager,
conn: &mut ClusterConnection,
_store: &EoaExecutorStore,
) -> Result<Self::ValidationData, TransactionStoreError> {
// get the highest submitted nonce
Expand Down
4 changes: 2 additions & 2 deletions executors/src/eoa/worker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use engine_eip7702_core::delegated_account::DelegatedAccount;
use serde::{Deserialize, Serialize};
use std::{sync::Arc, time::Duration};
use twmq::Queue;
use twmq::redis::aio::ConnectionManager;
use twmq::redis::cluster_async::ClusterConnection;
use twmq::{
DurableExecution, FailHookData, NackHookData, SuccessHookData,
hooks::TransactionContext,
Expand Down Expand Up @@ -114,7 +114,7 @@ where
pub webhook_queue: Arc<Queue<WebhookJobHandler>>,
pub authorization_cache: EoaAuthorizationCache,

pub redis: ConnectionManager,
pub redis: ClusterConnection,
pub namespace: Option<String>,

pub eoa_signer: Arc<EoaSigner>,
Expand Down
Loading
Loading