File tree Expand file tree Collapse file tree 4 files changed +41
-18
lines changed Expand file tree Collapse file tree 4 files changed +41
-18
lines changed Original file line number Diff line number Diff line change @@ -90,3 +90,19 @@ async def wait_closed(stream):
9090 # On Windows wait_closed() sometimes propagates
9191 # ConnectionResetError which is totally unnecessary.
9292 pass
93+
94+
95+ # Workaround for https://bugs.python.org/issue37658
96+ async def wait_for (fut , timeout ):
97+ if timeout is None :
98+ return await fut
99+
100+ fut = asyncio .ensure_future (fut )
101+
102+ try :
103+ return await asyncio .wait_for (fut , timeout )
104+ except asyncio .CancelledError :
105+ if fut .done ():
106+ return fut .result ()
107+ else :
108+ raise
Original file line number Diff line number Diff line change @@ -636,18 +636,13 @@ async def _connect_addr(
636636
637637 connector = asyncio .ensure_future (connector )
638638 before = time .monotonic ()
639- try :
640- tr , pr = await asyncio .wait_for (
641- connector , timeout = timeout )
642- except asyncio .CancelledError :
643- connector .add_done_callback (_close_leaked_connection )
644- raise
639+ tr , pr = await compat .wait_for (connector , timeout = timeout )
645640 timeout -= time .monotonic () - before
646641
647642 try :
648643 if timeout <= 0 :
649644 raise asyncio .TimeoutError
650- await asyncio .wait_for (connected , timeout = timeout )
645+ await compat .wait_for (connected , timeout = timeout )
651646 except (Exception , asyncio .CancelledError ):
652647 tr .close ()
653648 raise
@@ -745,12 +740,3 @@ def _create_future(loop):
745740 return asyncio .Future (loop = loop )
746741 else :
747742 return create_future ()
748-
749-
750- def _close_leaked_connection (fut ):
751- try :
752- tr , pr = fut .result ()
753- if tr :
754- tr .close ()
755- except asyncio .CancelledError :
756- pass # hide the exception
Original file line number Diff line number Diff line change 1212import time
1313import warnings
1414
15+ from . import compat
1516from . import connection
1617from . import connect_utils
1718from . import exceptions
@@ -198,7 +199,7 @@ async def release(self, timeout):
198199 # If the connection is in cancellation state,
199200 # wait for the cancellation
200201 started = time .monotonic ()
201- await asyncio .wait_for (
202+ await compat .wait_for (
202203 self ._con ._protocol ._wait_for_cancellation (),
203204 budget )
204205 if budget is not None :
@@ -623,7 +624,7 @@ async def _acquire_impl():
623624 if timeout is None :
624625 return await _acquire_impl ()
625626 else :
626- return await asyncio .wait_for (
627+ return await compat .wait_for (
627628 _acquire_impl (), timeout = timeout )
628629
629630 async def release (self , connection , * , timeout = None ):
Original file line number Diff line number Diff line change @@ -379,6 +379,26 @@ async def worker():
379379 self .cluster .trust_local_connections ()
380380 self .cluster .reload ()
381381
382+ async def test_pool_handles_task_cancel_in_acquire_with_timeout (self ):
383+ # See https://github.com/MagicStack/asyncpg/issues/547
384+ pool = await self .create_pool (database = 'postgres' ,
385+ min_size = 1 , max_size = 1 )
386+
387+ async def worker ():
388+ async with pool .acquire (timeout = 100 ):
389+ pass
390+
391+ # Schedule task
392+ task = self .loop .create_task (worker ())
393+ # Yield to task, but cancel almost immediately
394+ await asyncio .sleep (0.00000000001 )
395+ # Cancel the worker.
396+ task .cancel ()
397+ # Wait to make sure the cleanup has completed.
398+ await asyncio .sleep (0.4 )
399+ # Check that the connection has been returned to the pool.
400+ self .assertEqual (pool ._queue .qsize (), 1 )
401+
382402 async def test_pool_handles_task_cancel_in_release (self ):
383403 # Use SlowResetConnectionPool to simulate
384404 # the Task.cancel() and __aexit__ race.
You can’t perform that action at this time.
0 commit comments