@@ -1015,6 +1015,13 @@ def reset(self):
10151015 self ._available_connections = []
10161016 self ._in_use_connections = set ()
10171017
1018+ def can_get_connection (self ) -> bool :
1019+ """Return True if a connection can be retrieved from the pool."""
1020+ return (
1021+ self ._available_connections
1022+ or self ._created_connections < self .max_connections
1023+ )
1024+
10181025 async def get_connection (self , command_name , * keys , ** options ):
10191026 """Get a connection from the pool"""
10201027 try :
@@ -1102,19 +1109,19 @@ class BlockingConnectionPool(ConnectionPool):
11021109 """
11031110 A blocking connection pool::
11041111
1105- >>> from redis.client import Redis
1112+ >>> from redis.asyncio. client import Redis
11061113 >>> client = Redis(connection_pool=BlockingConnectionPool())
11071114
11081115 It performs the same function as the default
1109- :py:class:`~redis.ConnectionPool` implementation, in that,
1116+ :py:class:`~redis.asyncio. ConnectionPool` implementation, in that,
11101117 it maintains a pool of reusable connections that can be shared by
11111118 multiple async redis clients.
11121119
11131120 The difference is that, in the event that a client tries to get a
11141121 connection from the pool when all of connections are in use, rather than
11151122 raising a :py:class:`~redis.ConnectionError` (as the default
1116- :py:class:`~redis.ConnectionPool` implementation does), it
1117- makes the client wait ("blocks") for a specified number of seconds until
1123+ :py:class:`~redis.asyncio. ConnectionPool` implementation does), it
1124+ makes blocks the current `Task` for a specified number of seconds until
11181125 a connection becomes available.
11191126
11201127 Use ``max_connections`` to increase / decrease the pool size::
@@ -1137,107 +1144,30 @@ def __init__(
11371144 max_connections : int = 50 ,
11381145 timeout : Optional [int ] = 20 ,
11391146 connection_class : Type [AbstractConnection ] = Connection ,
1140- queue_class : Type [asyncio .Queue ] = asyncio .LifoQueue ,
1147+ queue_class : Type [asyncio .Queue ] = asyncio .LifoQueue , # deprecated
11411148 ** connection_kwargs ,
11421149 ):
11431150
1144- self .queue_class = queue_class
1145- self .timeout = timeout
1146- self ._connections : List [AbstractConnection ]
11471151 super ().__init__ (
11481152 connection_class = connection_class ,
11491153 max_connections = max_connections ,
11501154 ** connection_kwargs ,
11511155 )
1152- self ._lock = asyncio .Lock ()
1153-
1154- def reset (self ):
1155- # Create and fill up a queue with ``None`` values.
1156- self .pool = self .queue_class (self .max_connections )
1157- while True :
1158- try :
1159- self .pool .put_nowait (None )
1160- except asyncio .QueueFull :
1161- break
1162-
1163- # Keep a list of actual connection instances so that we can
1164- # disconnect them later.
1165- self ._connections = []
1166-
1167- def make_connection (self ):
1168- """Make a fresh connection."""
1169- connection = self .connection_class (** self .connection_kwargs )
1170- self ._connections .append (connection )
1171- return connection
1156+ self ._condition = asyncio .Condition ()
1157+ self .timeout = timeout
11721158
11731159 async def get_connection (self , command_name , * keys , ** options ):
1174- """
1175- Get a connection, blocking for ``self.timeout`` until a connection
1176- is available from the pool.
1177-
1178- If the connection returned is ``None`` then creates a new connection.
1179- Because we use a last-in first-out queue, the existing connections
1180- (having been returned to the pool after the initial ``None`` values
1181- were added) will be returned before ``None`` values. This means we only
1182- create new connections when we need to, i.e.: the actual number of
1183- connections will only increase in response to demand.
1184- """
1185-
1186- # Try and get a connection from the pool. If one isn't available within
1187- # self.timeout then raise a ``ConnectionError``.
1188- connection = None
1160+ """Gets a connection from the pool, blocking until one is available"""
11891161 try :
11901162 async with async_timeout (self .timeout ):
1191- connection = await self .pool .get ()
1192- except (asyncio .QueueEmpty , asyncio .TimeoutError ):
1193- # Note that this is not caught by the redis client and will be
1194- # raised unless handled by application code. If you want never to
1195- raise ConnectionError ("No connection available." )
1196-
1197- # If the ``connection`` is actually ``None`` then that's a cue to make
1198- # a new connection to add to the pool.
1199- if connection is None :
1200- connection = self .make_connection ()
1201-
1202- try :
1203- # ensure this connection is connected to Redis
1204- await connection .connect ()
1205- # connections that the pool provides should be ready to send
1206- # a command. if not, the connection was either returned to the
1207- # pool before all data has been read or the socket has been
1208- # closed. either way, reconnect and verify everything is good.
1209- try :
1210- if await connection .can_read_destructive ():
1211- raise ConnectionError ("Connection has data" ) from None
1212- except (ConnectionError , OSError ):
1213- await connection .disconnect ()
1214- await connection .connect ()
1215- if await connection .can_read_destructive ():
1216- raise ConnectionError ("Connection not ready" ) from None
1217- except BaseException :
1218- # release the connection back to the pool so that we don't leak it
1219- await self .release (connection )
1220- raise
1221-
1222- return connection
1163+ async with self ._condition :
1164+ await self ._condition .wait_for (self .can_get_connection )
1165+ return await super ().get_connection (command_name , * keys , ** options )
1166+ except asyncio .TimeoutError as err :
1167+ raise ConnectionError ("No connection available." ) from err
12231168
12241169 async def release (self , connection : AbstractConnection ):
12251170 """Releases the connection back to the pool."""
1226- # Put the connection back into the pool.
1227- try :
1228- self .pool .put_nowait (connection )
1229- except asyncio .QueueFull :
1230- # perhaps the pool has been reset() after a fork? regardless,
1231- # we don't want this connection
1232- pass
1233-
1234- async def disconnect (self , inuse_connections : bool = True ):
1235- """Disconnects all connections in the pool."""
1236- async with self ._lock :
1237- resp = await asyncio .gather (
1238- * (connection .disconnect () for connection in self ._connections ),
1239- return_exceptions = True ,
1240- )
1241- exc = next ((r for r in resp if isinstance (r , BaseException )), None )
1242- if exc :
1243- raise exc
1171+ async with self ._condition :
1172+ await super ().release (connection )
1173+ self ._condition .notify ()
0 commit comments