Skip to content

Commit d729f5f

Browse files
authored
redis tls support (#95)
* redis tls support * minor change * tls update * valkey hashtag
1 parent 196bf34 commit d729f5f

17 files changed

+126
-45
lines changed

Cargo.toml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,11 @@ config = "0.15.11"
9898
aws-arn = "0.3.1"
9999

100100
# Redis
101-
redis = { version = "0.31.0", features = ["tokio-comp", "connection-manager"] }
101+
redis = { version = "0.31.0", features = ["connection-manager", "tls-rustls", "tls-rustls-webpki-roots", "tokio-rustls-comp"] }
102+
103+
# Rustls (required for TLS crypto provider selection).
104+
# Rustls 0.23 requires exactly one crypto provider feature (ring or aws-lc-rs).
105+
rustls = { version = "0.23.32", default-features = false, features = ["ring"] }
102106

103107
# Dev dependencies
104108
criterion = { version = "0.6", features = ["html_reports", "async_tokio"] }

README.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,8 @@ thirdweb:
147147

148148
redis:
149149
url: "redis://localhost:6379"
150+
# For Redis over TLS, use the `rediss://` scheme:
151+
# url: "rediss://localhost:6379"
150152

151153
queue:
152154
webhook_workers: 50
@@ -166,6 +168,8 @@ export APP__QUEUE__LOCAL_CONCURRENCY=500
166168

167169
# Custom Redis configuration
168170
export APP__REDIS__URL="redis://redis-cluster:6379"
171+
# For Redis over TLS, use the `rediss://` scheme:
172+
# export APP__REDIS__URL="rediss://redis-cluster:6379"
169173

170174
# Debug logging for development
171175
export RUST_LOG="thirdweb_engine=debug,twmq=debug"

server/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,4 +50,5 @@ aws-arn = { workspace = true }
5050
moka = { workspace = true }
5151
engine-eip7702-core = { path = "../eip7702-core" }
5252
prometheus = { workspace = true }
53-
thiserror = { workspace = true }
53+
thiserror = { workspace = true }
54+
rustls = { workspace = true }

server/DOCKER.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ The following environment variables must be set when running the container:
2525
```bash
2626
# Redis Configuration
2727
APP__REDIS__URL=redis://localhost:6379
28+
# For Redis over TLS, use the `rediss://` scheme:
29+
# APP__REDIS__URL=rediss://localhost:6379
2830

2931
# Thirdweb Configuration
3032
APP__THIRDWEB__SECRET=your_secret_key_here
@@ -68,6 +70,8 @@ Create a `.env` file with your configuration:
6870
```bash
6971
# .env file
7072
APP__REDIS__URL=redis://localhost:6379
73+
# For Redis over TLS, use the `rediss://` scheme:
74+
# APP__REDIS__URL=rediss://localhost:6379
7175
APP__THIRDWEB__SECRET=your_secret_key_here
7276
APP__THIRDWEB__CLIENT_ID=your_client_id_here
7377
APP__THIRDWEB__URLS__RPC=https://your-rpc-url.com
@@ -128,6 +132,8 @@ services:
128132
- "8080:8080"
129133
environment:
130134
- APP__REDIS__URL=redis://redis:6379
135+
# For Redis over TLS, use the `rediss://` scheme:
136+
# - APP__REDIS__URL=rediss://redis:6379
131137
- APP__THIRDWEB__SECRET=${APP__THIRDWEB__SECRET}
132138
- APP__THIRDWEB__CLIENT_ID=${APP__THIRDWEB__CLIENT_ID}
133139
- APP__THIRDWEB__URLS__RPC=${APP__THIRDWEB__URLS__RPC}

server/src/main.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,14 @@ async fn main() -> anyhow::Result<()> {
7070
});
7171
let eoa_signer = Arc::new(EoaSigner::new(vault_client.clone(), iaw_client.clone()));
7272
let solana_signer = Arc::new(SolanaSigner::new(vault_client.clone(), iaw_client));
73+
74+
// Rustls 0.23 requires selecting a process-level CryptoProvider (ring or aws-lc-rs)
75+
// before any TLS client configuration is created (e.g. when using `rediss://`).
76+
// If another crate already installed a provider, this will be a no-op error.
77+
if let Err(e) = rustls::crypto::ring::default_provider().install_default() {
78+
tracing::debug!(error = ?e, "Rustls CryptoProvider already installed");
79+
}
80+
7381
let redis_client = twmq::redis::Client::open(config.redis.url.as_str())?;
7482

