Skip to content

Commit baf2af2

Browse files
committed
Implemented SentinelBlockingConnectionPool.
1 parent a745bde commit baf2af2

File tree

6 files changed

+114
-28
lines changed

6 files changed

+114
-28
lines changed

CHANGES

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
* Add SentinelBlockingConnectionPool class
12
* Move doctests (doc code examples) to main branch
23
* Update `ResponseT` type hint
34
* Allow to control the minimum SSL version

docs/connections.rst

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,11 @@ SentinelConnectionPool
3535
.. autoclass:: redis.sentinel.SentinelConnectionPool
3636
:members:
3737

38+
SentinelBlockingConnectionPool
39+
======================
40+
.. autoclass:: redis.sentinel.SentinelBlockingConnectionPool
41+
:members:
42+
3843

3944
Cluster Client
4045
**************

redis/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
)
3131
from redis.sentinel import (
3232
Sentinel,
33+
SentinelBlockingConnectionPool,
3334
SentinelConnectionPool,
3435
SentinelManagedConnection,
3536
SentinelManagedSSLConnection,
@@ -77,6 +78,7 @@ def int_or_str(value):
7778
"RedisError",
7879
"ResponseError",
7980
"Sentinel",
81+
"SentinelBlockingConnectionPool",
8082
"SentinelConnectionPool",
8183
"SentinelManagedConnection",
8284
"SentinelManagedSSLConnection",

redis/asyncio/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
)
1010
from redis.asyncio.sentinel import (
1111
Sentinel,
12+
SentinelBlockingConnectionPool,
1213
SentinelConnectionPool,
1314
SentinelManagedConnection,
1415
SentinelManagedSSLConnection,
@@ -53,6 +54,7 @@
5354
"RedisError",
5455
"ResponseError",
5556
"Sentinel",
57+
"SentinelBlockingConnectionPool",
5658
"SentinelConnectionPool",
5759
"SentinelManagedConnection",
5860
"SentinelManagedSSLConnection",

redis/asyncio/sentinel.py

Lines changed: 37 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
from redis.asyncio.client import Redis
77
from redis.asyncio.connection import (
8+
BlockingConnectionPool,
89
Connection,
910
ConnectionPool,
1011
EncodableT,
@@ -203,12 +204,38 @@ async def get_master_address(self):
203204
def rotate_slaves(self) -> AsyncIterator:
204205
"""Round-robin slave balancer"""
205206
return self.proxy.rotate_slaves()
207+
208+
209+
class SentinelBlockingConnectionPool(BlockingConnectionPool):
210+
"""
211+
Sentinel blocking connection pool.
212+
213+
If ``check_connection`` flag is set to True, SentinelManagedConnection
214+
sends a PING command right after establishing the connection.
215+
"""
216+
217+
def __init__(self, service_name, sentinel_manager, **kwargs):
218+
kwargs["connection_class"] = kwargs.get(
219+
"connection_class",
220+
(
221+
SentinelManagedSSLConnection
222+
if kwargs.pop("ssl", False)
223+
else SentinelManagedConnection
224+
),
225+
)
226+
self.is_master = kwargs.pop("is_master", True)
227+
self.check_connection = kwargs.pop("check_connection", False)
228+
self.proxy = SentinelConnectionPoolProxy(
229+
connection_pool=self,
230+
is_master=self.is_master,
231+
check_connection=self.check_connection,
232+
service_name=service_name,
233+
sentinel_manager=sentinel_manager,
234+
)
206235
super().__init__(**kwargs)
207236
self.connection_kwargs["connection_pool"] = weakref.proxy(self)
208237
self.service_name = service_name
209238
self.sentinel_manager = sentinel_manager
210-
self.master_address = None
211-
self.slave_rr_counter = None
212239

213240
def __repr__(self):
214241
return (
@@ -218,8 +245,11 @@ def __repr__(self):
218245

219246
def reset(self):
220247
super().reset()
221-
self.master_address = None
222-
self.slave_rr_counter = None
248+
self.proxy.reset()
249+
250+
@property
251+
def master_address(self):
252+
return self.proxy.master_address
223253

224254
def owns_connection(self, connection: Connection):
225255
check = not self.is_master or (
@@ -228,31 +258,11 @@ def owns_connection(self, connection: Connection):
228258
return check and super().owns_connection(connection)
229259

230260
async def get_master_address(self):
231-
master_address = await self.sentinel_manager.discover_master(self.service_name)
232-
if self.is_master:
233-
if self.master_address != master_address:
234-
self.master_address = master_address
235-
# disconnect any idle connections so that they reconnect
236-
# to the new master the next time that they are used.
237-
await self.disconnect(inuse_connections=False)
238-
return master_address
261+
return await self.proxy.get_master_address()
239262

240-
async def rotate_slaves(self) -> AsyncIterator:
263+
def rotate_slaves(self) -> AsyncIterator:
241264
"""Round-robin slave balancer"""
242-
slaves = await self.sentinel_manager.discover_slaves(self.service_name)
243-
if slaves:
244-
if self.slave_rr_counter is None:
245-
self.slave_rr_counter = random.randint(0, len(slaves) - 1)
246-
for _ in range(len(slaves)):
247-
self.slave_rr_counter = (self.slave_rr_counter + 1) % len(slaves)
248-
slave = slaves[self.slave_rr_counter]
249-
yield slave
250-
# Fallback to the master connection
251-
try:
252-
yield await self.get_master_address()
253-
except MasterNotFoundError:
254-
pass
255-
raise SlaveNotFoundError(f"No slave found for {self.service_name!r}")
265+
return self.proxy.rotate_slaves()
256266

257267

258268
class Sentinel(AsyncSentinelCommands):

redis/sentinel.py

Lines changed: 67 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,12 @@
44

55
from redis.client import Redis
66
from redis.commands import SentinelCommands
7-
from redis.connection import Connection, ConnectionPool, SSLConnection
7+
from redis.connection import (
8+
BlockingConnectionPool,
9+
Connection,
10+
ConnectionPool,
11+
SSLConnection,
12+
)
813
from redis.exceptions import ConnectionError, ReadOnlyError, ResponseError, TimeoutError
914
from redis.utils import str_if_bytes
1015

@@ -195,6 +200,67 @@ def rotate_slaves(self):
195200
return self.proxy.rotate_slaves()
196201

197202

203+
class SentinelBlockingConnectionPool(BlockingConnectionPool):
204+
"""
205+
Sentinel blocking connection pool.
206+
207+
If ``check_connection`` flag is set to True, SentinelManagedConnection
208+
sends a PING command right after establishing the connection.
209+
"""
210+
211+
def __init__(self, service_name, sentinel_manager, **kwargs):
212+
kwargs["connection_class"] = kwargs.get(
213+
"connection_class",
214+
(
215+
SentinelManagedSSLConnection
216+
if kwargs.pop("ssl", False)
217+
else SentinelManagedConnection
218+
),
219+
)
220+
self.is_master = kwargs.pop("is_master", True)
221+
self.check_connection = kwargs.pop("check_connection", False)
222+
self.proxy = SentinelConnectionPoolProxy(
223+
connection_pool=self,
224+
is_master=self.is_master,
225+
check_connection=self.check_connection,
226+
service_name=service_name,
227+
sentinel_manager=sentinel_manager,
228+
)
229+
super().__init__(**kwargs)
230+
self.connection_kwargs["connection_pool"] = self.proxy
231+
self.service_name = service_name
232+
self.sentinel_manager = sentinel_manager
233+
234+
def __repr__(self):
235+
role = "master" if self.is_master else "slave"
236+
return (
237+
f"<{type(self).__module__}.{type(self).__name__}"
238+
f"(service={self.service_name}({role}))>"
239+
)
240+
241+
def reset(self):
242+
super().reset()
243+
self.proxy.reset()
244+
245+
@property
246+
def master_address(self):
247+
return self.proxy.master_address
248+
249+
def owns_connection(self, connection):
250+
check = not self.is_master or (
251+
self.is_master and self.master_address == (connection.host, connection.port)
252+
)
253+
parent = super()
254+
return check and parent.owns_connection(connection)
255+
256+
def get_master_address(self):
257+
return self.proxy.get_master_address()
258+
259+
def rotate_slaves(self):
260+
"Round-robin slave balancer"
261+
return self.proxy.rotate_slaves()
262+
263+
198264
class Sentinel(SentinelCommands):
199265
"""
200266
Redis Sentinel cluster client

0 commit comments

Comments
 (0)