@@ -77,38 +77,32 @@ def __init__(self) -> None:
7777 self .connections : Dict [int , TcpServerConnection ] = {}
7878 self .pools : Dict [Tuple [str , int ], Set [TcpServerConnection ]] = {}
7979
80- def add (self , addr : Tuple [str , int ]) -> TcpServerConnection :
81- """Creates and add a new connection to the pool."""
82- new_conn = TcpServerConnection (addr [0 ], addr [1 ])
83- new_conn .connect ()
84- self ._add (new_conn )
85- logger .debug (
86- 'Created new connection#{2} for upstream {0}:{1}' .format (
87- addr [0 ], addr [1 ], id (new_conn ),
88- ),
89- )
90- return new_conn
91-
9280 def acquire (self , addr : Tuple [str , int ]) -> Tuple [bool , TcpServerConnection ]:
9381 """Returns a reusable connection from the pool.
9482
9583 If none exists, will create and return a new connection."""
84+ created , conn = False , None
9685 if addr in self .pools :
9786 for old_conn in self .pools [addr ]:
9887 if old_conn .is_reusable ():
99- old_conn . mark_inuse ()
88+ conn = old_conn
10089 logger .debug (
10190 'Reusing connection#{2} for upstream {0}:{1}' .format (
10291 addr [0 ], addr [1 ], id (old_conn ),
10392 ),
10493 )
105- return False , old_conn
106- new_conn = self .add (addr )
107- new_conn .mark_inuse ()
108- return True , new_conn
94+ break
95+ if conn is None :
96+ created , conn = True , self .add (addr )
97+ conn .mark_inuse ()
98+ return created , conn
10999
110100 def release (self , conn : TcpServerConnection ) -> None :
111- """Release a previously acquired connection."""
101+ """Release a previously acquired connection.
102+
103+ Releasing a connection will shutdown and close the socket
104+ including internal pool cleanup.
105+ """
112106 assert not conn .is_reusable ()
113107 logger .debug (
114108 'Removing connection#{2} from pool from upstream {0}:{1}' .format (
@@ -118,7 +112,8 @@ def release(self, conn: TcpServerConnection) -> None:
118112 self ._remove (conn .connection .fileno ())
119113
120114 def retain (self , conn : TcpServerConnection ) -> None :
121- """Retained the connection in the pool for reusability."""
115+ """Retained previously acquired connection in the pool for reusability."""
116+ assert not conn .closed
122117 logger .debug (
123118 'Retaining connection#{2} to upstream {0}:{1}' .format (
124119 conn .addr [0 ], conn .addr [1 ], id (conn ),
@@ -150,6 +145,23 @@ async def handle_events(self, readables: Readables, _writables: Writables) -> bo
150145 self ._remove (fileno )
151146 return False
152147
148+ def add (self , addr : Tuple [str , int ]) -> TcpServerConnection :
149+ """Creates, connects and adds a new connection to the pool.
150+
151+ Returns newly created connection.
152+
153+ NOTE: You must not use the returned connection, instead use `acquire`.
154+ """
155+ new_conn = TcpServerConnection (addr [0 ], addr [1 ])
156+ new_conn .connect ()
157+ self ._add (new_conn )
158+ logger .debug (
159+ 'Created new connection#{2} for upstream {0}:{1}' .format (
160+ addr [0 ], addr [1 ], id (new_conn ),
161+ ),
162+ )
163+ return new_conn
164+
153165 def _add (self , conn : TcpServerConnection ) -> None :
154166 """Adds a new connection to internal data structure."""
155167 if conn .addr not in self .pools :
0 commit comments