Skip to content

Commit c0833f6

Browse files
Optionally disable disconnects in read_response (redis#2695)
* Add regression tests and fixes for issue redis#1128 * Fix tests for resumable read_response to use "disconnect_on_error" * undo previous fix attempts in async client and cluster * re-enable cluster test * Suggestions from code review * Add CHANGES
1 parent 093232d commit c0833f6

11 files changed

+149
-110
lines changed

CHANGES

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
* Revert #2104, #2673, add `disconnect_on_error` option to `read_response()` (issues #2506, #2624)
12
* Add `address_remap` parameter to `RedisCluster`
23
* Fix incorrect usage of once flag in async Sentinel
34
* asyncio: Fix memory leak caused by hiredis (#2693)

redis/asyncio/client.py

Lines changed: 27 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -500,23 +500,6 @@ async def _disconnect_raise(self, conn: Connection, error: Exception):
500500
):
501501
raise error
502502

503-
async def _try_send_command_parse_response(self, conn, *args, **options):
504-
try:
505-
return await conn.retry.call_with_retry(
506-
lambda: self._send_command_parse_response(
507-
conn, args[0], *args, **options
508-
),
509-
lambda error: self._disconnect_raise(conn, error),
510-
)
511-
except asyncio.CancelledError:
512-
await conn.disconnect(nowait=True)
513-
raise
514-
finally:
515-
if self.single_connection_client:
516-
self._single_conn_lock.release()
517-
if not self.connection:
518-
await self.connection_pool.release(conn)
519-
520503
# COMMAND EXECUTION AND PROTOCOL PARSING
521504
async def execute_command(self, *args, **options):
522505
"""Execute a command and return a parsed response"""
@@ -527,10 +510,18 @@ async def execute_command(self, *args, **options):
527510

528511
if self.single_connection_client:
529512
await self._single_conn_lock.acquire()
530-
531-
return await asyncio.shield(
532-
self._try_send_command_parse_response(conn, *args, **options)
533-
)
513+
try:
514+
return await conn.retry.call_with_retry(
515+
lambda: self._send_command_parse_response(
516+
conn, command_name, *args, **options
517+
),
518+
lambda error: self._disconnect_raise(conn, error),
519+
)
520+
finally:
521+
if self.single_connection_client:
522+
self._single_conn_lock.release()
523+
if not self.connection:
524+
await pool.release(conn)
534525

535526
async def parse_response(
536527
self, connection: Connection, command_name: Union[str, bytes], **options
@@ -774,18 +765,10 @@ async def _disconnect_raise_connect(self, conn, error):
774765
is not a TimeoutError. Otherwise, try to reconnect
775766
"""
776767
await conn.disconnect()
777-
778768
if not (conn.retry_on_timeout and isinstance(error, TimeoutError)):
779769
raise error
780770
await conn.connect()
781771

782-
async def _try_execute(self, conn, command, *arg, **kwargs):
783-
try:
784-
return await command(*arg, **kwargs)
785-
except asyncio.CancelledError:
786-
await conn.disconnect()
787-
raise
788-
789772
async def _execute(self, conn, command, *args, **kwargs):
790773
"""
791774
Connect manually upon disconnection. If the Redis server is down,
@@ -794,11 +777,9 @@ async def _execute(self, conn, command, *args, **kwargs):
794777
called by the connection to resubscribe us to any channels and
795778
patterns we were previously listening to
796779
"""
797-
return await asyncio.shield(
798-
conn.retry.call_with_retry(
799-
lambda: self._try_execute(conn, command, *args, **kwargs),
800-
lambda error: self._disconnect_raise_connect(conn, error),
801-
)
780+
return await conn.retry.call_with_retry(
781+
lambda: command(*args, **kwargs),
782+
lambda error: self._disconnect_raise_connect(conn, error),
802783
)
803784

804785
async def parse_response(self, block: bool = True, timeout: float = 0):
@@ -816,7 +797,9 @@ async def parse_response(self, block: bool = True, timeout: float = 0):
816797
await conn.connect()
817798

818799
read_timeout = None if block else timeout
819-
response = await self._execute(conn, conn.read_response, timeout=read_timeout)
800+
response = await self._execute(
801+
conn, conn.read_response, timeout=read_timeout, disconnect_on_error=False
802+
)
820803

821804
if conn.health_check_interval and response == self.health_check_response:
822805
# ignore the health check message as user might not expect it
@@ -1200,18 +1183,6 @@ async def _disconnect_reset_raise(self, conn, error):
12001183
await self.reset()
12011184
raise
12021185

1203-
async def _try_send_command_parse_response(self, conn, *args, **options):
1204-
try:
1205-
return await conn.retry.call_with_retry(
1206-
lambda: self._send_command_parse_response(
1207-
conn, args[0], *args, **options
1208-
),
1209-
lambda error: self._disconnect_reset_raise(conn, error),
1210-
)
1211-
except asyncio.CancelledError:
1212-
await conn.disconnect()
1213-
raise
1214-
12151186
async def immediate_execute_command(self, *args, **options):
12161187
"""
12171188
Execute a command immediately, but don't auto-retry on a
@@ -1227,8 +1198,12 @@ async def immediate_execute_command(self, *args, **options):
12271198
command_name, self.shard_hint
12281199
)
12291200
self.connection = conn
1230-
return await asyncio.shield(
1231-
self._try_send_command_parse_response(conn, *args, **options)
1201+
1202+
return await conn.retry.call_with_retry(
1203+
lambda: self._send_command_parse_response(
1204+
conn, command_name, *args, **options
1205+
),
1206+
lambda error: self._disconnect_reset_raise(conn, error),
12321207
)
12331208

12341209
def pipeline_execute_command(self, *args, **options):
@@ -1396,19 +1371,6 @@ async def _disconnect_raise_reset(self, conn: Connection, error: Exception):
13961371
await self.reset()
13971372
raise
13981373

1399-
async def _try_execute(self, conn, execute, stack, raise_on_error):
1400-
try:
1401-
return await conn.retry.call_with_retry(
1402-
lambda: execute(conn, stack, raise_on_error),
1403-
lambda error: self._disconnect_raise_reset(conn, error),
1404-
)
1405-
except asyncio.CancelledError:
1406-
# not supposed to be possible, yet here we are
1407-
await conn.disconnect(nowait=True)
1408-
raise
1409-
finally:
1410-
await self.reset()
1411-
14121374
async def execute(self, raise_on_error: bool = True):
14131375
"""Execute all the commands in the current pipeline"""
14141376
stack = self.command_stack
@@ -1430,11 +1392,10 @@ async def execute(self, raise_on_error: bool = True):
14301392
conn = cast(Connection, conn)
14311393

14321394
try:
1433-
return await asyncio.shield(
1434-
self._try_execute(conn, execute, stack, raise_on_error)
1395+
return await conn.retry.call_with_retry(
1396+
lambda: execute(conn, stack, raise_on_error),
1397+
lambda error: self._disconnect_raise_reset(conn, error),
14351398
)
1436-
except RuntimeError:
1437-
await self.reset()
14381399
finally:
14391400
await self.reset()
14401401

redis/asyncio/cluster.py

Lines changed: 9 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1016,33 +1016,12 @@ async def execute_command(self, *args: Any, **kwargs: Any) -> Any:
10161016
await connection.send_packed_command(connection.pack_command(*args), False)
10171017

10181018
# Read response
1019-
return await asyncio.shield(
1020-
self._parse_and_release(connection, args[0], **kwargs)
1021-
)
1022-
1023-
async def _parse_and_release(self, connection, *args, **kwargs):
10241019
try:
1025-
return await self.parse_response(connection, *args, **kwargs)
1026-
except asyncio.CancelledError:
1027-
# should not be possible
1028-
await connection.disconnect(nowait=True)
1029-
raise
1020+
return await self.parse_response(connection, args[0], **kwargs)
10301021
finally:
1022+
# Release connection
10311023
self._free.append(connection)
10321024

1033-
async def _try_parse_response(self, cmd, connection, ret):
1034-
try:
1035-
cmd.result = await asyncio.shield(
1036-
self.parse_response(connection, cmd.args[0], **cmd.kwargs)
1037-
)
1038-
except asyncio.CancelledError:
1039-
await connection.disconnect(nowait=True)
1040-
raise
1041-
except Exception as e:
1042-
cmd.result = e
1043-
ret = True
1044-
return ret
1045-
10461025
async def execute_pipeline(self, commands: List["PipelineCommand"]) -> bool:
10471026
# Acquire connection
10481027
connection = self.acquire_connection()
@@ -1055,7 +1034,13 @@ async def execute_pipeline(self, commands: List["PipelineCommand"]) -> bool:
10551034
# Read responses
10561035
ret = False
10571036
for cmd in commands:
1058-
ret = await asyncio.shield(self._try_parse_response(cmd, connection, ret))
1037+
try:
1038+
cmd.result = await self.parse_response(
1039+
connection, cmd.args[0], **cmd.kwargs
1040+
)
1041+
except Exception as e:
1042+
cmd.result = e
1043+
ret = True
10591044

10601045
# Release connection
10611046
self._free.append(connection)

redis/asyncio/connection.py

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -804,7 +804,11 @@ async def send_packed_command(
804804
raise ConnectionError(
805805
f"Error {err_no} while writing to socket. {errmsg}."
806806
) from e
807-
except Exception:
807+
except BaseException:
808+
# BaseExceptions can be raised when a socket send operation is not
809+
# finished, e.g. due to a timeout. Ideally, a caller could then re-try
810+
# to send un-sent data. However, the send_packed_command() API
811+
# does not support it so there is no point in keeping the connection open.
808812
await self.disconnect(nowait=True)
809813
raise
810814

@@ -828,6 +832,8 @@ async def read_response(
828832
self,
829833
disable_decoding: bool = False,
830834
timeout: Optional[float] = None,
835+
*,
836+
disconnect_on_error: bool = True,
831837
):
832838
"""Read the response from a previously sent command"""
833839
read_timeout = timeout if timeout is not None else self.socket_timeout
@@ -843,22 +849,24 @@ async def read_response(
843849
)
844850
except asyncio.TimeoutError:
845851
if timeout is not None:
846-
# user requested timeout, return None
852+
# user requested timeout, return None. Operation can be retried
847853
return None
848854
# it was a self.socket_timeout error.
849-
await self.disconnect(nowait=True)
855+
if disconnect_on_error:
856+
await self.disconnect(nowait=True)
850857
raise TimeoutError(f"Timeout reading from {self.host}:{self.port}")
851858
except OSError as e:
852-
await self.disconnect(nowait=True)
859+
if disconnect_on_error:
860+
await self.disconnect(nowait=True)
853861
raise ConnectionError(
854862
f"Error while reading from {self.host}:{self.port} : {e.args}"
855863
)
856-
except asyncio.CancelledError:
857-
# need this check for 3.7, where CancelledError
858-
# is subclass of Exception, not BaseException
859-
raise
860-
except Exception:
861-
await self.disconnect(nowait=True)
864+
except BaseException:
865+
# Also by default close in case of BaseException. A lot of code
866+
# relies on this behaviour when doing Command/Response pairs.
867+
# See #1128.
868+
if disconnect_on_error:
869+
await self.disconnect(nowait=True)
862870
raise
863871

864872
if self.health_check_interval:

redis/client.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1533,7 +1533,7 @@ def try_read():
15331533
return None
15341534
else:
15351535
conn.connect()
1536-
return conn.read_response()
1536+
return conn.read_response(disconnect_on_error=False)
15371537

15381538
response = self._execute(conn, try_read)
15391539

redis/connection.py

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -834,7 +834,11 @@ def send_packed_command(self, command, check_health=True):
834834
errno = e.args[0]
835835
errmsg = e.args[1]
836836
raise ConnectionError(f"Error {errno} while writing to socket. {errmsg}.")
837-
except Exception:
837+
except BaseException:
838+
# BaseExceptions can be raised when a socket send operation is not
839+
# finished, e.g. due to a timeout. Ideally, a caller could then re-try
840+
# to send un-sent data. However, the send_packed_command() API
841+
# does not support it so there is no point in keeping the connection open.
838842
self.disconnect()
839843
raise
840844

@@ -859,23 +863,31 @@ def can_read(self, timeout=0):
859863
self.disconnect()
860864
raise ConnectionError(f"Error while reading from {host_error}: {e.args}")
861865

862-
def read_response(self, disable_decoding=False):
866+
def read_response(
867+
self, disable_decoding=False, *, disconnect_on_error: bool = True
868+
):
863869
"""Read the response from a previously sent command"""
864870

865871
host_error = self._host_error()
866872

867873
try:
868874
response = self._parser.read_response(disable_decoding=disable_decoding)
869875
except socket.timeout:
870-
self.disconnect()
876+
if disconnect_on_error:
877+
self.disconnect()
871878
raise TimeoutError(f"Timeout reading from {host_error}")
872879
except OSError as e:
873-
self.disconnect()
880+
if disconnect_on_error:
881+
self.disconnect()
874882
raise ConnectionError(
875883
f"Error while reading from {host_error}" f" : {e.args}"
876884
)
877-
except Exception:
878-
self.disconnect()
885+
except BaseException:
886+
# Also by default close in case of BaseException. A lot of code
887+
# relies on this behaviour when doing Command/Response pairs.
888+
# See #1128.
889+
if disconnect_on_error:
890+
self.disconnect()
879891
raise
880892

881893
if self.health_check_interval:

tests/test_asyncio/test_commands.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
"""
22
Tests async overrides of commands from their mixins
33
"""
4+
import asyncio
45
import binascii
56
import datetime
67
import re
8+
import sys
79
from string import ascii_letters
810

911
import pytest
@@ -18,6 +20,11 @@
1820
skip_unless_arch_bits,
1921
)
2022

23+
if sys.version_info >= (3, 11, 3):
24+
from asyncio import timeout as async_timeout
25+
else:
26+
from async_timeout import timeout as async_timeout
27+
2128
REDIS_6_VERSION = "5.9.0"
2229

2330

@@ -3008,6 +3015,37 @@ async def test_module_list(self, r: redis.Redis):
30083015
for x in await r.module_list():
30093016
assert isinstance(x, dict)
30103017

3018+
@pytest.mark.onlynoncluster
3019+
async def test_interrupted_command(self, r: redis.Redis):
3020+
"""
3021+
Regression test for issue #1128: An unhandled BaseException
3022+
will leave the socket with un-read response to a previous
3023+
command.
3024+
"""
3025+
ready = asyncio.Event()
3026+
3027+
async def helper():
3028+
with pytest.raises(asyncio.CancelledError):
3029+
# blocking pop
3030+
ready.set()
3031+
await r.brpop(["nonexist"])
3032+
# If the following is not done, further Timeout operations will fail,
3033+
# because the timeout won't catch its CancelledError if the task
3034+
# has a pending cancel. Python documentation probably should reflect this.
3035+
if sys.version_info >= (3, 11):
3036+
asyncio.current_task().uncancel()
3037+
# if all is well, we can continue. The following should not hang.
3038+
await r.set("status", "down")
3039+
3040+
task = asyncio.create_task(helper())
3041+
await ready.wait()
3042+
await asyncio.sleep(0.01)
3043+
# the task is now sleeping, let's send it an exception
3044+
task.cancel()
3045+
# If all is well, the task should finish right away, otherwise fail with Timeout
3046+
async with async_timeout(0.1):
3047+
await task
3048+
30113049

30123050
@pytest.mark.onlynoncluster
30133051
class TestBinarySave:

0 commit comments

Comments
 (0)