Skip to content

Commit 7a0585a

Browse files
committed
Make Pool.close() wait until all checked out connections are released
Currently, `pool.close()`, despite the "graceful" designation, closes all connections immediately regardless of whether they are acquired. With this change, pool will wait for connections to actually be released before closing. WARNING: This is a potentially incompatible behavior change, as sloppily written code which does not release acquired connections will now cause `pool.close()` to hang forever. Also, when `conn.close()` or `conn.terminate()` are called directly on an acquired connection, the associated pool item is released immediately. Closes: #290
1 parent 8dd7a6c commit 7a0585a

File tree

5 files changed

+266
-129
lines changed

5 files changed

+266
-129
lines changed

asyncpg/connection.py

Lines changed: 28 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -986,7 +986,7 @@ def is_closed(self):
986986
:return bool: ``True`` if the connection is closed, ``False``
987987
otherwise.
988988
"""
989-
return not self._protocol.is_connected() or self._aborted
989+
return self._aborted or not self._protocol.is_connected()
990990

991991
async def close(self, *, timeout=None):
992992
"""Close the connection gracefully.
@@ -997,30 +997,21 @@ async def close(self, *, timeout=None):
997997
.. versionchanged:: 0.14.0
998998
Added the *timeout* parameter.
999999
"""
1000-
if self.is_closed():
1001-
return
1002-
self._mark_stmts_as_closed()
1003-
self._listeners.clear()
1004-
self._log_listeners.clear()
1005-
self._aborted = True
10061000
try:
1007-
await self._protocol.close(timeout)
1001+
if not self.is_closed():
1002+
await self._protocol.close(timeout)
10081003
except Exception:
10091004
# If we fail to close gracefully, abort the connection.
1010-
self._aborted = True
1011-
self._protocol.abort()
1005+
self._abort()
10121006
raise
10131007
finally:
1014-
self._clean_tasks()
1008+
self._cleanup()
10151009

10161010
def terminate(self):
10171011
"""Terminate the connection without waiting for pending data."""
1018-
self._mark_stmts_as_closed()
1019-
self._listeners.clear()
1020-
self._log_listeners.clear()
1021-
self._aborted = True
1022-
self._protocol.abort()
1023-
self._clean_tasks()
1012+
if not self.is_closed():
1013+
self._abort()
1014+
self._cleanup()
10241015

10251016
async def reset(self, *, timeout=None):
10261017
self._check_open()
@@ -1043,6 +1034,26 @@ async def reset(self, *, timeout=None):
10431034
if reset_query:
10441035
await self.execute(reset_query, timeout=timeout)
10451036

1037+
def _abort(self):
1038+
# Put the connection into the aborted state.
1039+
self._aborted = True
1040+
self._protocol.abort()
1041+
self._protocol = None
1042+
1043+
def _cleanup(self):
1044+
# Free the resources associated with this connection.
1045+
# This must be called when a connection is terminated.
1046+
1047+
if self._proxy is not None:
1048+
# Connection is a member of a pool, so let the pool
1049+
# know that this connection is dead.
1050+
self._proxy._holder._release_on_close()
1051+
1052+
self._mark_stmts_as_closed()
1053+
self._listeners.clear()
1054+
self._log_listeners.clear()
1055+
self._clean_tasks()
1056+
10461057
def _clean_tasks(self):
10471058
# Wrap-up any remaining tasks associated with this connection.
10481059
if self._cancellations:

0 commit comments

Comments
 (0)