Skip to content

Commit 67214cc

Browse files
barshaulchayimdvora-h
authored
Failover handling improvements for RedisCluster and Async RedisCluster (#2377)
* Cluster&AsyncCluster: Removed handling of timeouts/connection errors within the cluster loop, fixed "cannot pickle '_thread.lock' object" bug, added client's side failover handling improvements * Fixed linters * Type fixes * Added to CHANGES * Added getter and setter for the client's retry object and added more tests * Fixed linters * Fixed test * Fixed test_client_kill test * Changed get_default_backoff to default_backoff, removed retry_on_error and connection_error_retry_attempts from RedisCluster, default retry changed to no retries * Fixing linters * Reverting deletion of connection_error_retry_attempts to maintain backward compatibility * Updating retry object for existing and new connections * Changed the default value of reinitialize_steps from 10 to 5 * fix review comments Co-authored-by: Chayim <chayim@users.noreply.github.com> Co-authored-by: dvora-h <dvora.heller@redis.com> Co-authored-by: dvora-h <67596500+dvora-h@users.noreply.github.com>
1 parent bb06ccd commit 67214cc

14 files changed

+413
-166
lines changed

CHANGES

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
* ClusterPipeline Doesn't Handle ConnectionError for Dead Hosts (#2225)
2525
* Remove compatibility code for old versions of Hiredis, drop Packaging dependency
2626
* The `deprecated` library is no longer a dependency
27+
* Failover handling improvements for RedisCluster and Async RedisCluster (#2377)
28+
* Fixed "cannot pickle '_thread.lock' object" bug (#2354, #2297)
2729
* Added CredentialsProvider class to support password rotation
2830
* Enable Lock for asyncio cluster mode
2931

redis/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import sys
22

3+
from redis.backoff import default_backoff
34
from redis.client import Redis, StrictRedis
45
from redis.cluster import RedisCluster
56
from redis.connection import (
@@ -66,6 +67,7 @@ def int_or_str(value):
6667
"CredentialProvider",
6768
"DataError",
6869
"from_url",
70+
"default_backoff",
6971
"InvalidResponse",
7072
"PubSubError",
7173
"ReadOnlyError",

redis/asyncio/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
SentinelManagedSSLConnection,
1616
)
1717
from redis.asyncio.utils import from_url
18+
from redis.backoff import default_backoff
1819
from redis.exceptions import (
1920
AuthenticationError,
2021
AuthenticationWrongNumberOfArgsError,
@@ -43,6 +44,7 @@
4344
"ConnectionPool",
4445
"DataError",
4546
"from_url",
47+
"default_backoff",
4648
"InvalidResponse",
4749
"PubSubError",
4850
"ReadOnlyError",

redis/asyncio/client.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -276,6 +276,13 @@ def get_connection_kwargs(self):
276276
"""Get the connection's key-word arguments"""
277277
return self.connection_pool.connection_kwargs
278278

279+
def get_retry(self) -> Optional["Retry"]:
280+
return self.get_connection_kwargs().get("retry")
281+
282+
def set_retry(self, retry: "Retry") -> None:
283+
self.get_connection_kwargs().update({"retry": retry})
284+
self.connection_pool.set_retry(retry)
285+
279286
def load_external_module(self, funcname, func):
280287
"""
281288
This function can be used to add externally defined redis modules,

redis/asyncio/cluster.py

Lines changed: 63 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
)
2727
from redis.asyncio.lock import Lock
2828
from redis.asyncio.parser import CommandsParser
29+
from redis.asyncio.retry import Retry
30+
from redis.backoff import default_backoff
2931
from redis.client import EMPTY_RESPONSE, NEVER_DECODE, AbstractRedis
3032
from redis.cluster import (
3133
PIPELINE_BLOCKED_COMMANDS,
@@ -110,10 +112,10 @@ class RedisCluster(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommand
110112
:param startup_nodes:
111113
| :class:`~.ClusterNode` to used as a startup node
112114
:param require_full_coverage:
113-
| When set to ``False``: the client will not require a full coverage of the
114-
slots. However, if not all slots are covered, and at least one node has
115-
``cluster-require-full-coverage`` set to ``yes``, the server will throw a
116-
:class:`~.ClusterDownError` for some key-based commands.
115+
| When set to ``False``: the client will not require a full coverage of
116+
the slots. However, if not all slots are covered, and at least one node
117+
has ``cluster-require-full-coverage`` set to ``yes``, the server will throw
118+
a :class:`~.ClusterDownError` for some key-based commands.
117119
| When set to ``True``: all slots must be covered to construct the cluster
118120
client. If not all slots are covered, :class:`~.RedisClusterException` will be
119121
thrown.
@@ -136,7 +138,10 @@ class RedisCluster(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommand
136138
or :class:`~.ConnectionError` or :class:`~.ClusterDownError` are encountered
137139
:param connection_error_retry_attempts:
138140
| Number of times to retry before reinitializing when :class:`~.TimeoutError`
139-
or :class:`~.ConnectionError` are encountered
141+
or :class:`~.ConnectionError` are encountered.
142+
The default backoff strategy will be set if Retry object is not passed (see
143+
default_backoff in backoff.py). To change it, pass a custom Retry object
144+
using the "retry" keyword.
140145
:param max_connections:
141146
| Maximum number of connections per node. If there are no free connections & the
142147
maximum number of connections are already created, a
@@ -214,9 +219,9 @@ def __init__(
214219
startup_nodes: Optional[List["ClusterNode"]] = None,
215220
require_full_coverage: bool = True,
216221
read_from_replicas: bool = False,
217-
reinitialize_steps: int = 10,
222+
reinitialize_steps: int = 5,
218223
cluster_error_retry_attempts: int = 3,
219-
connection_error_retry_attempts: int = 5,
224+
connection_error_retry_attempts: int = 3,
220225
max_connections: int = 2**31,
221226
# Client related kwargs
222227
db: Union[str, int] = 0,
@@ -235,6 +240,8 @@ def __init__(
235240
socket_keepalive: bool = False,
236241
socket_keepalive_options: Optional[Mapping[int, Union[int, bytes]]] = None,
237242
socket_timeout: Optional[float] = None,
243+
retry: Optional["Retry"] = None,
244+
retry_on_error: Optional[List[Exception]] = None,
238245
# SSL related kwargs
239246
ssl: bool = False,
240247
ssl_ca_certs: Optional[str] = None,
@@ -282,6 +289,7 @@ def __init__(
282289
"socket_keepalive": socket_keepalive,
283290
"socket_keepalive_options": socket_keepalive_options,
284291
"socket_timeout": socket_timeout,
292+
"retry": retry,
285293
}
286294

287295
if ssl:
@@ -302,6 +310,18 @@ def __init__(
302310
# Call our on_connect function to configure READONLY mode
303311
kwargs["redis_connect_func"] = self.on_connect
304312

313+
self.retry = retry
314+
if retry or retry_on_error or connection_error_retry_attempts > 0:
315+
# Set a retry object for all cluster nodes
316+
self.retry = retry or Retry(
317+
default_backoff(), connection_error_retry_attempts
318+
)
319+
if not retry_on_error:
320+
# Default errors for retrying
321+
retry_on_error = [ConnectionError, TimeoutError]
322+
self.retry.update_supported_errors(retry_on_error)
323+
kwargs.update({"retry": self.retry})
324+
305325
kwargs["response_callbacks"] = self.__class__.RESPONSE_CALLBACKS.copy()
306326
self.connection_kwargs = kwargs
307327

@@ -323,7 +343,6 @@ def __init__(
323343
self.reinitialize_steps = reinitialize_steps
324344
self.cluster_error_retry_attempts = cluster_error_retry_attempts
325345
self.connection_error_retry_attempts = connection_error_retry_attempts
326-
327346
self.reinitialize_counter = 0
328347
self.commands_parser = CommandsParser()
329348
self.node_flags = self.__class__.NODE_FLAGS.copy()
@@ -481,6 +500,16 @@ def get_connection_kwargs(self) -> Dict[str, Optional[Any]]:
481500
"""Get the kwargs passed to :class:`~redis.asyncio.connection.Connection`."""
482501
return self.connection_kwargs
483502

503+
def get_retry(self) -> Optional["Retry"]:
504+
return self.retry
505+
506+
def set_retry(self, retry: "Retry") -> None:
507+
self.retry = retry
508+
for node in self.get_nodes():
509+
node.connection_kwargs.update({"retry": retry})
510+
for conn in node._connections:
511+
conn.retry = retry
512+
484513
def set_response_callback(self, command: str, callback: ResponseCallbackT) -> None:
485514
"""Set a custom response callback."""
486515
self.response_callbacks[command] = callback
@@ -618,9 +647,11 @@ async def execute_command(self, *args: EncodableT, **kwargs: Any) -> Any:
618647
if passed_targets and not self._is_node_flag(passed_targets):
619648
target_nodes = self._parse_target_nodes(passed_targets)
620649
target_nodes_specified = True
621-
retry_attempts = 1
650+
retry_attempts = 0
622651

623-
for _ in range(retry_attempts):
652+
# Add one for the first execution
653+
execute_attempts = 1 + retry_attempts
654+
for _ in range(execute_attempts):
624655
if self._initialize:
625656
await self.initialize()
626657
try:
@@ -658,25 +689,21 @@ async def execute_command(self, *args: EncodableT, **kwargs: Any) -> Any:
658689
)
659690
return dict(zip(keys, values))
660691
except Exception as e:
661-
if type(e) in self.__class__.ERRORS_ALLOW_RETRY:
662-
# The nodes and slots cache were reinitialized.
692+
if retry_attempts > 0 and type(e) in self.__class__.ERRORS_ALLOW_RETRY:
693+
# The nodes and slots cache were should be reinitialized.
663694
# Try again with the new cluster setup.
664-
exception = e
695+
retry_attempts -= 1
696+
continue
665697
else:
666-
# All other errors should be raised.
667-
raise
668-
669-
# If it fails the configured number of times then raise exception back
670-
# to caller of this method
671-
raise exception
698+
# raise the exception
699+
raise e
672700

673701
async def _execute_command(
674702
self, target_node: "ClusterNode", *args: Union[KeyT, EncodableT], **kwargs: Any
675703
) -> Any:
676704
asking = moved = False
677705
redirect_addr = None
678706
ttl = self.RedisClusterRequestTTL
679-
connection_error_retry_counter = 0
680707

681708
while ttl > 0:
682709
ttl -= 1
@@ -695,25 +722,18 @@ async def _execute_command(
695722
moved = False
696723

697724
return await target_node.execute_command(*args, **kwargs)
698-
except BusyLoadingError:
725+
except (BusyLoadingError, MaxConnectionsError):
726+
raise
727+
except (ConnectionError, TimeoutError):
728+
# Connection retries are being handled in the node's
729+
# Retry object.
730+
# Remove the failed node from the startup nodes before we try
731+
# to reinitialize the cluster
732+
self.nodes_manager.startup_nodes.pop(target_node.name, None)
733+
# Hard force of reinitialize of the node/slots setup
734+
# and try again with the new setup
735+
await self.close()
699736
raise
700-
except (ConnectionError, TimeoutError) as e:
701-
# Give the node 0.25 seconds to get back up and retry again with the
702-
# same node and configuration. After the defined number of attempts, try
703-
# to reinitialize the cluster and try again.
704-
connection_error_retry_counter += 1
705-
if (
706-
connection_error_retry_counter
707-
< self.connection_error_retry_attempts
708-
):
709-
await asyncio.sleep(0.25)
710-
else:
711-
if isinstance(e, MaxConnectionsError):
712-
raise
713-
# Hard force of reinitialize of the node/slots setup
714-
# and try again with the new setup
715-
await self.close()
716-
raise
717737
except ClusterDownError:
718738
# ClusterDownError can occur during a failover and to get
719739
# self-healed, we will try to reinitialize the cluster layout
@@ -1145,26 +1165,11 @@ async def initialize(self) -> None:
11451165
)
11461166
cluster_slots = await startup_node.execute_command("CLUSTER SLOTS")
11471167
startup_nodes_reachable = True
1148-
except (ConnectionError, TimeoutError) as e:
1168+
except Exception as e:
1169+
# Try the next startup node.
1170+
# The exception is saved and raised only if we have no more nodes.
11491171
exception = e
11501172
continue
1151-
except ResponseError as e:
1152-
# Isn't a cluster connection, so it won't parse these
1153-
# exceptions automatically
1154-
message = e.__str__()
1155-
if "CLUSTERDOWN" in message or "MASTERDOWN" in message:
1156-
continue
1157-
else:
1158-
raise RedisClusterException(
1159-
'ERROR sending "cluster slots" command to redis '
1160-
f"server: {startup_node}. error: {message}"
1161-
)
1162-
except Exception as e:
1163-
message = e.__str__()
1164-
raise RedisClusterException(
1165-
'ERROR sending "cluster slots" command to redis '
1166-
f"server {startup_node.name}. error: {message}"
1167-
)
11681173

11691174
# CLUSTER SLOTS command results in the following output:
11701175
# [[slot_section[from_slot,to_slot,master,replica1,...,replicaN]]]
@@ -1245,8 +1250,8 @@ async def initialize(self) -> None:
12451250

12461251
if not startup_nodes_reachable:
12471252
raise RedisClusterException(
1248-
"Redis Cluster cannot be connected. Please provide at least "
1249-
"one reachable node. "
1253+
f"Redis Cluster cannot be connected. Please provide at least "
1254+
f"one reachable node: {str(exception)}"
12501255
) from exception
12511256

12521257
# Check if the slots are not fully covered

redis/asyncio/connection.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -497,7 +497,7 @@ def __init__(
497497
retry_on_error.append(socket.timeout)
498498
retry_on_error.append(asyncio.TimeoutError)
499499
self.retry_on_error = retry_on_error
500-
if retry_on_error:
500+
if retry or retry_on_error:
501501
if not retry:
502502
self.retry = Retry(NoBackoff(), 1)
503503
else:
@@ -1445,6 +1445,12 @@ async def disconnect(self, inuse_connections: bool = True):
14451445
if exc:
14461446
raise exc
14471447

1448+
def set_retry(self, retry: "Retry") -> None:
1449+
for conn in self._available_connections:
1450+
conn.retry = retry
1451+
for conn in self._in_use_connections:
1452+
conn.retry = retry
1453+
14481454

14491455
class BlockingConnectionPool(ConnectionPool):
14501456
"""

redis/backoff.py

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
import random
22
from abc import ABC, abstractmethod
33

4+
# Maximum backoff between each retry in seconds
5+
DEFAULT_CAP = 0.512
6+
# Minimum backoff between each retry in seconds
7+
DEFAULT_BASE = 0.008
8+
49

510
class AbstractBackoff(ABC):
611
"""Backoff interface"""
@@ -40,7 +45,7 @@ def __init__(self):
4045
class ExponentialBackoff(AbstractBackoff):
4146
"""Exponential backoff upon failure"""
4247

43-
def __init__(self, cap, base):
48+
def __init__(self, cap=DEFAULT_CAP, base=DEFAULT_BASE):
4449
"""
4550
`cap`: maximum backoff time in seconds
4651
`base`: base backoff time in seconds
@@ -55,7 +60,7 @@ def compute(self, failures):
5560
class FullJitterBackoff(AbstractBackoff):
5661
"""Full jitter backoff upon failure"""
5762

58-
def __init__(self, cap, base):
63+
def __init__(self, cap=DEFAULT_CAP, base=DEFAULT_BASE):
5964
"""
6065
`cap`: maximum backoff time in seconds
6166
`base`: base backoff time in seconds
@@ -70,7 +75,7 @@ def compute(self, failures):
7075
class EqualJitterBackoff(AbstractBackoff):
7176
"""Equal jitter backoff upon failure"""
7277

73-
def __init__(self, cap, base):
78+
def __init__(self, cap=DEFAULT_CAP, base=DEFAULT_BASE):
7479
"""
7580
`cap`: maximum backoff time in seconds
7681
`base`: base backoff time in seconds
@@ -86,7 +91,7 @@ def compute(self, failures):
8691
class DecorrelatedJitterBackoff(AbstractBackoff):
8792
"""Decorrelated jitter backoff upon failure"""
8893

89-
def __init__(self, cap, base):
94+
def __init__(self, cap=DEFAULT_CAP, base=DEFAULT_BASE):
9095
"""
9196
`cap`: maximum backoff time in seconds
9297
`base`: base backoff time in seconds
@@ -103,3 +108,7 @@ def compute(self, failures):
103108
temp = random.uniform(self._base, max_backoff)
104109
self._previous_backoff = min(self._cap, temp)
105110
return self._previous_backoff
111+
112+
113+
def default_backoff():
114+
return EqualJitterBackoff()

redis/client.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
WatchError,
2727
)
2828
from redis.lock import Lock
29+
from redis.retry import Retry
2930
from redis.utils import safe_str, str_if_bytes
3031

3132
SYM_EMPTY = b""
@@ -1047,6 +1048,13 @@ def get_connection_kwargs(self):
10471048
"""Get the connection's key-word arguments"""
10481049
return self.connection_pool.connection_kwargs
10491050

1051+
def get_retry(self) -> Optional["Retry"]:
1052+
return self.get_connection_kwargs().get("retry")
1053+
1054+
def set_retry(self, retry: "Retry") -> None:
1055+
self.get_connection_kwargs().update({"retry": retry})
1056+
self.connection_pool.set_retry(retry)
1057+
10501058
def set_response_callback(self, command, callback):
10511059
"""Set a custom Response Callback"""
10521060
self.response_callbacks[command] = callback

0 commit comments

Comments
 (0)