Skip to content

Commit 5da583b

Browse files
committed
valkey integration
1 parent e570bc8 commit 5da583b

File tree

26 files changed

+390
-258
lines changed

26 files changed

+390
-258
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ config = "0.15.11"
9898
aws-arn = "0.3.1"
9999

100100
# Redis
101-
redis = { version = "0.31.0", features = ["tokio-comp", "connection-manager"] }
101+
redis = { version = "0.31.0", features = ["tokio-comp", "connection-manager", "cluster", "cluster-async", "tls-rustls", "tokio-rustls-comp"] }
102102

103103
# Dev dependencies
104104
criterion = { version = "0.6", features = ["html_reports", "async_tokio"] }

executors/src/eoa/store/atomic.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@ use alloy::{
44
consensus::{Signed, TypedTransaction},
55
primitives::Address,
66
};
7-
use twmq::redis::{AsyncCommands, Pipeline, aio::ConnectionManager};
7+
use twmq::redis::{AsyncCommands, Pipeline};
8+
use twmq::redis::cluster_async::ClusterConnection;
89

910
use crate::{
1011
eoa::{
@@ -43,7 +44,7 @@ pub trait SafeRedisTransaction: Send + Sync {
4344
) -> Self::OperationResult;
4445
fn validation(
4546
&self,
46-
conn: &mut ConnectionManager,
47+
conn: &mut ClusterConnection,
4748
store: &EoaExecutorStore,
4849
) -> impl Future<Output = Result<Self::ValidationData, TransactionStoreError>> + Send;
4950
fn watch_keys(&self) -> Vec<String>;
@@ -815,7 +816,7 @@ impl SafeRedisTransaction for ResetNoncesTransaction<'_> {
815816

816817
async fn validation(
817818
&self,
818-
_conn: &mut ConnectionManager,
819+
_conn: &mut ClusterConnection,
819820
store: &EoaExecutorStore,
820821
) -> Result<Self::ValidationData, TransactionStoreError> {
821822
let now = chrono::Utc::now().timestamp_millis().max(0) as u64;

executors/src/eoa/store/borrowed.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
use std::sync::Arc;
22

33
use twmq::Queue;
4-
use twmq::redis::{AsyncCommands, Pipeline, aio::ConnectionManager};
4+
use twmq::redis::{AsyncCommands, Pipeline};
5+
use twmq::redis::cluster_async::ClusterConnection;
56

67
use crate::eoa::EoaExecutorStore;
78
use crate::eoa::{
@@ -71,7 +72,7 @@ impl SafeRedisTransaction for ProcessBorrowedTransactions<'_> {
7172

7273
async fn validation(
7374
&self,
74-
conn: &mut ConnectionManager,
75+
conn: &mut ClusterConnection,
7576
_store: &EoaExecutorStore,
7677
) -> Result<Self::ValidationData, TransactionStoreError> {
7778
// Get all borrowed transaction IDs

executors/src/eoa/store/mod.rs

Lines changed: 80 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,8 @@ use engine_core::transaction::TransactionTypeData;
99
use serde::{Deserialize, Serialize};
1010
use std::collections::HashMap;
1111
use std::ops::Deref;
12-
use twmq::redis::{AsyncCommands, aio::ConnectionManager};
12+
use twmq::redis::AsyncCommands;
13+
use twmq::redis::cluster_async::ClusterConnection;
1314

1415
mod atomic;
1516
mod borrowed;
@@ -98,7 +99,7 @@ pub struct TransactionData {
9899

99100
/// Transaction store focused on transaction_id operations and nonce indexing
100101
pub struct EoaExecutorStore {
101-
pub redis: ConnectionManager,
102+
pub redis: ClusterConnection,
102103
pub keys: EoaExecutorStoreKeys,
103104
pub completed_transaction_ttl_seconds: u64,
104105
}
@@ -121,8 +122,14 @@ impl EoaExecutorStoreKeys {
121122
/// Lock key name for EOA processing
122123
pub fn eoa_lock_key_name(&self) -> String {
123124
match &self.namespace {
124-
Some(ns) => format!("{ns}:eoa_executor:lock:{}:{}", self.chain_id, self.eoa),
125-
None => format!("eoa_executor:lock:{}:{}", self.chain_id, self.eoa),
125+
Some(ns) => format!(
126+
"{ns}:{}:eoa_executor:lock:{}:{}",
127+
twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa
128+
),
129+
None => format!(
130+
"{}:eoa_executor:lock:{}:{}",
131+
twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa
132+
),
126133
}
127134
}
128135

@@ -137,8 +144,14 @@ impl EoaExecutorStoreKeys {
137144
/// - "failure_reason": String failure reason (optional)
138145
pub fn transaction_data_key_name(&self, transaction_id: &str) -> String {
139146
match &self.namespace {
140-
Some(ns) => format!("{ns}:eoa_executor:tx_data:{transaction_id}"),
141-
None => format!("eoa_executor:tx_data:{transaction_id}"),
147+
Some(ns) => format!(
148+
"{ns}:{}:eoa_executor:tx_data:{transaction_id}",
149+
twmq::ENGINE_HASH_TAG
150+
),
151+
None => format!(
152+
"{}:eoa_executor:tx_data:{transaction_id}",
153+
twmq::ENGINE_HASH_TAG
154+
),
142155
}
143156
}
144157

@@ -148,8 +161,14 @@ impl EoaExecutorStoreKeys {
148161
/// of a TransactionAttempt. This allows efficient append operations.
149162
pub fn transaction_attempts_list_name(&self, transaction_id: &str) -> String {
150163
match &self.namespace {
151-
Some(ns) => format!("{ns}:eoa_executor:tx_attempts:{transaction_id}"),
152-
None => format!("eoa_executor:tx_attempts:{transaction_id}"),
164+
Some(ns) => format!(
165+
"{ns}:{}:eoa_executor:tx_attempts:{transaction_id}",
166+
twmq::ENGINE_HASH_TAG
167+
),
168+
None => format!(
169+
"{}:eoa_executor:tx_attempts:{transaction_id}",
170+
twmq::ENGINE_HASH_TAG
171+
),
153172
}
154173
}
155174

@@ -159,10 +178,13 @@ impl EoaExecutorStoreKeys {
159178
pub fn pending_transactions_zset_name(&self) -> String {
160179
match &self.namespace {
161180
Some(ns) => format!(
162-
"{ns}:eoa_executor:pending_txs:{}:{}",
163-
self.chain_id, self.eoa
181+
"{ns}:{}:eoa_executor:pending_txs:{}:{}",
182+
twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa
183+
),
184+
None => format!(
185+
"{}:eoa_executor:pending_txs:{}:{}",
186+
twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa
164187
),
165-
None => format!("eoa_executor:pending_txs:{}:{}", self.chain_id, self.eoa),
166188
}
167189
}
168190

@@ -172,18 +194,27 @@ impl EoaExecutorStoreKeys {
172194
pub fn submitted_transactions_zset_name(&self) -> String {
173195
match &self.namespace {
174196
Some(ns) => format!(
175-
"{ns}:eoa_executor:submitted_txs:{}:{}",
176-
self.chain_id, self.eoa
197+
"{ns}:{}:eoa_executor:submitted_txs:{}:{}",
198+
twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa
199+
),
200+
None => format!(
201+
"{}:eoa_executor:submitted_txs:{}:{}",
202+
twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa
177203
),
178-
None => format!("eoa_executor:submitted_txs:{}:{}", self.chain_id, self.eoa),
179204
}
180205
}
181206

182207
/// Name of the key that maps transaction hash to transaction id
183208
pub fn transaction_hash_to_id_key_name(&self, hash: &str) -> String {
184209
match &self.namespace {
185-
Some(ns) => format!("{ns}:eoa_executor:tx_hash_to_id:{hash}"),
186-
None => format!("eoa_executor:tx_hash_to_id:{hash}"),
210+
Some(ns) => format!(
211+
"{ns}:{}:eoa_executor:tx_hash_to_id:{hash}",
212+
twmq::ENGINE_HASH_TAG
213+
),
214+
None => format!(
215+
"{}:eoa_executor:tx_hash_to_id:{hash}",
216+
twmq::ENGINE_HASH_TAG
217+
),
187218
}
188219
}
189220

@@ -197,10 +228,13 @@ impl EoaExecutorStoreKeys {
197228
pub fn borrowed_transactions_hashmap_name(&self) -> String {
198229
match &self.namespace {
199230
Some(ns) => format!(
200-
"{ns}:eoa_executor:borrowed_txs:{}:{}",
201-
self.chain_id, self.eoa
231+
"{ns}:{}:eoa_executor:borrowed_txs:{}:{}",
232+
twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa
233+
),
234+
None => format!(
235+
"{}:eoa_executor:borrowed_txs:{}:{}",
236+
twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa
202237
),
203-
None => format!("eoa_executor:borrowed_txs:{}:{}", self.chain_id, self.eoa),
204238
}
205239
}
206240

@@ -214,12 +248,12 @@ impl EoaExecutorStoreKeys {
214248
pub fn recycled_nonces_zset_name(&self) -> String {
215249
match &self.namespace {
216250
Some(ns) => format!(
217-
"{ns}:eoa_executor:recycled_nonces:{}:{}",
218-
self.chain_id, self.eoa
251+
"{ns}:{}:eoa_executor:recycled_nonces:{}:{}",
252+
twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa
219253
),
220254
None => format!(
221-
"eoa_executor:recycled_nonces:{}:{}",
222-
self.chain_id, self.eoa
255+
"{}:eoa_executor:recycled_nonces:{}:{}",
256+
twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa
223257
),
224258
}
225259
}
@@ -236,12 +270,12 @@ impl EoaExecutorStoreKeys {
236270
pub fn optimistic_transaction_count_key_name(&self) -> String {
237271
match &self.namespace {
238272
Some(ns) => format!(
239-
"{ns}:eoa_executor:optimistic_nonce:{}:{}",
240-
self.chain_id, self.eoa
273+
"{ns}:{}:eoa_executor:optimistic_nonce:{}:{}",
274+
twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa
241275
),
242276
None => format!(
243-
"eoa_executor:optimistic_nonce:{}:{}",
244-
self.chain_id, self.eoa
277+
"{}:eoa_executor:optimistic_nonce:{}:{}",
278+
twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa
245279
),
246280
}
247281
}
@@ -256,10 +290,13 @@ impl EoaExecutorStoreKeys {
256290
pub fn last_transaction_count_key_name(&self) -> String {
257291
match &self.namespace {
258292
Some(ns) => format!(
259-
"{ns}:eoa_executor:last_tx_nonce:{}:{}",
260-
self.chain_id, self.eoa
293+
"{ns}:{}:eoa_executor:last_tx_nonce:{}:{}",
294+
twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa
295+
),
296+
None => format!(
297+
"{}:eoa_executor:last_tx_nonce:{}:{}",
298+
twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa
261299
),
262-
None => format!("eoa_executor:last_tx_nonce:{}:{}", self.chain_id, self.eoa),
263300
}
264301
}
265302

@@ -271,8 +308,14 @@ impl EoaExecutorStoreKeys {
271308
/// - timestamp of the last 5 nonce resets
272309
pub fn eoa_health_key_name(&self) -> String {
273310
match &self.namespace {
274-
Some(ns) => format!("{ns}:eoa_executor:health:{}:{}", self.chain_id, self.eoa),
275-
None => format!("eoa_executor:health:{}:{}", self.chain_id, self.eoa),
311+
Some(ns) => format!(
312+
"{ns}:{}:eoa_executor:health:{}:{}",
313+
twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa
314+
),
315+
None => format!(
316+
"{}:eoa_executor:health:{}:{}",
317+
twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa
318+
),
276319
}
277320
}
278321

@@ -282,20 +325,20 @@ impl EoaExecutorStoreKeys {
282325
pub fn manual_reset_key_name(&self) -> String {
283326
match &self.namespace {
284327
Some(ns) => format!(
285-
"{ns}:eoa_executor:pending_manual_reset:{}:{}",
286-
self.chain_id, self.eoa
328+
"{ns}:{}:eoa_executor:pending_manual_reset:{}:{}",
329+
twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa
287330
),
288331
None => format!(
289-
"eoa_executor:pending_manual_reset:{}:{}",
290-
self.chain_id, self.eoa
332+
"{}:eoa_executor:pending_manual_reset:{}:{}",
333+
twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa
291334
),
292335
}
293336
}
294337
}
295338

296339
impl EoaExecutorStore {
297340
pub fn new(
298-
redis: ConnectionManager,
341+
redis: ClusterConnection,
299342
namespace: Option<String>,
300343
eoa: Address,
301344
chain_id: u64,

executors/src/eoa/store/pending.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
use std::collections::HashSet;
22

33
use alloy::{consensus::Transaction, primitives::Address};
4-
use twmq::redis::{AsyncCommands, Pipeline, aio::ConnectionManager};
4+
use twmq::redis::{AsyncCommands, Pipeline};
5+
use twmq::redis::cluster_async::ClusterConnection;
56

67
use crate::eoa::{
78
EoaExecutorStore,
@@ -46,7 +47,7 @@ impl SafeRedisTransaction for MovePendingToBorrowedWithIncrementedNonces<'_> {
4647

4748
async fn validation(
4849
&self,
49-
conn: &mut ConnectionManager,
50+
conn: &mut ClusterConnection,
5051
_store: &EoaExecutorStore,
5152
) -> Result<Self::ValidationData, TransactionStoreError> {
5253
if self.transactions.is_empty() {
@@ -181,7 +182,7 @@ impl SafeRedisTransaction for MovePendingToBorrowedWithRecycledNonces<'_> {
181182

182183
async fn validation(
183184
&self,
184-
conn: &mut ConnectionManager,
185+
conn: &mut ClusterConnection,
185186
_store: &EoaExecutorStore,
186187
) -> Result<Self::ValidationData, TransactionStoreError> {
187188
if self.transactions.is_empty() {

executors/src/eoa/store/submitted.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,8 @@ use std::{
55
};
66

77
use serde::{Deserialize, Serialize};
8-
use twmq::redis::{AsyncCommands, Pipeline, aio::ConnectionManager};
8+
use twmq::redis::{AsyncCommands, Pipeline};
9+
use twmq::redis::cluster_async::ClusterConnection;
910

1011
use crate::{
1112
TransactionCounts,
@@ -279,7 +280,7 @@ impl SafeRedisTransaction for CleanSubmittedTransactions<'_> {
279280

280281
async fn validation(
281282
&self,
282-
conn: &mut ConnectionManager,
283+
conn: &mut ClusterConnection,
283284
store: &EoaExecutorStore,
284285
) -> Result<Self::ValidationData, TransactionStoreError> {
285286
// Fetch transactions up to the latest confirmed nonce for replacements
@@ -592,7 +593,7 @@ impl SafeRedisTransaction for CleanAndGetRecycledNonces<'_> {
592593

593594
async fn validation(
594595
&self,
595-
conn: &mut ConnectionManager,
596+
conn: &mut ClusterConnection,
596597
_store: &EoaExecutorStore,
597598
) -> Result<Self::ValidationData, TransactionStoreError> {
598599
// get the highest submitted nonce

executors/src/eoa/worker/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use engine_eip7702_core::delegated_account::DelegatedAccount;
1111
use serde::{Deserialize, Serialize};
1212
use std::{sync::Arc, time::Duration};
1313
use twmq::Queue;
14-
use twmq::redis::aio::ConnectionManager;
14+
use twmq::redis::cluster_async::ClusterConnection;
1515
use twmq::{
1616
DurableExecution, FailHookData, NackHookData, SuccessHookData,
1717
hooks::TransactionContext,
@@ -114,7 +114,7 @@ where
114114
pub webhook_queue: Arc<Queue<WebhookJobHandler>>,
115115
pub authorization_cache: EoaAuthorizationCache,
116116

117-
pub redis: ConnectionManager,
117+
pub redis: ClusterConnection,
118118
pub namespace: Option<String>,
119119

120120
pub eoa_signer: Arc<EoaSigner>,

0 commit comments

Comments
 (0)