Skip to content

Commit e19a76c

Browse files
Add retry mechanism with backoff (#1494)
1 parent b96af52 commit e19a76c

File tree

6 files changed

+360
-86
lines changed

6 files changed

+360
-86
lines changed

redis/backoff.py

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
from abc import ABC, abstractmethod
2+
import random
3+
4+
5+
class AbstractBackoff(ABC):
    """Backoff interface"""

    def reset(self):
        """
        Reset internal state before an operation.
        `reset` is called once at the beginning of
        every call to `Retry.call_with_retry`
        """
        pass

    @abstractmethod
    def compute(self, failures):
        """Compute backoff in seconds upon failure"""
        pass


def _capped_exponential(cap, base, failures):
    """
    Return `min(cap, base * 2 ** failures)`, saturating at `cap`.

    `cap`: maximum backoff time in seconds
    `base`: base backoff time in seconds
    `failures`: exponent applied to 2

    With a float `base`, `2 ** failures` stops being convertible to a
    float once `failures` is large enough and the multiplication raises
    OverflowError; in that case the product necessarily exceeds `cap`.
    """
    try:
        return min(cap, base * 2 ** failures)
    except OverflowError:
        # A positive base times an over-large power of two is above any
        # finite cap. A zero (or negative) base never grows past itself.
        return cap if base > 0 else min(cap, base)


class ConstantBackoff(AbstractBackoff):
    """Constant backoff upon failure"""

    def __init__(self, backoff):
        """`backoff`: backoff time in seconds"""
        self._backoff = backoff

    def compute(self, failures):
        """Return the configured backoff; `failures` is ignored"""
        return self._backoff


class NoBackoff(ConstantBackoff):
    """No backoff upon failure"""

    def __init__(self):
        super().__init__(0)


class ExponentialBackoff(AbstractBackoff):
    """Exponential backoff upon failure"""

    def __init__(self, cap, base):
        """
        `cap`: maximum backoff time in seconds
        `base`: base backoff time in seconds
        """
        self._cap = cap
        self._base = base

    def compute(self, failures):
        """Return `base * 2 ** failures`, limited to `cap`"""
        return _capped_exponential(self._cap, self._base, failures)


class FullJitterBackoff(AbstractBackoff):
    """Full jitter backoff upon failure"""

    def __init__(self, cap, base):
        """
        `cap`: maximum backoff time in seconds
        `base`: base backoff time in seconds
        """
        self._cap = cap
        self._base = base

    def compute(self, failures):
        """Return a random value between 0 and the capped exponential"""
        upper = _capped_exponential(self._cap, self._base, failures)
        return random.uniform(0, upper)


class EqualJitterBackoff(AbstractBackoff):
    """Equal jitter backoff upon failure"""

    def __init__(self, cap, base):
        """
        `cap`: maximum backoff time in seconds
        `base`: base backoff time in seconds
        """
        self._cap = cap
        self._base = base

    def compute(self, failures):
        """
        Return half of the capped exponential plus a random
        value between 0 and that same half
        """
        temp = _capped_exponential(self._cap, self._base, failures) / 2
        return temp + random.uniform(0, temp)


class DecorrelatedJitterBackoff(AbstractBackoff):
    """Decorrelated jitter backoff upon failure"""

    def __init__(self, cap, base):
        """
        `cap`: maximum backoff time in seconds
        `base`: base backoff time in seconds
        """
        self._cap = cap
        self._base = base
        self._previous_backoff = 0

    def reset(self):
        """Forget the previously computed backoff"""
        self._previous_backoff = 0

    def compute(self, failures):
        """
        Return a random value between `base` and three times the
        previous backoff, limited to `cap`; `failures` is ignored.
        The result is remembered for the next call.
        """
        max_backoff = max(self._base, self._previous_backoff * 3)
        temp = random.uniform(self._base, max_backoff)
        self._previous_backoff = min(self._cap, temp)
        return self._previous_backoff

redis/client.py

Lines changed: 110 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
from itertools import chain
2+
import copy
23
import datetime
34
import hashlib
45
import re
@@ -758,7 +759,13 @@ def __init__(self, host='localhost', port=6379,
758759
ssl_cert_reqs='required', ssl_ca_certs=None,
759760
ssl_check_hostname=False,
760761
max_connections=None, single_connection_client=False,
761-
health_check_interval=0, client_name=None, username=None):
762+
health_check_interval=0, client_name=None, username=None,
763+
retry=None):
764+
"""
765+
Initialize a new Redis client.
766+
To specify a retry policy, first set `retry_on_timeout` to `True`
767+
then set `retry` to a valid `Retry` object
768+
"""
762769
if not connection_pool:
763770
if charset is not None:
764771
warnings.warn(DeprecationWarning(
@@ -778,6 +785,7 @@ def __init__(self, host='localhost', port=6379,
778785
'encoding_errors': encoding_errors,
779786
'decode_responses': decode_responses,
780787
'retry_on_timeout': retry_on_timeout,
788+
'retry': copy.deepcopy(retry),
781789
'max_connections': max_connections,
782790
'health_check_interval': health_check_interval,
783791
'client_name': client_name
@@ -940,21 +948,41 @@ def close(self):
940948
self.connection = None
941949
self.connection_pool.release(conn)
942950

951+
def _send_command_parse_response(self, conn, command_name, *args, **options):
    """
    Send `args` as a command over `conn`, then return the result of
    `self.parse_response` for `command_name` with `options`
    """
    conn.send_command(*args)
    response = self.parse_response(conn, command_name, **options)
    return response
961+
962+
def _disconnect_raise(self, conn, error):
    """
    Disconnect `conn`, then re-raise `error` unless it is a
    TimeoutError and `conn.retry_on_timeout` is set, in which
    case return normally
    """
    conn.disconnect()
    retryable = conn.retry_on_timeout and isinstance(error, TimeoutError)
    if retryable:
        return
    raise error
971+
943972
# COMMAND EXECUTION AND PROTOCOL PARSING
944973
def execute_command(self, *args, **options):
945974
"Execute a command and return a parsed response"
946975
pool = self.connection_pool
947976
command_name = args[0]
948977
conn = self.connection or pool.get_connection(command_name, **options)
978+
949979
try:
950-
conn.send_command(*args)
951-
return self.parse_response(conn, command_name, **options)
952-
except (ConnectionError, TimeoutError) as e:
953-
conn.disconnect()
954-
if not (conn.retry_on_timeout and isinstance(e, TimeoutError)):
955-
raise
956-
conn.send_command(*args)
957-
return self.parse_response(conn, command_name, **options)
980+
return conn.retry.call_with_retry(
981+
lambda: self._send_command_parse_response(conn,
982+
command_name,
983+
*args,
984+
**options),
985+
lambda error: self._disconnect_raise(conn, error))
958986
finally:
959987
if not self.connection:
960988
pool.release(conn)
@@ -1142,24 +1170,31 @@ def execute_command(self, *args):
11421170
kwargs = {'check_health': not self.subscribed}
11431171
self._execute(connection, connection.send_command, *args, **kwargs)
11441172

1145-
def _execute(self, connection, command, *args, **kwargs):
1146-
try:
1147-
return command(*args, **kwargs)
1148-
except (ConnectionError, TimeoutError) as e:
1149-
connection.disconnect()
1150-
if not (connection.retry_on_timeout and
1151-
isinstance(e, TimeoutError)):
1152-
raise
1153-
# Connect manually here. If the Redis server is down, this will
1154-
# fail and raise a ConnectionError as desired.
1155-
connection.connect()
1156-
# the ``on_connect`` callback should haven been called by the
1157-
# connection to resubscribe us to any channels and patterns we were
1158-
# previously listening to
1159-
return command(*args, **kwargs)
1173+
def _disconnect_raise_connect(self, conn, error):
    """
    Disconnect `conn`. When `conn.retry_on_timeout` is set and
    `error` is a TimeoutError, reconnect and return; in every
    other case re-raise `error`
    """
    conn.disconnect()
    if conn.retry_on_timeout and isinstance(error, TimeoutError):
        conn.connect()
        return
    raise error
1183+
1184+
def _execute(self, conn, command, *args, **kwargs):
    """
    Run `command(*args, **kwargs)` through `conn.retry.call_with_retry`,
    using `self._disconnect_raise_connect` as the failure handler.

    That handler connects manually upon disconnection. If the Redis
    server is down, this will fail and raise a ConnectionError as desired.
    After reconnection, the ``on_connect`` callback should have been
    called by the connection to resubscribe us to any channels and
    patterns we were previously listening to
    """
    def attempt():
        return command(*args, **kwargs)

    def on_failure(error):
        return self._disconnect_raise_connect(conn, error)

    return conn.retry.call_with_retry(attempt, on_failure)
11601195

11611196
def parse_response(self, block=True, timeout=0):
1162-
"Parse the response from a publish/subscribe command"
1197+
"""Parse the response from a publish/subscribe command"""
11631198
conn = self.connection
11641199
if conn is None:
11651200
raise RuntimeError(
@@ -1499,6 +1534,27 @@ def execute_command(self, *args, **kwargs):
14991534
return self.immediate_execute_command(*args, **kwargs)
15001535
return self.pipeline_execute_command(*args, **kwargs)
15011536

1537+
def _disconnect_reset_raise(self, conn, error):
    """
    Close the connection, reset watching state and
    raise an exception if we were watching,
    retry_on_timeout is not set,
    or the error is not a TimeoutError

    `conn`: connection the failed command was sent on
    `error`: exception raised by the failed attempt

    Returns None only when `error` is a TimeoutError and
    `conn.retry_on_timeout` is set while not watching.
    Raises WatchError when watching, otherwise `error` itself.
    """
    conn.disconnect()
    # if we were already watching a variable, the watch is no longer
    # valid since this connection has died. raise a WatchError, which
    # indicates the user should retry this transaction.
    if self.watching:
        self.reset()
        raise WatchError("A ConnectionError occurred on while "
                         "watching one or more keys")
    # if retry_on_timeout is not set, or the error is not
    # a TimeoutError, raise it
    if not (conn.retry_on_timeout and isinstance(error, TimeoutError)):
        self.reset()
        # Raise the error that was passed in rather than using a bare
        # `raise`: a bare `raise` only works while an exception is being
        # handled and would re-raise whichever exception is active,
        # which is not guaranteed to be `error`.
        raise error
1557+
15021558
def immediate_execute_command(self, *args, **options):
15031559
"""
15041560
Execute a command immediately, but don't auto-retry on a
@@ -1513,33 +1569,13 @@ def immediate_execute_command(self, *args, **options):
15131569
conn = self.connection_pool.get_connection(command_name,
15141570
self.shard_hint)
15151571
self.connection = conn
1516-
try:
1517-
conn.send_command(*args)
1518-
return self.parse_response(conn, command_name, **options)
1519-
except (ConnectionError, TimeoutError) as e:
1520-
conn.disconnect()
1521-
# if we were already watching a variable, the watch is no longer
1522-
# valid since this connection has died. raise a WatchError, which
1523-
# indicates the user should retry this transaction.
1524-
if self.watching:
1525-
self.reset()
1526-
raise WatchError("A ConnectionError occurred on while "
1527-
"watching one or more keys")
1528-
# if retry_on_timeout is not set, or the error is not
1529-
# a TimeoutError, raise it
1530-
if not (conn.retry_on_timeout and isinstance(e, TimeoutError)):
1531-
self.reset()
1532-
raise
1533-
1534-
# retry_on_timeout is set, this is a TimeoutError and we are not
1535-
# already WATCHing any variables. retry the command.
1536-
try:
1537-
conn.send_command(*args)
1538-
return self.parse_response(conn, command_name, **options)
1539-
except (ConnectionError, TimeoutError):
1540-
# a subsequent failure should simply be raised
1541-
self.reset()
1542-
raise
1572+
1573+
return conn.retry.call_with_retry(
1574+
lambda: self._send_command_parse_response(conn,
1575+
command_name,
1576+
*args,
1577+
**options),
1578+
lambda error: self._disconnect_reset_raise(conn, error))
15431579

15441580
def pipeline_execute_command(self, *args, **options):
15451581
"""
@@ -1672,6 +1708,25 @@ def load_scripts(self):
16721708
if not exist:
16731709
s.sha = immediate('SCRIPT LOAD', s.script)
16741710

1711+
def _disconnect_raise_reset(self, conn, error):
    """
    Close the connection, raise an exception if we were watching,
    and raise an exception if retry_on_timeout is not set,
    or the error is not a TimeoutError

    `conn`: connection the failed pipeline was sent on
    `error`: exception raised by the failed attempt

    Returns None only when `error` is a TimeoutError and
    `conn.retry_on_timeout` is set while not watching.
    Raises WatchError when watching (without calling `reset` here),
    otherwise calls `reset` and raises `error` itself.
    """
    conn.disconnect()
    # if we were watching a variable, the watch is no longer valid
    # since this connection has died. raise a WatchError, which
    # indicates the user should retry this transaction.
    if self.watching:
        raise WatchError("A ConnectionError occurred on while "
                         "watching one or more keys")
    # if retry_on_timeout is not set, or the error is not
    # a TimeoutError, raise it
    if not (conn.retry_on_timeout and isinstance(error, TimeoutError)):
        self.reset()
        # Raise the error that was passed in rather than using a bare
        # `raise`: a bare `raise` only works while an exception is being
        # handled and would re-raise whichever exception is active,
        # which is not guaranteed to be `error`.
        raise error
1729+
16751730
def execute(self, raise_on_error=True):
16761731
"Execute all the commands in the current pipeline"
16771732
stack = self.command_stack
@@ -1693,21 +1748,9 @@ def execute(self, raise_on_error=True):
16931748
self.connection = conn
16941749

16951750
try:
1696-
return execute(conn, stack, raise_on_error)
1697-
except (ConnectionError, TimeoutError) as e:
1698-
conn.disconnect()
1699-
# if we were watching a variable, the watch is no longer valid
1700-
# since this connection has died. raise a WatchError, which
1701-
# indicates the user should retry this transaction.
1702-
if self.watching:
1703-
raise WatchError("A ConnectionError occurred on while "
1704-
"watching one or more keys")
1705-
# if retry_on_timeout is not set, or the error is not
1706-
# a TimeoutError, raise it
1707-
if not (conn.retry_on_timeout and isinstance(e, TimeoutError)):
1708-
raise
1709-
# retry a TimeoutError when retry_on_timeout is set
1710-
return execute(conn, stack, raise_on_error)
1751+
return conn.retry.call_with_retry(
1752+
lambda: execute(conn, stack, raise_on_error),
1753+
lambda error: self._disconnect_raise_reset(conn, error))
17111754
finally:
17121755
self.reset()
17131756

0 commit comments

Comments
 (0)