Commit 1218f5b

[GROW-3247] release connection even if an unexpected exception is thrown in cluster pipeline (#8)

* [GROW-3247] release connection even if an unexpected exception is thrown in cluster pipeline
* [GROW-3247] fix style issue
* unassign n.connection at every loop
1 parent 56fcce3 commit 1218f5b
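
In outline, the fix wraps the body of _send_cluster_commands in a try/except BaseException so that connections checked out of a pool are returned even when an unanticipated error escapes the write/read phase, and it disconnects any connection whose socket state is unknown before releasing it. Below is a minimal, runnable sketch of that pattern under illustrative names (Pool, Connection, and run_pipeline are stand-ins, not redis-py APIs):

    # Sketch of the commit's cleanup pattern (illustrative names, not redis-py
    # APIs): connections checked out of a pool must go back on *any* exception,
    # and a connection in an unknown state is disconnected first so a later
    # checkout cannot read a stale, buffered reply.

    class Connection:
        def __init__(self):
            self.open = True

        def send(self, cmd):
            return f"OK {cmd}"

        def disconnect(self):
            self.open = False  # drop the socket and any buffered data

    class Pool:
        def __init__(self):
            self.free = [Connection()]

        def get_connection(self):
            return self.free.pop() if self.free else Connection()

        def release(self, conn):
            self.free.append(conn)

    def run_pipeline(pool, commands):
        held = []
        try:
            conn = pool.get_connection()
            held.append(conn)
            results = [conn.send(c) for c in commands]
            # happy path: the socket is drained, safe to reuse as-is
            for c in held:
                pool.release(c)
            held = []
            return results
        except BaseException:
            # error path: socket state is unknown, disconnect before releasing
            for c in held:
                c.disconnect()
                pool.release(c)
            raise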

2 files changed: +185 −146 lines

redis/cluster.py

Lines changed: 162 additions & 146 deletions
@@ -1996,158 +1996,174 @@ def _send_cluster_commands(
         # build a list of node objects based on node names we need to
         nodes = {}
 
-        # as we move through each command that still needs to be processed,
-        # we figure out the slot number that command maps to, then from
-        # the slot determine the node.
-        for c in attempt:
-            while True:
-                # refer to our internal node -> slot table that
-                # tells us where a given command should route to.
-                # (it might be possible we have a cached node that no longer
-                # exists in the cluster, which is why we do this in a loop)
-                passed_targets = c.options.pop("target_nodes", None)
-                if passed_targets and not self._is_nodes_flag(passed_targets):
-                    target_nodes = self._parse_target_nodes(passed_targets)
-                else:
-                    target_nodes = self._determine_nodes(
-                        *c.args, node_flag=passed_targets
-                    )
-                if not target_nodes:
-                    raise RedisClusterException(
-                        f"No targets were found to execute {c.args} command on"
-                    )
-                if len(target_nodes) > 1:
-                    raise RedisClusterException(
-                        f"Too many targets for command {c.args}"
-                    )
-
-                node = target_nodes[0]
-                if node == self.get_default_node():
-                    is_default_node = True
-
-                # now that we know the name of the node
-                # ( it's just a string in the form of host:port )
-                # we can build a list of commands for each node.
-                node_name = node.name
-                if node_name not in nodes:
-                    redis_node = self.get_redis_connection(node)
-                    try:
-                        connection = get_connection(redis_node, c.args)
-                    except (ConnectionError, TimeoutError) as e:
-                        for n in nodes.values():
-                            n.connection_pool.release(n.connection)
-                        if self.retry and isinstance(e, self.retry._supported_errors):
-                            backoff = self.retry._backoff.compute(attempts_count)
-                            if backoff > 0:
-                                time.sleep(backoff)
-                            self.nodes_manager.initialize()
-                            if is_default_node:
-                                self.replace_default_node()
-                        raise
-                    nodes[node_name] = NodeCommands(
-                        redis_node.parse_response,
-                        redis_node.connection_pool,
-                        connection,
-                    )
-                nodes[node_name].append(c)
-                break
-
-        # send the commands in sequence.
-        # we write to all the open sockets for each node first,
-        # before reading anything
-        # this allows us to flush all the requests out across the
-        # network essentially in parallel
-        # so that we can read them all in parallel as they come back.
-        # we dont' multiplex on the sockets as they come available,
-        # but that shouldn't make too much difference.
-        node_commands = nodes.values()
-        for n in node_commands:
-            n.write()
-
-        for n in node_commands:
-            n.read()
-
-        # release all of the redis connections we allocated earlier
-        # back into the connection pool.
-        # we used to do this step as part of a try/finally block,
-        # but it is really dangerous to
-        # release connections back into the pool if for some
-        # reason the socket has data still left in it
-        # from a previous operation. The write and
-        # read operations already have try/catch around them for
-        # all known types of errors including connection
-        # and socket level errors.
-        # So if we hit an exception, something really bad
-        # happened and putting any oF
-        # these connections back into the pool is a very bad idea.
-        # the socket might have unread buffer still sitting in it,
-        # and then the next time we read from it we pass the
-        # buffered result back from a previous command and
-        # every single request after to that connection will always get
-        # a mismatched result.
-        for n in nodes.values():
-            n.connection_pool.release(n.connection)
-
-        # if the response isn't an exception it is a
-        # valid response from the node
-        # we're all done with that command, YAY!
-        # if we have more commands to attempt, we've run into problems.
-        # collect all the commands we are allowed to retry.
-        # (MOVED, ASK, or connection errors or timeout errors)
-        attempt = sorted(
-            (
-                c
-                for c in attempt
-                if isinstance(c.result, ClusterPipeline.ERRORS_ALLOW_RETRY)
-            ),
-            key=lambda x: x.position,
-        )
-        if attempt and allow_redirections:
-            # RETRY MAGIC HAPPENS HERE!
-            # send these remaing commands one at a time using `execute_command`
-            # in the main client. This keeps our retry logic
-            # in one place mostly,
-            # and allows us to be more confident in correctness of behavior.
-            # at this point any speed gains from pipelining have been lost
-            # anyway, so we might as well make the best
-            # attempt to get the correct behavior.
-            #
-            # The client command will handle retries for each
-            # individual command sequentially as we pass each
-            # one into `execute_command`. Any exceptions
-            # that bubble out should only appear once all
-            # retries have been exhausted.
-            #
-            # If a lot of commands have failed, we'll be setting the
-            # flag to rebuild the slots table from scratch.
-            # So MOVED errors should correct themselves fairly quickly.
-            self.reinitialize_counter += 1
-            if self._should_reinitialized():
-                self.nodes_manager.initialize()
-                if is_default_node:
-                    self.replace_default_node()
-            for c in attempt:
-                try:
-                    # send each command individually like we
-                    # do in the main client.
-                    c.result = super().execute_command(*c.args, **c.options)
-                except RedisError as e:
-                    c.result = e
-
-        # turn the response back into a simple flat array that corresponds
-        # to the sequence of commands issued in the stack in pipeline.execute()
-        response = []
-        for c in sorted(stack, key=lambda x: x.position):
-            if c.args[0] in self.cluster_response_callbacks:
-                c.result = self.cluster_response_callbacks[c.args[0]](
-                    c.result, **c.options
-                )
-            response.append(c.result)
-
-        if raise_on_error:
-            self.raise_first_error(stack)
-
-        return response
+        try:
+            # as we move through each command that still needs to be processed,
+            # we figure out the slot number that command maps to, then from
+            # the slot determine the node.
+            for c in attempt:
+                while True:
+                    # refer to our internal node -> slot table that
+                    # tells us where a given command should route to.
+                    # (it might be possible we have a cached node that no longer
+                    # exists in the cluster, which is why we do this in a loop)
+                    passed_targets = c.options.pop("target_nodes", None)
+                    if passed_targets and not self._is_nodes_flag(passed_targets):
+                        target_nodes = self._parse_target_nodes(passed_targets)
+                    else:
+                        target_nodes = self._determine_nodes(
+                            *c.args, node_flag=passed_targets
+                        )
+                    if not target_nodes:
+                        raise RedisClusterException(
+                            f"No targets were found to execute {c.args} command on"
+                        )
+                    if len(target_nodes) > 1:
+                        raise RedisClusterException(
+                            f"Too many targets for command {c.args}"
+                        )
+
+                    node = target_nodes[0]
+                    if node == self.get_default_node():
+                        is_default_node = True
+
+                    # now that we know the name of the node
+                    # ( it's just a string in the form of host:port )
+                    # we can build a list of commands for each node.
+                    node_name = node.name
+                    if node_name not in nodes:
+                        redis_node = self.get_redis_connection(node)
+                        try:
+                            connection = get_connection(redis_node, c.args)
+                        except (ConnectionError, TimeoutError) as e:
+                            for n in nodes.values():
+                                n.connection_pool.release(n.connection)
+                                n.connection = None
+                            nodes = {}
+                            if self.retry and isinstance(
+                                e, self.retry._supported_errors
+                            ):
+                                backoff = self.retry._backoff.compute(attempts_count)
+                                if backoff > 0:
+                                    time.sleep(backoff)
+                                self.nodes_manager.initialize()
+                                if is_default_node:
+                                    self.replace_default_node()
+                            raise
+                        nodes[node_name] = NodeCommands(
+                            redis_node.parse_response,
+                            redis_node.connection_pool,
+                            connection,
+                        )
+                    nodes[node_name].append(c)
+                    break
+
+            # send the commands in sequence.
+            # we write to all the open sockets for each node first,
+            # before reading anything
+            # this allows us to flush all the requests out across the
+            # network essentially in parallel
+            # so that we can read them all in parallel as they come back.
+            # we dont' multiplex on the sockets as they come available,
+            # but that shouldn't make too much difference.
+            node_commands = nodes.values()
+            for n in node_commands:
+                n.write()
+
+            for n in node_commands:
+                n.read()
+
+            # release all of the redis connections we allocated earlier
+            # back into the connection pool.
+            # we used to do this step as part of a try/finally block,
+            # but it is really dangerous to
+            # release connections back into the pool if for some
+            # reason the socket has data still left in it
+            # from a previous operation. The write and
+            # read operations already have try/catch around them for
+            # all known types of errors including connection
+            # and socket level errors.
+            # So if we hit an exception, something really bad
+            # happened and putting any oF
+            # these connections back into the pool is a very bad idea.
+            # the socket might have unread buffer still sitting in it,
+            # and then the next time we read from it we pass the
+            # buffered result back from a previous command and
+            # every single request after to that connection will always get
+            # a mismatched result.
+            for n in nodes.values():
+                n.connection_pool.release(n.connection)
+                n.connection = None
+            nodes = {}
+
+            # if the response isn't an exception it is a
+            # valid response from the node
+            # we're all done with that command, YAY!
+            # if we have more commands to attempt, we've run into problems.
+            # collect all the commands we are allowed to retry.
+            # (MOVED, ASK, or connection errors or timeout errors)
+            attempt = sorted(
+                (
+                    c
+                    for c in attempt
+                    if isinstance(c.result, ClusterPipeline.ERRORS_ALLOW_RETRY)
+                ),
+                key=lambda x: x.position,
+            )
+            if attempt and allow_redirections:
+                # RETRY MAGIC HAPPENS HERE!
+                # send these remaing commands one at a time using `execute_command`
+                # in the main client. This keeps our retry logic
+                # in one place mostly,
+                # and allows us to be more confident in correctness of behavior.
+                # at this point any speed gains from pipelining have been lost
+                # anyway, so we might as well make the best
+                # attempt to get the correct behavior.
+                #
+                # The client command will handle retries for each
+                # individual command sequentially as we pass each
+                # one into `execute_command`. Any exceptions
+                # that bubble out should only appear once all
+                # retries have been exhausted.
+                #
+                # If a lot of commands have failed, we'll be setting the
+                # flag to rebuild the slots table from scratch.
+                # So MOVED errors should correct themselves fairly quickly.
+                self.reinitialize_counter += 1
+                if self._should_reinitialized():
+                    self.nodes_manager.initialize()
+                    if is_default_node:
+                        self.replace_default_node()
+                for c in attempt:
+                    try:
+                        # send each command individually like we
+                        # do in the main client.
+                        c.result = super().execute_command(*c.args, **c.options)
+                    except RedisError as e:
+                        c.result = e
+
+            # turn the response back into a simple flat array that corresponds
+            # to the sequence of commands issued in the stack in pipeline.execute()
+            response = []
+            for c in sorted(stack, key=lambda x: x.position):
+                if c.args[0] in self.cluster_response_callbacks:
+                    c.result = self.cluster_response_callbacks[c.args[0]](
+                        c.result, **c.options
+                    )
+                response.append(c.result)
+
+            if raise_on_error:
+                self.raise_first_error(stack)
+
+            return response
+        except BaseException:
+            # if nodes is not empty, a problem must have occurred
+            # since we cant guarantee the state of the connections,
+            # disconnect before returning it to the connection pool
+            for n in nodes.values():
+                if n.connection:
+                    n.connection.disconnect()
+                    n.connection_pool.release(n.connection)
+            raise
 
     def _fail_on_redirect(self, allow_redirections):
         """ """

tests/test_cluster.py

Lines changed: 23 additions & 0 deletions
@@ -28,6 +28,7 @@
     REPLICA,
     ClusterNode,
     LoadBalancer,
+    NodeCommands,
     NodesManager,
     RedisCluster,
     get_node_name,
@@ -2790,6 +2791,28 @@ class TestClusterPipeline:
     Tests for the ClusterPipeline class
     """
 
+    @pytest.mark.parametrize("function", ["write", "read"])
+    def test_connection_release_with_unexpected_error_in_node_commands(
+        self, r, function
+    ):
+        """
+        Test that connection is released to the pool, even with an unexpected error
+        """
+        with patch.object(NodeCommands, function) as m:
+
+            def raise_error():
+                raise Exception("unexpected error")
+
+            m.side_effect = raise_error
+
+            with pytest.raises(Exception, match="unexpected error"):
+                r.pipeline().get("a").execute()
+
+        for cluster_node in r.nodes_manager.nodes_cache.values():
+            connection_pool = cluster_node.redis_connection.connection_pool
+            num_of_conns = len(connection_pool._available_connections)
+            assert num_of_conns == connection_pool._created_connections
+
     def test_blocked_methods(self, r):
         """
         Currently some method calls on a Cluster pipeline
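
The final assertion in the new test is a leak check: after the pipeline fails, every connection the pool ever created should be back in its free list. The same check could be factored into a helper such as the hypothetical one below; it leans on redis-py's private _available_connections and _created_connections pool counters, so it is a test-only convenience:

    # Hypothetical test helper mirroring the assertion above; it relies on the
    # private _available_connections / _created_connections pool attributes.

    def assert_no_leaked_connections(cluster_client):
        for cluster_node in cluster_client.nodes_manager.nodes_cache.values():
            pool = cluster_node.redis_connection.connection_pool
            available = len(pool._available_connections)
            leaked = pool._created_connections - available
            assert leaked == 0, f"{leaked} connection(s) leaked on {cluster_node.name}"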
