Skip to content

Commit fd9fea6

Browse files
authored
ClusterPipeline Doesn't Handle ConnectionError for Dead Hosts (#2225)
* adds a retry that forces the nodes_manager to reinitialize if cluster pipleline connect to the one of the mapped nodes * fix line length error * add trailing comma * move appending cmd * updates changes * trigger build * fix linting errors
1 parent da9d903 commit fd9fea6

File tree

2 files changed

+46
-25
lines changed

2 files changed

+46
-25
lines changed

CHANGES

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
* Added dynaminc_startup_nodes configuration to RedisCluster
1717
* Fix reusing the old nodes' connections when cluster topology refresh is being done
1818
* Fix RedisCluster to immediately raise AuthenticationError without a retry
19+
* ClusterPipeline Doesn't Handle ConnectionError for Dead Hosts (#2225)
20+
1921
* 4.1.3 (Feb 8, 2022)
2022
* Fix flushdb and flushall (#1926)
2123
* Add redis5 and redis4 dockers (#1871)

redis/cluster.py

Lines changed: 44 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1897,34 +1897,53 @@ def _send_cluster_commands(
18971897
# we figure out the slot number that command maps to, then from
18981898
# the slot determine the node.
18991899
for c in attempt:
1900-
# refer to our internal node -> slot table that
1901-
# tells us where a given
1902-
# command should route to.
1903-
passed_targets = c.options.pop("target_nodes", None)
1904-
if passed_targets and not self._is_nodes_flag(passed_targets):
1905-
target_nodes = self._parse_target_nodes(passed_targets)
1906-
else:
1907-
target_nodes = self._determine_nodes(*c.args, node_flag=passed_targets)
1908-
if not target_nodes:
1900+
connection_error_retry_counter = 0
1901+
while True:
1902+
# refer to our internal node -> slot table that
1903+
# tells us where a given command should route to.
1904+
# (it might be possible we have a cached node that no longer
1905+
# exists in the cluster, which is why we do this in a loop)
1906+
passed_targets = c.options.pop("target_nodes", None)
1907+
if passed_targets and not self._is_nodes_flag(passed_targets):
1908+
target_nodes = self._parse_target_nodes(passed_targets)
1909+
else:
1910+
target_nodes = self._determine_nodes(
1911+
*c.args, node_flag=passed_targets
1912+
)
1913+
if not target_nodes:
1914+
raise RedisClusterException(
1915+
f"No targets were found to execute {c.args} command on"
1916+
)
1917+
if len(target_nodes) > 1:
19091918
raise RedisClusterException(
1910-
f"No targets were found to execute {c.args} command on"
1919+
f"Too many targets for command {c.args}"
19111920
)
1912-
if len(target_nodes) > 1:
1913-
raise RedisClusterException(f"Too many targets for command {c.args}")
1914-
1915-
node = target_nodes[0]
1916-
# now that we know the name of the node
1917-
# ( it's just a string in the form of host:port )
1918-
# we can build a list of commands for each node.
1919-
node_name = node.name
1920-
if node_name not in nodes:
1921-
redis_node = self.get_redis_connection(node)
1922-
connection = get_connection(redis_node, c.args)
1923-
nodes[node_name] = NodeCommands(
1924-
redis_node.parse_response, redis_node.connection_pool, connection
1925-
)
19261921

1927-
nodes[node_name].append(c)
1922+
node = target_nodes[0]
1923+
1924+
# now that we know the name of the node
1925+
# ( it's just a string in the form of host:port )
1926+
# we can build a list of commands for each node.
1927+
node_name = node.name
1928+
if node_name not in nodes:
1929+
redis_node = self.get_redis_connection(node)
1930+
try:
1931+
connection = get_connection(redis_node, c.args)
1932+
except ConnectionError:
1933+
connection_error_retry_counter += 1
1934+
if connection_error_retry_counter < 5:
1935+
# reinitialize the node -> slot table
1936+
self.nodes_manager.initialize()
1937+
continue
1938+
else:
1939+
raise
1940+
nodes[node_name] = NodeCommands(
1941+
redis_node.parse_response,
1942+
redis_node.connection_pool,
1943+
connection,
1944+
)
1945+
nodes[node_name].append(c)
1946+
break
19281947

19291948
# send the commands in sequence.
19301949
# we write to all the open sockets for each node first,

0 commit comments

Comments
 (0)