@@ -197,7 +197,7 @@ def on_disconnect(self):
197
197
def on_connect (self , connection : "Connection" ):
198
198
raise NotImplementedError ()
199
199
200
- async def can_read (self , timeout : float ) -> bool :
200
+ async def can_read_destructive (self ) -> bool :
201
201
raise NotImplementedError ()
202
202
203
203
async def read_response (
@@ -275,9 +275,9 @@ async def _read_from_socket(
275
275
return False
276
276
raise ConnectionError (f"Error while reading from socket: { ex .args } " )
277
277
278
- async def can_read (self , timeout : float ) -> bool :
278
+ async def can_read_destructive (self ) -> bool :
279
279
return bool (self .length ) or await self ._read_from_socket (
280
- timeout = timeout , raise_on_timeout = False
280
+ timeout = 0 , raise_on_timeout = False
281
281
)
282
282
283
283
async def read (self , length : int ) -> bytes :
@@ -375,8 +375,8 @@ def on_disconnect(self):
375
375
self ._buffer = None
376
376
self .encoder = None
377
377
378
- async def can_read (self , timeout : float ):
379
- return self ._buffer and bool (await self ._buffer .can_read ( timeout ))
378
+ async def can_read_destructive (self ):
379
+ return self ._buffer and bool (await self ._buffer .can_read_destructive ( ))
380
380
381
381
async def read_response (
382
382
self , disable_decoding : bool = False
@@ -433,9 +433,7 @@ async def read_response(
433
433
class HiredisParser (BaseParser ):
434
434
"""Parser class for connections using Hiredis"""
435
435
436
- __slots__ = BaseParser .__slots__ + ("_next_response" , "_reader" , "_socket_timeout" )
437
-
438
- _next_response : bool
436
+ __slots__ = BaseParser .__slots__ + ("_reader" , "_socket_timeout" )
439
437
440
438
def __init__ (self , socket_read_size : int ):
441
439
if not HIREDIS_AVAILABLE :
@@ -455,23 +453,18 @@ def on_connect(self, connection: "Connection"):
455
453
kwargs ["errors" ] = connection .encoder .encoding_errors
456
454
457
455
self ._reader = hiredis .Reader (** kwargs )
458
- self ._next_response = False
459
456
self ._socket_timeout = connection .socket_timeout
460
457
461
458
def on_disconnect (self ):
462
459
self ._stream = None
463
460
self ._reader = None
464
- self ._next_response = False
465
461
466
- async def can_read (self , timeout : float ):
462
+ async def can_read_destructive (self ):
467
463
if not self ._stream or not self ._reader :
468
464
raise ConnectionError (SERVER_CLOSED_CONNECTION_ERROR )
469
-
470
- if self ._next_response is False :
471
- self ._next_response = self ._reader .gets ()
472
- if self ._next_response is False :
473
- return await self .read_from_socket (timeout = timeout , raise_on_timeout = False )
474
- return True
465
+ if self ._reader .gets ():
466
+ return True
467
+ return await self .read_from_socket (timeout = 0 , raise_on_timeout = False )
475
468
476
469
async def read_from_socket (
477
470
self ,
@@ -514,12 +507,6 @@ async def read_response(
514
507
self .on_disconnect ()
515
508
raise ConnectionError (SERVER_CLOSED_CONNECTION_ERROR ) from None
516
509
517
- # _next_response might be cached from a can_read() call
518
- if self ._next_response is not False :
519
- response = self ._next_response
520
- self ._next_response = False
521
- return response
522
-
523
510
response = self ._reader .gets ()
524
511
while response is False :
525
512
await self .read_from_socket ()
@@ -916,12 +903,10 @@ async def send_command(self, *args: Any, **kwargs: Any) -> None:
916
903
self .pack_command (* args ), check_health = kwargs .get ("check_health" , True )
917
904
)
918
905
919
- async def can_read (self , timeout : float = 0 ):
906
+ async def can_read_destructive (self ):
920
907
"""Poll the socket to see if there's data that can be read."""
921
- if not self .is_connected :
922
- await self .connect ()
923
908
try :
924
- return await self ._parser .can_read ( timeout )
909
+ return await self ._parser .can_read_destructive ( )
925
910
except OSError as e :
926
911
await self .disconnect ()
927
912
raise ConnectionError (
@@ -1523,12 +1508,12 @@ async def get_connection(self, command_name, *keys, **options):
1523
1508
# pool before all data has been read or the socket has been
1524
1509
# closed. either way, reconnect and verify everything is good.
1525
1510
try :
1526
- if await connection .can_read ():
1511
+ if await connection .can_read_destructive ():
1527
1512
raise ConnectionError ("Connection has data" ) from None
1528
1513
except ConnectionError :
1529
1514
await connection .disconnect ()
1530
1515
await connection .connect ()
1531
- if await connection .can_read ():
1516
+ if await connection .can_read_destructive ():
1532
1517
raise ConnectionError ("Connection not ready" ) from None
1533
1518
except BaseException :
1534
1519
# release the connection back to the pool so that we don't
@@ -1724,12 +1709,12 @@ async def get_connection(self, command_name, *keys, **options):
1724
1709
# pool before all data has been read or the socket has been
1725
1710
# closed. either way, reconnect and verify everything is good.
1726
1711
try :
1727
- if await connection .can_read ():
1712
+ if await connection .can_read_destructive ():
1728
1713
raise ConnectionError ("Connection has data" ) from None
1729
1714
except ConnectionError :
1730
1715
await connection .disconnect ()
1731
1716
await connection .connect ()
1732
- if await connection .can_read ():
1717
+ if await connection .can_read_destructive ():
1733
1718
raise ConnectionError ("Connection not ready" ) from None
1734
1719
except BaseException :
1735
1720
# release the connection back to the pool so that we don't leak it
0 commit comments