Skip to content

Commit e3110b1

Browse files
committed
[GROW-3361] always release, instead of disconnect, when error occurs during get_connection (#11)
* add is_supported_error() to retry * release, instead of disconnect on any error, when fetching connections in cluster pipeline * add a default backoff after cluster pipeline disconnects its connections
1 parent 1207a77 commit e3110b1

File tree

4 files changed

+69
-12
lines changed

4 files changed

+69
-12
lines changed

redis/cluster.py

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1103,7 +1103,7 @@ def execute_command(self, *args, **kwargs):
11031103
# The nodes and slots cache were reinitialized.
11041104
# Try again with the new cluster setup.
11051105
retry_attempts -= 1
1106-
if self.retry and isinstance(e, self.retry._supported_errors):
1106+
if self.retry and self.retry.is_supported_error(e):
11071107
backoff = self.retry._backoff.compute(
11081108
self.cluster_error_retry_attempts - retry_attempts
11091109
)
@@ -2034,20 +2034,19 @@ def _send_cluster_commands(
20342034
redis_node = self.get_redis_connection(node)
20352035
try:
20362036
connection = get_connection(redis_node, c.args)
2037-
except (ConnectionError, TimeoutError) as e:
2037+
except BaseException as e:
20382038
for n in nodes.values():
20392039
n.connection_pool.release(n.connection)
20402040
n.connection = None
20412041
nodes = {}
2042-
if self.retry and isinstance(
2043-
e, self.retry._supported_errors
2044-
):
2042+
if self.retry and self.retry.is_supported_error(e):
20452043
backoff = self.retry._backoff.compute(attempts_count)
20462044
if backoff > 0:
20472045
time.sleep(backoff)
2048-
self.nodes_manager.initialize()
2049-
if is_default_node:
2050-
self.replace_default_node()
2046+
if isinstance(e, (ConnectionError, TimeoutError)):
2047+
self.nodes_manager.initialize()
2048+
if is_default_node:
2049+
self.replace_default_node()
20512050
raise
20522051
nodes[node_name] = NodeCommands(
20532052
redis_node.parse_response,
@@ -2163,6 +2162,8 @@ def _send_cluster_commands(
21632162
if n.connection:
21642163
n.connection.disconnect()
21652164
n.connection_pool.release(n.connection)
2165+
if len(nodes) > 0:
2166+
time.sleep(0.25)
21662167
raise
21672168

21682169
def _fail_on_redirect(self, allow_redirections):

redis/retry.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,9 @@ def update_supported_errors(self, specified_errors: list):
3232
set(self._supported_errors + tuple(specified_errors))
3333
)
3434

35+
def is_supported_error(self, error):
36+
return isinstance(error, self._supported_errors)
37+
3538
def call_with_retry(self, do, fail):
3639
"""
3740
Execute an operation that might fail and returns its result, or

tests/test_cluster.py

Lines changed: 45 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2805,8 +2805,10 @@ def raise_error():
28052805

28062806
m.side_effect = raise_error
28072807

2808-
with pytest.raises(Exception, match="unexpected error"):
2809-
r.pipeline().get("a").execute()
2808+
with patch.object(Connection, "disconnect") as d:
2809+
with pytest.raises(Exception, match="unexpected error"):
2810+
r.pipeline().get("a").execute()
2811+
assert d.call_count == 1
28102812

28112813
for cluster_node in r.nodes_manager.nodes_cache.values():
28122814
connection_pool = cluster_node.redis_connection.connection_pool
@@ -3127,7 +3129,7 @@ def raise_ask_error():
31273129
assert res == ["MOCK_OK"]
31283130

31293131
@pytest.mark.parametrize("error", [ConnectionError, TimeoutError])
3130-
def test_return_previous_acquired_connections(self, r, error):
3132+
def test_return_previous_acquired_connections_with_retry(self, r, error):
31313133
# in order to ensure that a pipeline will make use of connections
31323134
# from different nodes
31333135
assert r.keyslot("a") != r.keyslot("b")
@@ -3143,7 +3145,13 @@ def raise_error(target_node, *args, **kwargs):
31433145

31443146
get_connection.side_effect = raise_error
31453147

3146-
r.pipeline().get("a").get("b").execute()
3148+
with patch.object(NodesManager, "initialize") as i:
3149+
# in order to remove disconnect caused by initialize
3150+
i.side_effect = lambda: None
3151+
3152+
with patch.object(Connection, "disconnect") as d:
3153+
r.pipeline().get("a").get("b").execute()
3154+
assert d.call_count == 0
31473155

31483156
# there should have been two get_connections per execution and
31493157
# two executions due to exception raised in the first execution
@@ -3153,6 +3161,39 @@ def raise_error(target_node, *args, **kwargs):
31533161
num_of_conns = len(connection_pool._available_connections)
31543162
assert num_of_conns == connection_pool._created_connections
31553163

3164+
@pytest.mark.parametrize("error", [RedisClusterException, BaseException])
3165+
def test_return_previous_acquired_connections_without_retry(self, r, error):
3166+
# in order to ensure that a pipeline will make use of connections
3167+
# from different nodes
3168+
assert r.keyslot("a") != r.keyslot("b")
3169+
3170+
orig_func = redis.cluster.get_connection
3171+
with patch("redis.cluster.get_connection") as get_connection:
3172+
3173+
def raise_error(target_node, *args, **kwargs):
3174+
if get_connection.call_count == 2:
3175+
raise error("mocked error")
3176+
else:
3177+
return orig_func(target_node, *args, **kwargs)
3178+
3179+
get_connection.side_effect = raise_error
3180+
3181+
with patch.object(Connection, "disconnect") as d:
3182+
with pytest.raises(error):
3183+
r.pipeline().get("a").get("b").execute()
3184+
assert d.call_count == 0
3185+
3186+
# there should have been two get_connections per execution and
3187+
# two executions due to exception raised in the first execution
3188+
assert get_connection.call_count == 2
3189+
for cluster_node in r.nodes_manager.nodes_cache.values():
3190+
connection_pool = cluster_node.redis_connection.connection_pool
3191+
num_of_conns = len(connection_pool._available_connections)
3192+
assert num_of_conns == connection_pool._created_connections
3193+
# connection must remain connected
3194+
for conn in connection_pool._available_connections:
3195+
assert conn._sock is not None
3196+
31563197
def test_empty_stack(self, r):
31573198
"""
31583199
If pipeline is executed with no commands it should

tests/test_retry.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99
BusyLoadingError,
1010
ConnectionError,
1111
ReadOnlyError,
12+
RedisClusterException,
13+
RedisError,
1214
TimeoutError,
1315
)
1416
from redis.retry import Retry
@@ -122,6 +124,16 @@ def test_infinite_retry(self):
122124
assert self.actual_attempts == 5
123125
assert self.actual_failures == 5
124126

127+
@pytest.mark.parametrize("exception_class", [ConnectionError, TimeoutError])
128+
def test_is_supported_error_true(self, exception_class):
129+
retry = Retry(BackoffMock(), -1)
130+
assert retry.is_supported_error(exception_class())
131+
132+
@pytest.mark.parametrize("exception_class", [RedisClusterException, RedisError])
133+
def test_is_supported_error_false(self, exception_class):
134+
retry = Retry(BackoffMock(), -1)
135+
assert not retry.is_supported_error(exception_class())
136+
125137

126138
@pytest.mark.onlynoncluster
127139
class TestRedisClientRetry:

0 commit comments

Comments
 (0)