@@ -817,14 +817,16 @@ async def mysetup(self, r, method):
817
817
"type" : "subscribe" ,
818
818
}
819
819
820
- async def mycleanup (self ):
820
+ async def myfinish (self ):
821
821
message = await self .messages .get ()
822
822
assert message == {
823
823
"channel" : b"foo" ,
824
824
"data" : 1 ,
825
825
"pattern" : None ,
826
826
"type" : "subscribe" ,
827
827
}
828
+
829
+ async def mykill (self ):
828
830
# kill thread
829
831
async with self .cond :
830
832
self .state = 4 # quit
@@ -834,41 +836,52 @@ async def test_reconnect_socket_error(self, r: redis.Redis, method):
834
836
"""
835
837
Test that a socket error will cause reconnect
836
838
"""
837
- async with async_timeout .timeout (self .timeout ):
838
- await self .mysetup (r , method )
839
- # now, disconnect the connection, and wait for it to be re-established
840
- async with self .cond :
841
- assert self .state == 0
842
- self .state = 1
843
- with mock .patch .object (self .pubsub .connection , "_parser" ) as mockobj :
844
- mockobj .read_response .side_effect = socket .error
845
- mockobj .can_read .side_effect = socket .error
846
- # wait until task noticies the disconnect until we undo the patch
847
- await self .cond .wait_for (lambda : self .state >= 2 )
848
- assert not self .pubsub .connection .is_connected
849
- # it is in a disconnecte state
850
- # wait for reconnect
851
- await self .cond .wait_for (lambda : self .pubsub .connection .is_connected )
852
- assert self .state == 3
853
-
854
- await self .mycleanup ()
839
+ try :
840
+ async with async_timeout .timeout (self .timeout ):
841
+ await self .mysetup (r , method )
842
+ # now, disconnect the connection, and wait for it to be re-established
843
+ async with self .cond :
844
+ assert self .state == 0
845
+ self .state = 1
846
+ with mock .patch .object (self .pubsub .connection , "_parser" ) as m :
847
+ m .read_response .side_effect = socket .error
848
+ m .can_read .side_effect = socket .error
849
+ # wait until task noticies the disconnect until we
850
+ # undo the patch
851
+ await self .cond .wait_for (lambda : self .state >= 2 )
852
+ assert not self .pubsub .connection .is_connected
853
+ # it is in a disconnecte state
854
+ # wait for reconnect
855
+ await self .cond .wait_for (
856
+ lambda : self .pubsub .connection .is_connected
857
+ )
858
+ assert self .state == 3
859
+
860
+ await self .myfinish ()
861
+ finally :
862
+ await self .mykill ()
855
863
856
864
async def test_reconnect_disconnect (self , r : redis .Redis , method ):
857
865
"""
858
866
Test that a manual disconnect() will cause reconnect
859
867
"""
860
- async with async_timeout .timeout (self .timeout ):
861
- await self .mysetup (r , method )
862
- # now, disconnect the connection, and wait for it to be re-established
863
- async with self .cond :
864
- self .state = 1
865
- await self .pubsub .connection .disconnect ()
866
- assert not self .pubsub .connection .is_connected
867
- # wait for reconnect
868
- await self .cond .wait_for (lambda : self .pubsub .connection .is_connected )
869
- assert self .state == 3
870
-
871
- await self .mycleanup ()
868
+ try :
869
+ async with async_timeout .timeout (self .timeout ):
870
+ await self .mysetup (r , method )
871
+ # now, disconnect the connection, and wait for it to be re-established
872
+ async with self .cond :
873
+ self .state = 1
874
+ await self .pubsub .connection .disconnect ()
875
+ assert not self .pubsub .connection .is_connected
876
+ # wait for reconnect
877
+ await self .cond .wait_for (
878
+ lambda : self .pubsub .connection .is_connected
879
+ )
880
+ assert self .state == 3
881
+
882
+ await self .myfinish ()
883
+ finally :
884
+ await self .mykill ()
872
885
873
886
async def loop (self ):
874
887
# reader loop, performing state transitions as it
0 commit comments