@@ -8,6 +8,8 @@ const channelBus: Map<string, Set<Redis>> = new Map();
8
8
/**
9
9
* IORedis-compatible adapter backed by Upstash Redis REST client.
10
10
* Implements the subset of the API used by Bottleneck's IORedis store.
11
+ *
12
+ * Uses Proxy to forward all unknown methods to the underlying Upstash client.
11
13
*/
12
14
export class Redis {
13
15
private client : UpstashRedis ;
@@ -39,6 +41,29 @@ export class Redis {
39
41
40
42
// Emit ready asynchronously to mimic ioredis behavior
41
43
queueMicrotask ( ( ) => this . emit ( "ready" ) ) ;
44
+
45
+ // Return a Proxy that forwards unknown method calls to the underlying Upstash client
46
+ // This allows Redis commands like get, set, incr, etc. to work transparently
47
+ return new Proxy ( this , {
48
+ get ( target , prop , receiver ) {
49
+ // First check if the property exists on the Redis wrapper class
50
+ const targetValue = Reflect . get ( target , prop , receiver ) ;
51
+ if ( targetValue !== undefined ) {
52
+ return targetValue ;
53
+ }
54
+
55
+ // Then check the underlying Upstash client
56
+ const clientAsAny = target . client as unknown as Record < string , unknown > ;
57
+ const clientMethod = clientAsAny [ String ( prop ) ] ;
58
+
59
+ if ( typeof clientMethod === "function" ) {
60
+ // Bind the method to the client so 'this' works correctly
61
+ return clientMethod . bind ( target . client ) ;
62
+ }
63
+
64
+ return undefined ;
65
+ }
66
+ } ) ;
42
67
}
43
68
44
69
// EventEmitter-like API expected by bottleneck
@@ -74,7 +99,12 @@ export class Redis {
74
99
75
100
// ioredis API: duplicate returns a new connection with same options
76
101
duplicate ( ) {
77
- return new Redis ( this . initOptions ) ;
102
+ const newRedis = new Redis ( this . initOptions ) ;
103
+ // Copy defined scripts to the new instance
104
+ for ( const [ name , script ] of this . scripts . entries ( ) ) {
105
+ newRedis . defineCommand ( name , { lua : script } ) ;
106
+ }
107
+ return newRedis ;
78
108
}
79
109
80
110
// ioredis API: defineCommand(name, { lua }) registers a script callable as client[name](...)
@@ -93,13 +123,33 @@ export class Redis {
93
123
94
124
try {
95
125
if ( Deno . env . get ( "REDIS_DEBUG" ) === "true" ) {
96
- console . log ( "eval script" , { name, numKeys, keysCount : keys . length , argvCount : argv . length } ) ;
126
+ console . log ( "eval script" , {
127
+ name,
128
+ numKeys,
129
+ keysCount : keys . length ,
130
+ argvCount : argv . length ,
131
+ rawArgsCount : args . length ,
132
+ firstKey : keys [ 0 ] ,
133
+ firstArg : argv [ 0 ] ,
134
+ allKeys : keys ,
135
+ scriptLength : script . length
136
+ } ) ;
137
+ }
138
+
139
+ // Check if Upstash client has eval method
140
+ const clientAsAny = this . client as unknown as Record < string , unknown > ;
141
+ if ( typeof clientAsAny . eval !== "function" ) {
142
+ throw new Error ( "Upstash Redis client does not support EVAL command" ) ;
97
143
}
144
+
98
145
const result = await (
99
146
this . client as unknown as {
100
147
eval : ( script : string , keys : string [ ] , args : ( string | number ) [ ] ) => Promise < unknown > ;
101
148
}
102
149
) . eval ( script , keys , argv ) ;
150
+ if ( Deno . env . get ( "REDIS_DEBUG" ) === "true" ) {
151
+ console . log ( "eval script result" , { name, result } ) ;
152
+ }
103
153
if ( cb ) cb ( null , result ) ;
104
154
return result ;
105
155
} catch ( error ) {
@@ -139,6 +189,8 @@ export class Redis {
139
189
// ioredis API: pipeline([...]).exec() => [[err, result], ...]
140
190
pipeline ( commands : Array < [ string , ...unknown [ ] ] > ) {
141
191
const emitError = this . emit . bind ( this ) ;
192
+ const executeCommand = this . executeCommand . bind ( this ) ;
193
+ const scripts = this . scripts ; // Capture for closure
142
194
143
195
// Capture credentials at pipeline creation time
144
196
const url = String (
@@ -158,15 +210,42 @@ export class Redis {
158
210
return out ;
159
211
}
160
212
161
- try {
162
- // Build 2D JSON array of commands for Upstash pipeline
163
- const pipelineCommands = commands . map ( ( cmd ) => cmd ) ;
213
+ if ( Deno . env . get ( "REDIS_DEBUG" ) === "true" ) {
214
+ console . log ( "pipeline exec" , {
215
+ commandCount : commands . length ,
216
+ commands : commands . map ( ( cmd ) => cmd [ 0 ] )
217
+ } ) ;
218
+ }
164
219
165
- if ( Deno . env . get ( "REDIS_DEBUG" ) === "true" ) {
166
- console . log ( "pipeline exec" , { commandCount : commands . length , commands : commands . map ( ( cmd ) => cmd [ 0 ] ) } ) ;
220
+ // Check if ANY commands are custom scripts (defined via defineCommand)
221
+ const hasCustomScripts = commands . some ( cmd => {
222
+ const commandName = String ( cmd [ 0 ] ) ;
223
+ return scripts . has ( commandName ) ;
224
+ } ) ;
225
+
226
+ if ( Deno . env . get ( "REDIS_DEBUG" ) === "true" ) {
227
+ console . log ( ` pipeline hasCustomScripts: ${ hasCustomScripts } , executing ${ hasCustomScripts ? 'SEQUENTIALLY' : 'BATCHED' } ` ) ;
228
+ }
229
+
230
+ // If there are custom scripts, we must execute sequentially because
231
+ // Upstash pipeline doesn't support our defineCommand Lua scripts
232
+ if ( hasCustomScripts ) {
233
+ for ( const cmd of commands ) {
234
+ try {
235
+ const result = await executeCommand ( cmd ) ;
236
+ out . push ( [ null , result ] ) ;
237
+ } catch ( err ) {
238
+ out . push ( [ err , null ] ) ;
239
+ emitError ( "error" , err ) ;
240
+ }
167
241
}
242
+ return out ;
243
+ }
244
+
245
+ // No custom scripts - use Upstash's atomic pipeline endpoint
246
+ try {
247
+ const pipelineCommands = commands . map ( ( cmd ) => cmd ) ;
168
248
169
- // Make single REST POST to Upstash pipeline endpoint
170
249
const pipelineUrl = `${ url } /pipeline` ;
171
250
const response = await fetch ( pipelineUrl , {
172
251
method : "POST" ,
@@ -183,25 +262,19 @@ export class Redis {
183
262
184
263
const results = ( await response . json ( ) ) as Array < { result ?: unknown ; error ?: string } > ;
185
264
186
- // Map pipeline results to expected ioredis format: [err, result] or [null, result]
187
- // Upstash returns: { result: ... } on success or { error: ... } on error
188
265
for ( let i = 0 ; i < commands . length ; i ++ ) {
189
266
const resp = results [ i ] ;
190
267
191
268
if ( resp && typeof resp === "object" && "error" in resp ) {
192
- // Error case: { error: "..." }
193
269
out . push ( [ resp . error , null ] ) ;
194
270
emitError ( "error" , resp . error ) ;
195
271
} else if ( resp && typeof resp === "object" && "result" in resp ) {
196
- // Success case: { result: ... }
197
272
out . push ( [ null , resp . result ] ) ;
198
273
} else {
199
- // Unexpected format - treat as success with raw value for backwards compatibility
200
274
out . push ( [ null , resp ] ) ;
201
275
}
202
276
}
203
277
} catch ( err ) {
204
- // If pipeline request fails entirely, treat all commands as failed
205
278
for ( let i = 0 ; i < commands . length ; i ++ ) {
206
279
out . push ( [ err , null ] ) ;
207
280
emitError ( "error" , err ) ;
0 commit comments