Skip to content

Commit 021fa98

Browse files
committed
release, instead of disconnect on any error, when fetching connections in cluster pipeline
1 parent 8d17920 commit 021fa98

File tree

2 files changed

+49
-8
lines changed

2 files changed

+49
-8
lines changed

redis/cluster.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2034,7 +2034,7 @@ def _send_cluster_commands(
20342034
redis_node = self.get_redis_connection(node)
20352035
try:
20362036
connection = get_connection(redis_node, c.args)
2037-
except (ConnectionError, TimeoutError) as e:
2037+
except BaseException as e:
20382038
for n in nodes.values():
20392039
n.connection_pool.release(n.connection)
20402040
n.connection = None
@@ -2043,9 +2043,10 @@ def _send_cluster_commands(
20432043
backoff = self.retry._backoff.compute(attempts_count)
20442044
if backoff > 0:
20452045
time.sleep(backoff)
2046-
self.nodes_manager.initialize()
2047-
if is_default_node:
2048-
self.replace_default_node()
2046+
if isinstance(e, (ConnectionError, TimeoutError)):
2047+
self.nodes_manager.initialize()
2048+
if is_default_node:
2049+
self.replace_default_node()
20492050
raise
20502051
nodes[node_name] = NodeCommands(
20512052
redis_node.parse_response,

tests/test_cluster.py

Lines changed: 44 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2805,8 +2805,10 @@ def raise_error():
28052805

28062806
m.side_effect = raise_error
28072807

2808-
with pytest.raises(Exception, match="unexpected error"):
2809-
r.pipeline().get("a").execute()
2808+
with patch.object(Connection, "disconnect") as d:
2809+
with pytest.raises(Exception, match="unexpected error"):
2810+
r.pipeline().get("a").execute()
2811+
assert d.call_count == 1
28102812

28112813
for cluster_node in r.nodes_manager.nodes_cache.values():
28122814
connection_pool = cluster_node.redis_connection.connection_pool
@@ -3127,7 +3129,7 @@ def raise_ask_error():
31273129
assert res == ["MOCK_OK"]
31283130

31293131
@pytest.mark.parametrize("error", [ConnectionError, TimeoutError])
3130-
def test_return_previous_acquired_connections(self, r, error):
3132+
def test_return_previous_acquired_connections_with_retry(self, r, error):
31313133
# in order to ensure that a pipeline will make use of connections
31323134
# from different nodes
31333135
assert r.keyslot("a") != r.keyslot("b")
@@ -3143,7 +3145,12 @@ def raise_error(target_node, *args, **kwargs):
31433145

31443146
get_connection.side_effect = raise_error
31453147

3146-
r.pipeline().get("a").get("b").execute()
3148+
with patch.object(NodesManager, "initialize") as i:
3149+
i.side_effect = lambda: None # in order to remove disconnect caused by initialize
3150+
3151+
with patch.object(Connection, "disconnect") as d:
3152+
r.pipeline().get("a").get("b").execute()
3153+
assert d.call_count == 0
31473154

31483155
# there should have been two get_connections per execution and
31493156
# two executions due to exception raised in the first execution
@@ -3153,6 +3160,39 @@ def raise_error(target_node, *args, **kwargs):
31533160
num_of_conns = len(connection_pool._available_connections)
31543161
assert num_of_conns == connection_pool._created_connections
31553162

3163+
@pytest.mark.parametrize("error", [RedisClusterException, BaseException])
3164+
def test_return_previous_acquired_connections_without_retry(self, r, error):
3165+
# in order to ensure that a pipeline will make use of connections
3166+
# from different nodes
3167+
assert r.keyslot("a") != r.keyslot("b")
3168+
3169+
orig_func = redis.cluster.get_connection
3170+
with patch("redis.cluster.get_connection") as get_connection:
3171+
3172+
def raise_error(target_node, *args, **kwargs):
3173+
if get_connection.call_count == 2:
3174+
raise error("mocked error")
3175+
else:
3176+
return orig_func(target_node, *args, **kwargs)
3177+
3178+
get_connection.side_effect = raise_error
3179+
3180+
with patch.object(Connection, "disconnect") as d:
3181+
with pytest.raises(error):
3182+
r.pipeline().get("a").get("b").execute()
3183+
assert d.call_count == 0
3184+
3185+
# there should have been two get_connections per execution and
3186+
# two executions due to exception raised in the first execution
3187+
assert get_connection.call_count == 2
3188+
for cluster_node in r.nodes_manager.nodes_cache.values():
3189+
connection_pool = cluster_node.redis_connection.connection_pool
3190+
num_of_conns = len(connection_pool._available_connections)
3191+
assert num_of_conns == connection_pool._created_connections
3192+
# connection must remain connected
3193+
for conn in connection_pool._available_connections:
3194+
assert conn._sock is not None
3195+
31563196
def test_empty_stack(self, r):
31573197
"""
31583198
If pipeline is executed with no commands it should

0 commit comments

Comments
 (0)