Skip to content

Commit c43bd6f

Browse files
committed
Make Pool.close() wait until all checked out connections are released
Currently, `pool.close()`, despite the "graceful" designation, closes all connections immediately regardless of whether they are acquired. With this change, pool will wait for connections to actually be released before closing. WARNING: This is a potentially incompatible behavior change, as sloppily written code which does not release acquired connections will now cause `pool.close()` to hang forever. Also, when `conn.close()` or `conn.terminate()` are called directly on an acquired connection, the associated pool item is released immediately. Closes: #290
1 parent cf523be commit c43bd6f

File tree

5 files changed

+266
-129
lines changed

5 files changed

+266
-129
lines changed

asyncpg/connection.py

Lines changed: 28 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -984,7 +984,7 @@ def is_closed(self):
984984
:return bool: ``True`` if the connection is closed, ``False``
985985
otherwise.
986986
"""
987-
return not self._protocol.is_connected() or self._aborted
987+
return self._aborted or not self._protocol.is_connected()
988988

989989
async def close(self, *, timeout=None):
990990
"""Close the connection gracefully.
@@ -995,30 +995,21 @@ async def close(self, *, timeout=None):
995995
.. versionchanged:: 0.14.0
996996
Added the *timeout* parameter.
997997
"""
998-
if self.is_closed():
999-
return
1000-
self._mark_stmts_as_closed()
1001-
self._listeners.clear()
1002-
self._log_listeners.clear()
1003-
self._aborted = True
1004998
try:
1005-
await self._protocol.close(timeout)
999+
if not self.is_closed():
1000+
await self._protocol.close(timeout)
10061001
except Exception:
10071002
# If we fail to close gracefully, abort the connection.
1008-
self._aborted = True
1009-
self._protocol.abort()
1003+
self._abort()
10101004
raise
10111005
finally:
1012-
self._clean_tasks()
1006+
self._cleanup()
10131007

10141008
def terminate(self):
10151009
"""Terminate the connection without waiting for pending data."""
1016-
self._mark_stmts_as_closed()
1017-
self._listeners.clear()
1018-
self._log_listeners.clear()
1019-
self._aborted = True
1020-
self._protocol.abort()
1021-
self._clean_tasks()
1010+
if not self.is_closed():
1011+
self._abort()
1012+
self._cleanup()
10221013

10231014
async def reset(self, *, timeout=None):
10241015
self._check_open()
@@ -1041,6 +1032,26 @@ async def reset(self, *, timeout=None):
10411032
if reset_query:
10421033
await self.execute(reset_query, timeout=timeout)
10431034

1035+
def _abort(self):
1036+
# Put the connection into the aborted state.
1037+
self._aborted = True
1038+
self._protocol.abort()
1039+
self._protocol = None
1040+
1041+
def _cleanup(self):
1042+
# Free the resources associated with this connection.
1043+
# This must be called when a connection is terminated.
1044+
1045+
if self._proxy is not None:
1046+
# Connection is a member of a pool, so let the pool
1047+
# know that this connection is dead.
1048+
self._proxy._holder._release_on_close()
1049+
1050+
self._mark_stmts_as_closed()
1051+
self._listeners.clear()
1052+
self._log_listeners.clear()
1053+
self._clean_tasks()
1054+
10441055
def _clean_tasks(self):
10451056
# Wrap-up any remaining tasks associated with this connection.
10461057
if self._cancellations:

0 commit comments

Comments
 (0)