Skip to content

Commit 483e6e1

Browse files
committed
Add a test for Pool.release() interaction with Connection.cancel()
1 parent c241c42 commit 483e6e1

File tree

4 files changed

+73
-49
lines changed

4 files changed

+73
-49
lines changed

asyncpg/_testbase/__init__.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -243,9 +243,11 @@ def get_connection_spec(cls, kwargs={}):
243243
conn_spec['user'] = 'postgres'
244244
return conn_spec
245245

246-
def create_pool(self, pool_class=pg_pool.Pool, **kwargs):
246+
def create_pool(self, pool_class=pg_pool.Pool,
247+
connection_class=pg_connection.Connection, **kwargs):
247248
conn_spec = self.get_connection_spec(kwargs)
248-
return create_pool(loop=self.loop, pool_class=pool_class, **conn_spec)
249+
return create_pool(loop=self.loop, pool_class=pool_class,
250+
connection_class=connection_class, **conn_spec)
249251

250252
@classmethod
251253
def connect(cls, **kwargs):

asyncpg/connection.py

Lines changed: 40 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1052,47 +1052,47 @@ async def _cleanup_stmts(self):
10521052
# so we ignore the timeout.
10531053
await self._protocol.close_statement(stmt, protocol.NO_TIMEOUT)
10541054

1055+
async def _cancel(self, waiter):
1056+
r = w = None
1057+
1058+
try:
1059+
# Open new connection to the server
1060+
r, w = await connect_utils._open_connection(
1061+
loop=self._loop, addr=self._addr, params=self._params)
1062+
1063+
# Pack CancelRequest message
1064+
msg = struct.pack('!llll', 16, 80877102,
1065+
self._protocol.backend_pid,
1066+
self._protocol.backend_secret)
1067+
1068+
w.write(msg)
1069+
await r.read() # Wait until EOF
1070+
except ConnectionResetError as ex:
1071+
# On some systems Postgres will reset the connection
1072+
# after processing the cancellation command.
1073+
if r is None and not waiter.done():
1074+
waiter.set_exception(ex)
1075+
except asyncio.CancelledError:
1076+
# There are two scenarios in which the cancellation
1077+
# itself will be cancelled: 1) the connection is being closed,
1078+
# 2) the event loop is being shut down.
1079+
# In either case we do not care about the propagation of
1080+
# the CancelledError, and don't want the loop to warn about
1081+
# an unretrieved exception.
1082+
pass
1083+
except Exception as ex:
1084+
if not waiter.done():
1085+
waiter.set_exception(ex)
1086+
finally:
1087+
self._cancellations.discard(
1088+
asyncio.Task.current_task(self._loop))
1089+
if not waiter.done():
1090+
waiter.set_result(None)
1091+
if w is not None:
1092+
w.close()
1093+
10551094
def _cancel_current_command(self, waiter):
1056-
async def cancel():
1057-
r = w = None
1058-
1059-
try:
1060-
# Open new connection to the server
1061-
r, w = await connect_utils._open_connection(
1062-
loop=self._loop, addr=self._addr, params=self._params)
1063-
1064-
# Pack CancelRequest message
1065-
msg = struct.pack('!llll', 16, 80877102,
1066-
self._protocol.backend_pid,
1067-
self._protocol.backend_secret)
1068-
1069-
w.write(msg)
1070-
await r.read() # Wait until EOF
1071-
except ConnectionResetError as ex:
1072-
# On some systems Postgres will reset the connection
1073-
# after processing the cancellation command.
1074-
if r is None and not waiter.done():
1075-
waiter.set_exception(ex)
1076-
except asyncio.CancelledError:
1077-
# There are two scenarios in which the cancellation
1078-
# itself will be cancelled: 1) the connection is being closed,
1079-
# 2) the event loop is being shut down.
1080-
# In either case we do not care about the propagation of
1081-
# the CancelledError, and don't want the loop to warn about
1082-
# an unretrieved exception.
1083-
pass
1084-
except Exception as ex:
1085-
if not waiter.done():
1086-
waiter.set_exception(ex)
1087-
finally:
1088-
self._cancellations.discard(
1089-
asyncio.Task.current_task(self._loop))
1090-
if not waiter.done():
1091-
waiter.set_result(None)
1092-
if w is not None:
1093-
w.close()
1094-
1095-
self._cancellations.add(self._loop.create_task(cancel()))
1095+
self._cancellations.add(self._loop.create_task(self._cancel(waiter)))
10961096

10971097
def _process_log_message(self, fields, last_query):
10981098
if not self._log_listeners:

asyncpg/pool.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -200,7 +200,7 @@ async def release(self, timeout):
200200
started = time.monotonic()
201201
await asyncio.wait_for(
202202
self._con._protocol._wait_for_cancellation(),
203-
timeout, loop=self._pool._loop)
203+
budget, loop=self._pool._loop)
204204
if budget is not None:
205205
budget -= time.monotonic() - started
206206

tests/test_pool.py

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,11 @@ async def reset(self):
3636
return await super().reset()
3737

3838

39-
class SlowResetConnectionPool(pg_pool.Pool):
40-
async def _connect(self, *args, **kwargs):
41-
return await pg_connection.connect(
42-
*args, connection_class=SlowResetConnection, **kwargs)
39+
class SlowCancelConnection(pg_connection.Connection):
40+
"""Connection class to simulate races with Connection._cancel()."""
41+
async def _cancel(self, waiter):
42+
await asyncio.sleep(0.2, loop=self._loop)
43+
return await super()._cancel(waiter)
4344

4445

4546
class TestPool(tb.ConnectedTestCase):
@@ -351,12 +352,12 @@ async def worker():
351352
self.cluster.trust_local_connections()
352353
self.cluster.reload()
353354

354-
async def test_pool_handles_cancel_in_release(self):
355+
async def test_pool_handles_task_cancel_in_release(self):
355356
# Use SlowResetConnectionPool to simulate
356357
# the Task.cancel() and __aexit__ race.
357358
pool = await self.create_pool(database='postgres',
358359
min_size=1, max_size=1,
359-
pool_class=SlowResetConnectionPool)
360+
connection_class=SlowResetConnection)
360361

361362
async def worker():
362363
async with pool.acquire():
@@ -372,6 +373,27 @@ async def worker():
372373
# Check that the connection has been returned to the pool.
373374
self.assertEqual(pool._queue.qsize(), 1)
374375

376+
async def test_pool_handles_query_cancel_in_release(self):
377+
# Use SlowResetConnectionPool to simulate
378+
# the Task.cancel() and __aexit__ race.
379+
pool = await self.create_pool(database='postgres',
380+
min_size=1, max_size=1,
381+
connection_class=SlowCancelConnection)
382+
383+
async def worker():
384+
async with pool.acquire() as con:
385+
await con.execute('SELECT pg_sleep(10)')
386+
387+
task = self.loop.create_task(worker())
388+
# Let the worker() run.
389+
await asyncio.sleep(0.1, loop=self.loop)
390+
# Cancel the worker.
391+
task.cancel()
392+
# Wait to make sure the cleanup has completed.
393+
await asyncio.sleep(0.5, loop=self.loop)
394+
# Check that the connection has been returned to the pool.
395+
self.assertEqual(pool._queue.qsize(), 1)
396+
375397
async def test_pool_no_acquire_deadlock(self):
376398
async with self.create_pool(database='postgres',
377399
min_size=1, max_size=1,

0 commit comments

Comments
 (0)