@@ -998,10 +998,8 @@ def __init__(
998998 self .connection_kwargs = connection_kwargs
999999 self .max_connections = max_connections
10001000
1001- self ._created_connections : int
1002- self ._available_connections : List [AbstractConnection ]
1003- self ._in_use_connections : Set [AbstractConnection ]
1004- self .reset () # lgtm [py/init-calls-subclass]
1001+ self ._available_connections : List [AbstractConnection ] = []
1002+ self ._in_use_connections : Set [AbstractConnection ] = set ()
10051003 self .encoder_class = self .connection_kwargs .get ("encoder_class" , Encoder )
10061004
10071005 def __repr__ (self ):
@@ -1011,7 +1009,6 @@ def __repr__(self):
10111009 )
10121010
10131011 def reset (self ):
1014- self ._created_connections = 0
10151012 self ._available_connections = []
10161013 self ._in_use_connections = set ()
10171014
@@ -1027,27 +1024,14 @@ async def get_connection(self, command_name, *keys, **options):
10271024 try :
10281025 connection = self ._available_connections .pop ()
10291026 except IndexError :
1027+ if len (self ._in_use_connections ) >= self .max_connections :
1028+ raise ConnectionError ("Too many connections" ) from None
10301029 connection = self .make_connection ()
10311030 self ._in_use_connections .add (connection )
10321031
10331032 try :
1034- # ensure this connection is connected to Redis
1035- await connection .connect ()
1036- # connections that the pool provides should be ready to send
1037- # a command. if not, the connection was either returned to the
1038- # pool before all data has been read or the socket has been
1039- # closed. either way, reconnect and verify everything is good.
1040- try :
1041- if await connection .can_read_destructive ():
1042- raise ConnectionError ("Connection has data" ) from None
1043- except (ConnectionError , OSError ):
1044- await connection .disconnect ()
1045- await connection .connect ()
1046- if await connection .can_read_destructive ():
1047- raise ConnectionError ("Connection not ready" ) from None
1033+ await self .ensure_connection (connection )
10481034 except BaseException :
1049- # release the connection back to the pool so that we don't
1050- # leak it
10511035 await self .release (connection )
10521036 raise
10531037
@@ -1063,12 +1047,25 @@ def get_encoder(self):
10631047 )
10641048
10651049 def make_connection (self ):
1066- """Create a new connection"""
1067- if self ._created_connections >= self .max_connections :
1068- raise ConnectionError ("Too many connections" )
1069- self ._created_connections += 1
1050+ """Create a new connection. Can be overridden by child classes."""
10701051 return self .connection_class (** self .connection_kwargs )
10711052
1053+ async def ensure_connection (self , connection : AbstractConnection ):
1054+ """Ensure that the connection object is connected and valid"""
1055+ await connection .connect ()
1056+ # connections that the pool provides should be ready to send
1057+ # a command. if not, the connection was either returned to the
1058+ # pool before all data has been read or the socket has been
1059+ # closed. either way, reconnect and verify everything is good.
1060+ try :
1061+ if await connection .can_read_destructive ():
1062+ raise ConnectionError ("Connection has data" ) from None
1063+ except (ConnectionError , OSError ):
1064+ await connection .disconnect ()
1065+ await connection .connect ()
1066+ if await connection .can_read_destructive ():
1067+ raise ConnectionError ("Connection not ready" ) from None
1068+
10721069 async def release (self , connection : AbstractConnection ):
10731070 """Releases the connection back to the pool"""
10741071 # Connections should always be returned to the correct pool,
0 commit comments