7583
let authorization_cache = EoaAuthorizationCache::new(

twmq/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ thiserror = { workspace = true }
1414
tracing = { workspace = true }
1515
tracing-subscriber = { workspace = true, features = ["env-filter", "fmt"] }
1616
futures = { workspace = true }
17+
rustls = { workspace = true }
1718

1819
[dev-dependencies]
1920
tokio = { workspace = true, features = ["full"] }

twmq/src/lib.rs

Lines changed: 31 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -175,52 +175,63 @@ impl<H: DurableExecution> Queue<H> {
175175
&self.name
176176
}
177177

178+
/// Redis Cluster hash tag used to keep all queue keys in the same slot.
179+
/// See: https://redis.io/docs/latest/operate/oss_and_stack/reference/cluster-spec/#hash-tags
180+
fn redis_hash_tag(&self) -> String {
181+
format!("{{{}}}", self.name())
182+
}
183+
178184
pub fn pending_list_name(&self) -> String {
179-
format!("twmq:{}:pending", self.name())
185+
format!("twmq:{}:pending", self.redis_hash_tag())
180186
}
181187

182188
pub fn active_hash_name(&self) -> String {
183-
format!("twmq:{}:active", self.name)
189+
format!("twmq:{}:active", self.redis_hash_tag())
184190
}
185191

186192
pub fn delayed_zset_name(&self) -> String {
187-
format!("twmq:{}:delayed", self.name)
193+
format!("twmq:{}:delayed", self.redis_hash_tag())
188194
}
189195

190196
pub fn success_list_name(&self) -> String {
191-
format!("twmq:{}:success", self.name)
197+
format!("twmq:{}:success", self.redis_hash_tag())
192198
}
193199

194200
pub fn failed_list_name(&self) -> String {
195-
format!("twmq:{}:failed", self.name)
201+
format!("twmq:{}:failed", self.redis_hash_tag())
196202
}
197203

198204
pub fn job_data_hash_name(&self) -> String {
199-
format!("twmq:{}:jobs:data", self.name)
205+
format!("twmq:{}:jobs:data", self.redis_hash_tag())
200206
}
201207

202208
pub fn job_meta_hash_name(&self, job_id: &str) -> String {
203-
format!("twmq:{}:job:{}:meta", self.name, job_id)
209+
format!("twmq:{}:job:{}:meta", self.redis_hash_tag(), job_id)
204210
}
205211

206212
pub fn job_errors_list_name(&self, job_id: &str) -> String {
207-
format!("twmq:{}:job:{}:errors", self.name, job_id)
213+
format!("twmq:{}:job:{}:errors", self.redis_hash_tag(), job_id)
208214
}
209215

210216
pub fn job_result_hash_name(&self) -> String {
211-
format!("twmq:{}:jobs:result", self.name)
217+
format!("twmq:{}:jobs:result", self.redis_hash_tag())
212218
}
213219

214220
pub fn dedupe_set_name(&self) -> String {
215-
format!("twmq:{}:dedup", self.name)
221+
format!("twmq:{}:dedup", self.redis_hash_tag())
216222
}
217223

218224
pub fn pending_cancellation_set_name(&self) -> String {
219-
format!("twmq:{}:pending_cancellations", self.name)
225+
format!("twmq:{}:pending_cancellations", self.redis_hash_tag())
220226
}
221227

222228
pub fn lease_key_name(&self, job_id: &str, lease_token: &str) -> String {
223-
format!("twmq:{}:job:{}:lease:{}", self.name, job_id, lease_token)
229+
format!(
230+
"twmq:{}:job:{}:lease:{}",
231+
self.redis_hash_tag(),
232+
job_id,
233+
lease_token
234+
)
224235
}
225236

226237
pub async fn push(
@@ -301,7 +312,8 @@ impl<H: DurableExecution> Queue<H> {
301312
let position_string = delay.position.to_string();
302313

303314
let _result: (i32, String) = script
304-
.key(&self.name)
315+
// Redis Cluster: all KEYS must be in the same slot
316+
.key(self.redis_hash_tag())
305317
.key(self.delayed_zset_name())
306318
.key(self.pending_list_name())
307319
.key(self.job_data_hash_name())
@@ -742,7 +754,8 @@ impl<H: DurableExecution> Queue<H> {
742754
Vec<String>,
743755
Vec<String>,
744756
) = script
745-
.key(self.name())
757+
// Redis Cluster: all KEYS must be in the same slot
758+
.key(self.redis_hash_tag())
746759
.key(self.delayed_zset_name())
747760
.key(self.pending_list_name())
748761
.key(self.active_hash_name())
@@ -990,7 +1003,8 @@ impl<H: DurableExecution> Queue<H> {
9901003
);
9911004

9921005
let trimmed_count: usize = trim_script
993-
.key(self.name())
1006+
// Redis Cluster: all KEYS must be in the same slot
1007+
.key(self.redis_hash_tag())
9941008
.key(self.success_list_name())
9951009
.key(self.job_data_hash_name())
9961010
.key(self.job_result_hash_name()) // results_hash
@@ -1168,7 +1182,8 @@ impl<H: DurableExecution> Queue<H> {
11681182
);
11691183

11701184
let trimmed_count: usize = trim_script
1171-
.key(self.name())
1185+
// Redis Cluster: all KEYS must be in the same slot
1186+
.key(self.redis_hash_tag())
11721187
.key(self.failed_list_name())
11731188
.key(self.job_data_hash_name())
11741189
.key(self.dedupe_set_name())

twmq/src/multilane.rs

Lines changed: 49 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -84,59 +84,89 @@ impl<H: DurableExecution> MultilaneQueue<H> {
8484
&self.queue_id
8585
}
8686

87+
/// Redis Cluster hash tag used to keep all multilane keys in the same slot.
88+
fn redis_hash_tag(&self) -> String {
89+
format!("{{{}}}", self.queue_id())
90+
}
91+
8792
// Redis key naming methods with proper multilane namespacing
8893
pub fn lanes_zset_name(&self) -> String {
89-
format!("twmq_multilane:{}:lanes", self.queue_id)
94+
format!("twmq_multilane:{}:lanes", self.redis_hash_tag())
9095
}
9196

9297
pub fn lane_pending_list_name(&self, lane_id: &str) -> String {
93-
format!("twmq_multilane:{}:lane:{}:pending", self.queue_id, lane_id)
98+
format!(
99+
"twmq_multilane:{}:lane:{}:pending",
100+
self.redis_hash_tag(),
101+
lane_id
102+
)
94103
}
95104

96105
pub fn lane_delayed_zset_name(&self, lane_id: &str) -> String {
97-
format!("twmq_multilane:{}:lane:{}:delayed", self.queue_id, lane_id)
106+
format!(
107+
"twmq_multilane:{}:lane:{}:delayed",
108+
self.redis_hash_tag(),
109+
lane_id
110+
)
98111
}
99112

100113
pub fn lane_active_hash_name(&self, lane_id: &str) -> String {
101-
format!("twmq_multilane:{}:lane:{}:active", self.queue_id, lane_id)
114+
format!(
115+
"twmq_multilane:{}:lane:{}:active",
116+
self.redis_hash_tag(),
117+
lane_id
118+
)
102119
}
103120

104121
pub fn success_list_name(&self) -> String {
105-
format!("twmq_multilane:{}:success", self.queue_id)
122+
format!("twmq_multilane:{}:success", self.redis_hash_tag())
106123
}
107124

108125
pub fn failed_list_name(&self) -> String {
109-
format!("twmq_multilane:{}:failed", self.queue_id)
126+
format!("twmq_multilane:{}:failed", self.redis_hash_tag())
110127
}
111128

112129
pub fn job_data_hash_name(&self) -> String {
113-
format!("twmq_multilane:{}:jobs:data", self.queue_id)
130+
format!("twmq_multilane:{}:jobs:data", self.redis_hash_tag())
114131
}
115132

116133
pub fn job_meta_hash_name(&self, job_id: &str) -> String {
117-
format!("twmq_multilane:{}:job:{}:meta", self.queue_id, job_id)
134+
format!(
135+
"twmq_multilane:{}:job:{}:meta",
136+
self.redis_hash_tag(),
137+
job_id
138+
)
118139
}
119140

120141
pub fn job_errors_list_name(&self, job_id: &str) -> String {
121-
format!("twmq_multilane:{}:job:{}:errors", self.queue_id, job_id)
142+
format!(
143+
"twmq_multilane:{}:job:{}:errors",
144+
self.redis_hash_tag(),
145+
job_id
146+
)
122147
}
123148

124149
pub fn job_result_hash_name(&self) -> String {
125-
format!("twmq_multilane:{}:jobs:result", self.queue_id)
150+
format!("twmq_multilane:{}:jobs:result", self.redis_hash_tag())
126151
}
127152

128153
pub fn dedupe_set_name(&self) -> String {
129-
format!("twmq_multilane:{}:dedup", self.queue_id)
154+
format!("twmq_multilane:{}:dedup", self.redis_hash_tag())
130155
}
131156

132157
pub fn pending_cancellation_set_name(&self) -> String {
133-
format!("twmq_multilane:{}:pending_cancellations", self.queue_id)
158+
format!(
159+
"twmq_multilane:{}:pending_cancellations",
160+
self.redis_hash_tag()
161+
)
134162
}
135163

136164
pub fn lease_key_name(&self, job_id: &str, lease_token: &str) -> String {
137165
format!(
138166
"twmq_multilane:{}:job:{}:lease:{}",
139-
self.queue_id, job_id, lease_token
167+
self.redis_hash_tag(),
168+
job_id,
169+
lease_token
140170
)
141171
}
142172

@@ -229,7 +259,8 @@ impl<H: DurableExecution> MultilaneQueue<H> {
229259
.key(self.job_data_hash_name())
230260
.key(self.job_meta_hash_name(&job.id))
231261
.key(self.dedupe_set_name())
232-
.arg(&self.queue_id)
262+
// Redis Cluster: ensure constructed keys match hash-tagged names
263+
.arg(self.redis_hash_tag())
233264
.arg(lane_id)
234265
.arg(&job_options.id)
235266
.arg(job_data)
@@ -414,7 +445,8 @@ impl<H: DurableExecution> MultilaneQueue<H> {
414445
.key(self.pending_cancellation_set_name())
415446
.key(self.job_meta_hash_name(job_id))
416447
.key(self.job_data_hash_name())
417-
.arg(&self.queue_id)
448+
// Redis Cluster: ensure constructed keys match hash-tagged names
449+
.arg(self.redis_hash_tag())
418450
.arg(job_id)
419451
.arg(now)
420452
.invoke_async(&mut self.redis.clone())
@@ -760,7 +792,8 @@ impl<H: DurableExecution> MultilaneQueue<H> {
760792
.key(self.pending_cancellation_set_name())
761793
.key(self.failed_list_name())
762794
.key(self.success_list_name())
763-
.arg(&self.queue_id)
795+
// Redis Cluster: ensure constructed keys match hash-tagged names
796+
.arg(self.redis_hash_tag())
764797
.arg(now)
765798
.arg(batch_size)
766799
.arg(self.options.lease_duration.as_secs())

twmq/tests/basic.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@ const REDIS_URL: &str = "redis://127.0.0.1:6379/";
1818
// Helper to clean up Redis keys for a given queue name pattern
1919
async fn cleanup_redis_keys(conn_manager: &ConnectionManager, queue_name: &str) {
2020
let mut conn = conn_manager.clone();
21-
let keys_pattern = format!("twmq:{queue_name}:*");
21+
// twmq queue keys are hash-tagged for Redis Cluster compatibility
22+
let keys_pattern = format!("twmq:{{{queue_name}}}:*");
2223

2324
let keys: Vec<String> = redis::cmd("KEYS")
2425
.arg(&keys_pattern)

twmq/tests/basic_hook.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,8 @@ use twmq::{
2323
// Helper to clean up Redis keys for a given queue name pattern
2424
async fn cleanup_redis_keys(conn_manager: &ConnectionManager, queue_name: &str) {
2525
let mut conn = conn_manager.clone();
26-
let keys_pattern = format!("twmq:{queue_name}:*");
26+
// twmq queue keys are hash-tagged for Redis Cluster compatibility
27+
let keys_pattern = format!("twmq:{{{queue_name}}}:*");
2728

2829
let keys: Vec<String> = redis::cmd("KEYS")
2930
.arg(&keys_pattern)

0 commit comments

Comments
 (0)