Skip to content

SentinelManagedConnection searches for new master upon connection failure (#3560) #3601

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions redis/asyncio/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -295,13 +295,18 @@ async def connect(self):
"""Connects to the Redis server if not already connected"""
await self.connect_check_health(check_health=True)

async def connect_check_health(self, check_health: bool = True):
async def connect_check_health(
self, check_health: bool = True, retry_socket_connect: bool = True
):
if self.is_connected:
return
try:
await self.retry.call_with_retry(
lambda: self._connect(), lambda error: self.disconnect()
)
if retry_socket_connect:
await self.retry.call_with_retry(
lambda: self._connect(), lambda error: self.disconnect()
)
else:
await self._connect()
except asyncio.CancelledError:
raise # in 3.7 and earlier, this is an Exception, not BaseException
except (socket.timeout, asyncio.TimeoutError):
Expand Down
17 changes: 10 additions & 7 deletions redis/asyncio/sentinel.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,12 @@
SSLConnection,
)
from redis.commands import AsyncSentinelCommands
from redis.exceptions import ConnectionError, ReadOnlyError, ResponseError, TimeoutError
from redis.utils import str_if_bytes
from redis.exceptions import (
ConnectionError,
ReadOnlyError,
ResponseError,
TimeoutError,
)


class MasterNotFoundError(ConnectionError):
Expand All @@ -37,11 +41,10 @@ def __repr__(self):

async def connect_to(self, address):
self.host, self.port = address
await super().connect()
if self.connection_pool.check_connection:
await self.send_command("PING")
if str_if_bytes(await self.read_response()) != "PONG":
raise ConnectionError("PING failed")
await self.connect_check_health(
check_health=self.connection_pool.check_connection,
retry_socket_connect=False,
)

async def _connect_retry(self):
if self._reader:
Expand Down
13 changes: 9 additions & 4 deletions redis/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -378,13 +378,18 @@ def connect(self):
"Connects to the Redis server if not already connected"
self.connect_check_health(check_health=True)

def connect_check_health(self, check_health: bool = True):
def connect_check_health(
self, check_health: bool = True, retry_socket_connect: bool = True
):
if self._sock:
return
try:
sock = self.retry.call_with_retry(
lambda: self._connect(), lambda error: self.disconnect(error)
)
if retry_socket_connect:
sock = self.retry.call_with_retry(
lambda: self._connect(), lambda error: self.disconnect(error)
)
else:
sock = self._connect()
except socket.timeout:
raise TimeoutError("Timeout connecting to server")
except OSError as e:
Expand Down
18 changes: 11 additions & 7 deletions redis/sentinel.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,12 @@
from redis.client import Redis
from redis.commands import SentinelCommands
from redis.connection import Connection, ConnectionPool, SSLConnection
from redis.exceptions import ConnectionError, ReadOnlyError, ResponseError, TimeoutError
from redis.utils import str_if_bytes
from redis.exceptions import (
ConnectionError,
ReadOnlyError,
ResponseError,
TimeoutError,
)


class MasterNotFoundError(ConnectionError):
Expand Down Expand Up @@ -35,11 +39,11 @@ def __repr__(self):

def connect_to(self, address):
self.host, self.port = address
super().connect()
if self.connection_pool.check_connection:
self.send_command("PING")
if str_if_bytes(self.read_response()) != "PONG":
raise ConnectionError("PING failed")

self.connect_check_health(
check_health=self.connection_pool.check_connection,
retry_socket_connect=False,
)

def _connect_retry(self):
if self._sock:
Expand Down
1 change: 1 addition & 0 deletions tests/test_asyncio/test_sentinel_managed_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,5 @@ async def mock_connect():
conn._connect.side_effect = mock_connect
await conn.connect()
assert conn._connect.call_count == 3
assert connection_pool.get_master_address.call_count == 3
await conn.disconnect()
34 changes: 34 additions & 0 deletions tests/test_sentinel_managed_connection.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import socket

from redis.retry import Retry
from redis.sentinel import SentinelManagedConnection
from redis.backoff import NoBackoff
from unittest import mock


def test_connect_retry_on_timeout_error(master_host):
"""Test that the _connect function is retried in case of a timeout"""
connection_pool = mock.Mock()
connection_pool.get_master_address = mock.Mock(
return_value=(master_host[0], master_host[1])
)
conn = SentinelManagedConnection(
retry_on_timeout=True,
retry=Retry(NoBackoff(), 3),
connection_pool=connection_pool,
)
origin_connect = conn._connect
conn._connect = mock.Mock()

def mock_connect():
# connect only on the last retry
if conn._connect.call_count <= 2:
raise socket.timeout
else:
return origin_connect()

conn._connect.side_effect = mock_connect
conn.connect()
assert conn._connect.call_count == 3
assert connection_pool.get_master_address.call_count == 3
conn.disconnect()