@@ -168,34 +168,66 @@ async def test_standalone_pipeline(delay, redis_addr):
168
168
@pytest .mark .onlycluster
169
169
async def test_cluster (request , redis_addr ):
170
170
171
- # TODO: This test actually doesn't work. Once the RedisCluster initializes,
172
- # it will re-connect to the nodes as advertised by the cluster, bypassing
173
- # the single DelayProxy we set up.
174
- # to work around this, we really would nedd a port-remapper for the RedisCluster
171
+ delay = 0.1
172
+ cluster_port = 6372
173
+ remap_base = 7372
174
+ n_nodes = 6
175
+
176
+ remap = []
177
+ proxies = []
178
+ for i in range (n_nodes ):
179
+ port = cluster_port + i
180
+ remapped = remap_base + i
181
+ remap .append ({"from_port" : port , "to_port" : remapped })
182
+ forward_addr = redis_addr [0 ], port
183
+ proxy = DelayProxy (
184
+ addr = ("127.0.0.1" , remapped ), redis_addr = forward_addr , delay = delay
185
+ )
186
+ proxies .append (proxy )
175
187
176
- redis_addr = redis_addr [0 ], 6372 # use the cluster port
177
- dp = DelayProxy (addr = ("127.0.0.1" , 5381 ), redis_addr = redis_addr , delay = 0.1 )
178
- await dp .start ()
188
+ # start proxies
189
+ await asyncio .gather (* [p .start () for p in proxies ])
190
+
191
+ def all_clear ():
192
+ for p in proxies :
193
+ p .send_event .clear ()
179
194
180
- r = RedisCluster .from_url ("redis://127.0.0.1:5381" )
181
- await r .initialize ()
182
- with dp .override ():
195
+ async def wait_for_send ():
196
+ asyncio .wait (
197
+ [p .send_event .wait () for p in proxies ], return_when = asyncio .FIRST_COMPLETED
198
+ )
199
+
200
+ @contextlib .contextmanager
201
+ def override ():
202
+ with contextlib .ExitStack () as stack :
203
+ for p in proxies :
204
+ stack .enter_context (p .override ())
205
+ yield
206
+
207
+ with override ():
208
+ r = RedisCluster .from_url (
209
+ f"redis://127.0.0.1:{ remap_base } " , host_port_remap = remap
210
+ )
211
+ await r .initialize ()
183
212
await r .set ("foo" , "foo" )
184
213
await r .set ("bar" , "bar" )
185
214
186
- dp . send_event . clear ()
215
+ all_clear ()
187
216
t = asyncio .create_task (r .get ("foo" ))
188
- # await dp.send_event.wait() # won"t work, because DelayProxy is by-passed
189
- await asyncio .sleep (0.05 )
217
+ # cannot wait on the send event, we don't know which node will be used
218
+ await wait_for_send ()
219
+ await asyncio .sleep (delay )
190
220
t .cancel ()
191
- try :
221
+ with pytest . raises ( asyncio . CancelledError ) :
192
222
await t
193
- except asyncio .CancelledError :
194
- pass
195
223
196
- with dp .override ():
197
- assert await r .get ("bar" ) == b"bar"
198
- assert await r .ping ()
199
- assert await r .get ("foo" ) == b"foo"
224
+ with override ():
225
+ # try a number of requests to excercise all the connections
226
+ async def doit ():
227
+ assert await r .get ("bar" ) == b"bar"
228
+ assert await r .ping ()
229
+ assert await r .get ("foo" ) == b"foo"
200
230
201
- await dp .stop ()
231
+ await asyncio .gather (* [doit () for _ in range (10 )])
232
+
233
+ await asyncio .gather (* (p .stop () for p in proxies ))
0 commit comments