Skip to content

Commit

Permalink
Fix retry logic for pubsub and pipeline (#3134)
Browse files Browse the repository at this point in the history
* Fix retry logic for pubsub and pipeline

Extend the fix from bea7299 to apply to
pipeline and pubsub as well.

Fixes #2973

* fix isort

---------

Co-authored-by: dvora-h <67596500+dvora-h@users.noreply.github.com>
  • Loading branch information
2 people authored and vladvildanov committed Sep 27, 2024
1 parent d952951 commit 9cac5cd
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 29 deletions.
36 changes: 23 additions & 13 deletions redis/asyncio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -923,11 +923,15 @@ async def connect(self):
async def _disconnect_raise_connect(self, conn, error):
"""
Close the connection and raise an exception
if retry_on_timeout is not set or the error
is not a TimeoutError. Otherwise, try to reconnect
if retry_on_error is not set or the error is not one
of the specified error types. Otherwise, try to
reconnect
"""
await conn.disconnect()
if not (conn.retry_on_timeout and isinstance(error, TimeoutError)):
if (
conn.retry_on_error is None
or isinstance(error, tuple(conn.retry_on_error)) is False
):
raise error
await conn.connect()

Expand Down Expand Up @@ -1340,8 +1344,8 @@ async def _disconnect_reset_raise(self, conn, error):
"""
Close the connection, reset watching state and
raise an exception if we were watching,
retry_on_timeout is not set,
or the error is not a TimeoutError
if retry_on_error is not set or the error is not one
of the specified error types.
"""
await conn.disconnect()
# if we were already watching a variable, the watch is no longer
Expand All @@ -1352,9 +1356,12 @@ async def _disconnect_reset_raise(self, conn, error):
raise WatchError(
"A ConnectionError occurred on while watching one or more keys"
)
# if retry_on_timeout is not set, or the error is not
# a TimeoutError, raise it
if not (conn.retry_on_timeout and isinstance(error, TimeoutError)):
# if retry_on_error is not set or the error is not one
# of the specified error types, raise it
if (
conn.retry_on_error is None
or isinstance(error, tuple(conn.retry_on_error)) is False
):
await self.aclose()
raise

Expand Down Expand Up @@ -1529,8 +1536,8 @@ async def load_scripts(self):
async def _disconnect_raise_reset(self, conn: Connection, error: Exception):
"""
Close the connection, raise an exception if we were watching,
and raise an exception if retry_on_timeout is not set,
or the error is not a TimeoutError
and raise an exception if retry_on_error is not set or the
error is not one of the specified error types.
"""
await conn.disconnect()
# if we were watching a variable, the watch is no longer valid
Expand All @@ -1540,9 +1547,12 @@ async def _disconnect_raise_reset(self, conn: Connection, error: Exception):
raise WatchError(
"A ConnectionError occurred on while watching one or more keys"
)
# if retry_on_timeout is not set, or the error is not
# a TimeoutError, raise it
if not (conn.retry_on_timeout and isinstance(error, TimeoutError)):
# if retry_on_error is not set or the error is not one
# of the specified error types, raise it
if (
conn.retry_on_error is None
or isinstance(error, tuple(conn.retry_on_error)) is False
):
await self.reset()
raise

Expand Down
50 changes: 34 additions & 16 deletions redis/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,12 @@
SentinelCommands,
list_or_args,
)
from redis.connection import ConnectionPool, SSLConnection, UnixDomainSocketConnection
from redis.connection import (
AbstractConnection,
ConnectionPool,
SSLConnection,
UnixDomainSocketConnection,
)
from redis.credentials import CredentialProvider
from redis.exceptions import (
ConnectionError,
Expand Down Expand Up @@ -836,11 +841,15 @@ def clean_health_check_responses(self) -> None:
def _disconnect_raise_connect(self, conn, error) -> None:
"""
Close the connection and raise an exception
if retry_on_timeout is not set or the error
is not a TimeoutError. Otherwise, try to reconnect
if retry_on_error is not set or the error is not one
of the specified error types. Otherwise, try to
reconnect
"""
conn.disconnect()
if not (conn.retry_on_timeout and isinstance(error, TimeoutError)):
if (
conn.retry_on_error is None
or isinstance(error, tuple(conn.retry_on_error)) is False
):
raise error
conn.connect()

Expand Down Expand Up @@ -1317,8 +1326,8 @@ def _disconnect_reset_raise(self, conn, error) -> None:
"""
Close the connection, reset watching state and
raise an exception if we were watching,
retry_on_timeout is not set,
or the error is not a TimeoutError
if retry_on_error is not set or the error is not one
of the specified error types.
"""
conn.disconnect()
# if we were already watching a variable, the watch is no longer
Expand All @@ -1329,9 +1338,12 @@ def _disconnect_reset_raise(self, conn, error) -> None:
raise WatchError(
"A ConnectionError occurred on while watching one or more keys"
)
# if retry_on_timeout is not set, or the error is not
# a TimeoutError, raise it
if not (conn.retry_on_timeout and isinstance(error, TimeoutError)):
# if retry_on_error is not set or the error is not one
# of the specified error types, raise it
if (
conn.retry_on_error is None
or isinstance(error, tuple(conn.retry_on_error)) is False
):
self.reset()
raise

Expand Down Expand Up @@ -1489,11 +1501,15 @@ def load_scripts(self):
if not exist:
s.sha = immediate("SCRIPT LOAD", s.script)

def _disconnect_raise_reset(self, conn: Redis, error: Exception) -> None:
def _disconnect_raise_reset(
self,
conn: AbstractConnection,
error: Exception,
) -> None:
"""
Close the connection, raise an exception if we were watching,
and raise an exception if TimeoutError is not part of retry_on_error,
or the error is not a TimeoutError
and raise an exception if retry_on_error is not set or the
error is not one of the specified error types.
"""
conn.disconnect()
# if we were watching a variable, the watch is no longer valid
Expand All @@ -1503,11 +1519,13 @@ def _disconnect_raise_reset(self, conn: Redis, error: Exception) -> None:
raise WatchError(
"A ConnectionError occurred on while watching one or more keys"
)
# if TimeoutError is not part of retry_on_error, or the error
# is not a TimeoutError, raise it
if not (
TimeoutError in conn.retry_on_error and isinstance(error, TimeoutError)
# if retry_on_error is not set or the error is not one
# of the specified error types, raise it
if (
conn.retry_on_error is None
or isinstance(error, tuple(conn.retry_on_error)) is False
):

self.reset()
raise error

Expand Down

0 comments on commit 9cac5cd

Please sign in to comment.