Skip to content

Commit e2bc9cb

Browse files
committed
release, instead of disconnect on any error, when fetching connections in cluster pipeline
1 parent 110bb6b commit e2bc9cb

File tree

2 files changed

+46
-8
lines changed

2 files changed

+46
-8
lines changed

redis/cluster.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2034,7 +2034,7 @@ def _send_cluster_commands(
20342034
redis_node = self.get_redis_connection(node)
20352035
try:
20362036
connection = get_connection(redis_node, c.args)
2037-
except (ConnectionError, TimeoutError) as e:
2037+
except BaseException as e:
20382038
for n in nodes.values():
20392039
n.connection_pool.release(n.connection)
20402040
n.connection = None
@@ -2043,9 +2043,10 @@ def _send_cluster_commands(
20432043
backoff = self.retry._backoff.compute(attempts_count)
20442044
if backoff > 0:
20452045
time.sleep(backoff)
2046-
self.nodes_manager.initialize()
2047-
if is_default_node:
2048-
self.replace_default_node()
2046+
if isinstance(e, (ConnectionError, TimeoutError)):
2047+
self.nodes_manager.initialize()
2048+
if is_default_node:
2049+
self.replace_default_node()
20492050
raise
20502051
nodes[node_name] = NodeCommands(
20512052
redis_node.parse_response,

tests/test_cluster.py

Lines changed: 41 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2805,8 +2805,10 @@ def raise_error():
28052805

28062806
m.side_effect = raise_error
28072807

2808-
with pytest.raises(Exception, match="unexpected error"):
2809-
r.pipeline().get("a").execute()
2808+
with patch.object(Connection, "disconnect") as d:
2809+
with pytest.raises(Exception, match="unexpected error"):
2810+
r.pipeline().get("a").execute()
2811+
assert d.call_count == 1
28102812

28112813
for cluster_node in r.nodes_manager.nodes_cache.values():
28122814
connection_pool = cluster_node.redis_connection.connection_pool
@@ -3127,7 +3129,7 @@ def raise_ask_error():
31273129
assert res == ["MOCK_OK"]
31283130

31293131
@pytest.mark.parametrize("error", [ConnectionError, TimeoutError])
3130-
def test_return_previous_acquired_connections(self, r, error):
3132+
def test_return_previous_acquired_connections_with_retry(self, r, error):
31313133
# in order to ensure that a pipeline will make use of connections
31323134
# from different nodes
31333135
assert r.keyslot("a") != r.keyslot("b")
@@ -3143,7 +3145,9 @@ def raise_error(target_node, *args, **kwargs):
31433145

31443146
get_connection.side_effect = raise_error
31453147

3146-
r.pipeline().get("a").get("b").execute()
3148+
with patch.object(Connection, "disconnect") as d:
3149+
r.pipeline().get("a").get("b").execute()
3150+
assert d.call_count == 0
31473151

31483152
# there should have been two get_connections per execution and
31493153
# two executions due to exception raised in the first execution
@@ -3153,6 +3157,39 @@ def raise_error(target_node, *args, **kwargs):
31533157
num_of_conns = len(connection_pool._available_connections)
31543158
assert num_of_conns == connection_pool._created_connections
31553159

3160+
@pytest.mark.parametrize("error", [RedisClusterException, BaseException])
3161+
def test_return_previous_acquired_connections_without_retry(self, r, error):
3162+
# in order to ensure that a pipeline will make use of connections
3163+
# from different nodes
3164+
assert r.keyslot("a") != r.keyslot("b")
3165+
3166+
orig_func = redis.cluster.get_connection
3167+
with patch("redis.cluster.get_connection") as get_connection:
3168+
3169+
def raise_error(target_node, *args, **kwargs):
3170+
if get_connection.call_count == 2:
3171+
raise error("mocked error")
3172+
else:
3173+
return orig_func(target_node, *args, **kwargs)
3174+
3175+
get_connection.side_effect = raise_error
3176+
3177+
with patch.object(Connection, "disconnect") as d:
3178+
with pytest.raises(error):
3179+
r.pipeline().get("a").get("b").execute()
3180+
assert d.call_count == 0
3181+
3182+
# there should have been two get_connections per execution and
3183+
# two executions due to exception raised in the first execution
3184+
assert get_connection.call_count == 2
3185+
for cluster_node in r.nodes_manager.nodes_cache.values():
3186+
connection_pool = cluster_node.redis_connection.connection_pool
3187+
num_of_conns = len(connection_pool._available_connections)
3188+
assert num_of_conns == connection_pool._created_connections
3189+
# connection must remain connected
3190+
for conn in connection_pool._available_connections:
3191+
assert conn._sock is not None
3192+
31563193
def test_empty_stack(self, r):
31573194
"""
31583195
If pipeline is executed with no commands it should

0 commit comments

Comments
 (0)