-
Notifications
You must be signed in to change notification settings - Fork 11
valkey integration #97
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -9,7 +9,8 @@ use engine_core::transaction::TransactionTypeData; | |
| use serde::{Deserialize, Serialize}; | ||
| use std::collections::HashMap; | ||
| use std::ops::Deref; | ||
| use twmq::redis::{AsyncCommands, aio::ConnectionManager}; | ||
| use twmq::redis::AsyncCommands; | ||
| use twmq::redis::cluster_async::ClusterConnection; | ||
|
|
||
| mod atomic; | ||
| mod borrowed; | ||
|
|
@@ -98,7 +99,7 @@ pub struct TransactionData { | |
|
|
||
| /// Transaction store focused on transaction_id operations and nonce indexing | ||
| pub struct EoaExecutorStore { | ||
| pub redis: ConnectionManager, | ||
| pub redis: ClusterConnection, | ||
| pub keys: EoaExecutorStoreKeys, | ||
| pub completed_transaction_ttl_seconds: u64, | ||
| } | ||
|
|
@@ -121,8 +122,14 @@ impl EoaExecutorStoreKeys { | |
| /// Lock key name for EOA processing | ||
| pub fn eoa_lock_key_name(&self) -> String { | ||
| match &self.namespace { | ||
| Some(ns) => format!("{ns}:eoa_executor:lock:{}:{}", self.chain_id, self.eoa), | ||
| None => format!("eoa_executor:lock:{}:{}", self.chain_id, self.eoa), | ||
| Some(ns) => format!( | ||
| "{ns}:{}:eoa_executor:lock:{}:{}", | ||
| twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa | ||
| ), | ||
| None => format!( | ||
| "{}:eoa_executor:lock:{}:{}", | ||
| twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa | ||
| ), | ||
| } | ||
| } | ||
|
Comment on lines
123
to
134
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Namespace containing Redis Cluster computes the hash slot from the substring between the first Consider validating the namespace at construction time (in Suggested defensive validation in the constructor impl EoaExecutorStoreKeys {
pub fn new(eoa: Address, chain_id: u64, namespace: Option<String>) -> Self {
+ if let Some(ref ns) = namespace {
+ assert!(
+ !ns.contains('{') && !ns.contains('}'),
+ "Namespace must not contain '{{' or '}}' as it would break Redis Cluster hash tag routing"
+ );
+ }
Self {
eoa,
chain_id,
namespace,
}
}🤖 Prompt for AI Agents |
||
|
|
||
|
|
@@ -137,8 +144,14 @@ impl EoaExecutorStoreKeys { | |
| /// - "failure_reason": String failure reason (optional) | ||
| pub fn transaction_data_key_name(&self, transaction_id: &str) -> String { | ||
| match &self.namespace { | ||
| Some(ns) => format!("{ns}:eoa_executor:tx_data:{transaction_id}"), | ||
| None => format!("eoa_executor:tx_data:{transaction_id}"), | ||
| Some(ns) => format!( | ||
| "{ns}:{}:eoa_executor:tx_data:{transaction_id}", | ||
| twmq::ENGINE_HASH_TAG | ||
| ), | ||
| None => format!( | ||
| "{}:eoa_executor:tx_data:{transaction_id}", | ||
| twmq::ENGINE_HASH_TAG | ||
| ), | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -148,8 +161,14 @@ impl EoaExecutorStoreKeys { | |
| /// of a TransactionAttempt. This allows efficient append operations. | ||
| pub fn transaction_attempts_list_name(&self, transaction_id: &str) -> String { | ||
| match &self.namespace { | ||
| Some(ns) => format!("{ns}:eoa_executor:tx_attempts:{transaction_id}"), | ||
| None => format!("eoa_executor:tx_attempts:{transaction_id}"), | ||
| Some(ns) => format!( | ||
| "{ns}:{}:eoa_executor:tx_attempts:{transaction_id}", | ||
| twmq::ENGINE_HASH_TAG | ||
| ), | ||
| None => format!( | ||
| "{}:eoa_executor:tx_attempts:{transaction_id}", | ||
| twmq::ENGINE_HASH_TAG | ||
| ), | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -159,10 +178,13 @@ impl EoaExecutorStoreKeys { | |
| pub fn pending_transactions_zset_name(&self) -> String { | ||
| match &self.namespace { | ||
| Some(ns) => format!( | ||
| "{ns}:eoa_executor:pending_txs:{}:{}", | ||
| self.chain_id, self.eoa | ||
| "{ns}:{}:eoa_executor:pending_txs:{}:{}", | ||
| twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa | ||
| ), | ||
| None => format!( | ||
| "{}:eoa_executor:pending_txs:{}:{}", | ||
| twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa | ||
| ), | ||
| None => format!("eoa_executor:pending_txs:{}:{}", self.chain_id, self.eoa), | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -172,18 +194,27 @@ impl EoaExecutorStoreKeys { | |
| pub fn submitted_transactions_zset_name(&self) -> String { | ||
| match &self.namespace { | ||
| Some(ns) => format!( | ||
| "{ns}:eoa_executor:submitted_txs:{}:{}", | ||
| self.chain_id, self.eoa | ||
| "{ns}:{}:eoa_executor:submitted_txs:{}:{}", | ||
| twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa | ||
| ), | ||
| None => format!( | ||
| "{}:eoa_executor:submitted_txs:{}:{}", | ||
| twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa | ||
| ), | ||
| None => format!("eoa_executor:submitted_txs:{}:{}", self.chain_id, self.eoa), | ||
| } | ||
| } | ||
|
|
||
| /// Name of the key that maps transaction hash to transaction id | ||
| pub fn transaction_hash_to_id_key_name(&self, hash: &str) -> String { | ||
| match &self.namespace { | ||
| Some(ns) => format!("{ns}:eoa_executor:tx_hash_to_id:{hash}"), | ||
| None => format!("eoa_executor:tx_hash_to_id:{hash}"), | ||
| Some(ns) => format!( | ||
| "{ns}:{}:eoa_executor:tx_hash_to_id:{hash}", | ||
| twmq::ENGINE_HASH_TAG | ||
| ), | ||
| None => format!( | ||
| "{}:eoa_executor:tx_hash_to_id:{hash}", | ||
| twmq::ENGINE_HASH_TAG | ||
| ), | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -197,10 +228,13 @@ impl EoaExecutorStoreKeys { | |
| pub fn borrowed_transactions_hashmap_name(&self) -> String { | ||
| match &self.namespace { | ||
| Some(ns) => format!( | ||
| "{ns}:eoa_executor:borrowed_txs:{}:{}", | ||
| self.chain_id, self.eoa | ||
| "{ns}:{}:eoa_executor:borrowed_txs:{}:{}", | ||
| twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa | ||
| ), | ||
| None => format!( | ||
| "{}:eoa_executor:borrowed_txs:{}:{}", | ||
| twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa | ||
| ), | ||
| None => format!("eoa_executor:borrowed_txs:{}:{}", self.chain_id, self.eoa), | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -214,12 +248,12 @@ impl EoaExecutorStoreKeys { | |
| pub fn recycled_nonces_zset_name(&self) -> String { | ||
| match &self.namespace { | ||
| Some(ns) => format!( | ||
| "{ns}:eoa_executor:recycled_nonces:{}:{}", | ||
| self.chain_id, self.eoa | ||
| "{ns}:{}:eoa_executor:recycled_nonces:{}:{}", | ||
| twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa | ||
| ), | ||
| None => format!( | ||
| "eoa_executor:recycled_nonces:{}:{}", | ||
| self.chain_id, self.eoa | ||
| "{}:eoa_executor:recycled_nonces:{}:{}", | ||
| twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa | ||
| ), | ||
| } | ||
| } | ||
|
|
@@ -236,12 +270,12 @@ impl EoaExecutorStoreKeys { | |
| pub fn optimistic_transaction_count_key_name(&self) -> String { | ||
| match &self.namespace { | ||
| Some(ns) => format!( | ||
| "{ns}:eoa_executor:optimistic_nonce:{}:{}", | ||
| self.chain_id, self.eoa | ||
| "{ns}:{}:eoa_executor:optimistic_nonce:{}:{}", | ||
| twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa | ||
| ), | ||
| None => format!( | ||
| "eoa_executor:optimistic_nonce:{}:{}", | ||
| self.chain_id, self.eoa | ||
| "{}:eoa_executor:optimistic_nonce:{}:{}", | ||
| twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa | ||
| ), | ||
| } | ||
| } | ||
|
|
@@ -256,10 +290,13 @@ impl EoaExecutorStoreKeys { | |
| pub fn last_transaction_count_key_name(&self) -> String { | ||
| match &self.namespace { | ||
| Some(ns) => format!( | ||
| "{ns}:eoa_executor:last_tx_nonce:{}:{}", | ||
| self.chain_id, self.eoa | ||
| "{ns}:{}:eoa_executor:last_tx_nonce:{}:{}", | ||
| twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa | ||
| ), | ||
| None => format!( | ||
| "{}:eoa_executor:last_tx_nonce:{}:{}", | ||
| twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa | ||
| ), | ||
| None => format!("eoa_executor:last_tx_nonce:{}:{}", self.chain_id, self.eoa), | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -271,8 +308,14 @@ impl EoaExecutorStoreKeys { | |
| /// - timestamp of the last 5 nonce resets | ||
| pub fn eoa_health_key_name(&self) -> String { | ||
| match &self.namespace { | ||
| Some(ns) => format!("{ns}:eoa_executor:health:{}:{}", self.chain_id, self.eoa), | ||
| None => format!("eoa_executor:health:{}:{}", self.chain_id, self.eoa), | ||
| Some(ns) => format!( | ||
| "{ns}:{}:eoa_executor:health:{}:{}", | ||
| twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa | ||
| ), | ||
| None => format!( | ||
| "{}:eoa_executor:health:{}:{}", | ||
| twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa | ||
| ), | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -282,20 +325,20 @@ impl EoaExecutorStoreKeys { | |
| pub fn manual_reset_key_name(&self) -> String { | ||
| match &self.namespace { | ||
| Some(ns) => format!( | ||
| "{ns}:eoa_executor:pending_manual_reset:{}:{}", | ||
| self.chain_id, self.eoa | ||
| "{ns}:{}:eoa_executor:pending_manual_reset:{}:{}", | ||
| twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa | ||
| ), | ||
| None => format!( | ||
| "eoa_executor:pending_manual_reset:{}:{}", | ||
| self.chain_id, self.eoa | ||
| "{}:eoa_executor:pending_manual_reset:{}:{}", | ||
| twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa | ||
| ), | ||
| } | ||
| } | ||
| } | ||
|
|
||
| impl EoaExecutorStore { | ||
| pub fn new( | ||
| redis: ConnectionManager, | ||
| redis: ClusterConnection, | ||
| namespace: Option<String>, | ||
| eoa: Address, | ||
| chain_id: u64, | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same critical WATCH incompatibility as in multilane.rs — affects EOA lock safety.
The
SafeRedisTransactiontrait and its consumers (with_lock_checkat Lines 220–261,execute_with_watch_and_retryat Lines 336–392) rely onWATCH/MULTI/EXECfor optimistic locking. As noted in the multilane.rs review,ClusterConnectiondoes not supportWATCHbecause it doesn't guarantee connection affinity.This is especially critical here because it guards the EOA lock mechanism — concurrent workers could both believe they hold the lock, leading to double-spending or nonce collisions.
Also applies to: 45-51
🤖 Prompt for AI Agents