Skip to content

Commit 26f1aea

Browse files
committed
Async generator hooks, simpler approach
1 parent 094e015 commit 26f1aea

File tree

5 files changed

+319
-23
lines changed

5 files changed

+319
-23
lines changed

trio/_core/_entry_queue.py

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,15 @@ def run_cb(job):
5656
async def kill_everything(exc):
5757
raise exc
5858

59-
_core.spawn_system_task(kill_everything, exc)
59+
try:
60+
_core.spawn_system_task(kill_everything, exc)
61+
except RuntimeError:
62+
# We're quite late in the shutdown process and
63+
# the system nursery is already closed.
64+
_core.current_task().parent_nursery.start_soon(
65+
kill_everything, exc
66+
)
67+
6068
return True
6169

6270
# This has to be carefully written to be safe in the face of new items
@@ -102,10 +110,6 @@ def close(self):
102110
def size(self):
103111
return len(self.queue) + len(self.idempotent_queue)
104112

105-
def spawn(self):
106-
name = "<TrioToken.run_sync_soon task>"
107-
_core.spawn_system_task(self.task, name=name)
108-
109113
def run_sync_soon(self, sync_fn, *args, idempotent=False):
110114
with self.lock:
111115
if self.done:

trio/_core/_run.py

Lines changed: 176 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,11 @@
99
import sys
1010
import threading
1111
from collections import deque
12+
from functools import partial
1213
import collections.abc
1314
from contextlib import contextmanager, closing
1415
import warnings
16+
import weakref
1517
import enum
1618

1719
from contextvars import copy_context
@@ -42,7 +44,7 @@
4244
from ._thread_cache import start_thread_soon
4345
from .. import _core
4446
from .._deprecate import deprecated
45-
from .._util import Final, NoPublicConstructor, coroutine_or_error
47+
from .._util import Final, NoPublicConstructor, coroutine_or_error, name_asyncgen
4648

4749
_NO_SEND = object()
4850

@@ -61,8 +63,9 @@ def _public(fn):
6163
_ALLOW_DETERMINISTIC_SCHEDULING = False
6264
_r = random.Random()
6365

64-
# Used to log exceptions in instruments
66+
# Used to log exceptions in instruments and async generator finalizers
6567
INSTRUMENT_LOGGER = logging.getLogger("trio.abc.Instrument")
68+
ASYNCGEN_LOGGER = logging.getLogger("trio.async_generator_errors")
6669

6770

6871
# On 3.7+, Context.run() is implemented in C and doesn't show up in
@@ -958,7 +961,7 @@ async def async_fn(arg1, arg2, \*, task_status=trio.TASK_STATUS_IGNORED):
958961
self._pending_starts += 1
959962
async with open_nursery() as old_nursery:
960963
task_status = _TaskStatus(old_nursery, self)
961-
thunk = functools.partial(async_fn, task_status=task_status)
964+
thunk = partial(async_fn, task_status=task_status)
962965
task = GLOBAL_RUN_CONTEXT.runner.spawn_impl(
963966
thunk, args, old_nursery, name
964967
)
@@ -1222,6 +1225,14 @@ class Runner:
12221225
is_guest = attr.ib(default=False)
12231226
guest_tick_scheduled = attr.ib(default=False)
12241227

1228+
# Async generators are added to this set when first iterated. Any
1229+
# left after the main task exits will be closed before trio.run()
1230+
# returns. During the execution of the main task, this is a
1231+
# WeakSet so GC works. During shutdown, it's a regular set so we
1232+
# don't have to deal with GC firing at unexpected times.
1233+
asyncgens = attr.ib(factory=weakref.WeakSet)
1234+
prev_asyncgen_hooks = attr.ib(default=None)
1235+
12251236
def force_guest_tick_asap(self):
12261237
if self.guest_tick_scheduled:
12271238
return
@@ -1231,6 +1242,8 @@ def force_guest_tick_asap(self):
12311242
def close(self):
12321243
self.io_manager.close()
12331244
self.entry_queue.close()
1245+
if self.prev_asyncgen_hooks is not None:
1246+
sys.set_asyncgen_hooks(*self.prev_asyncgen_hooks)
12341247
if self.instruments:
12351248
self.instrument("after_run")
12361249
# This is where KI protection gets disabled, so we do it last
@@ -1366,7 +1379,7 @@ def spawn_impl(self, async_fn, args, nursery, name, *, system_task=False):
13661379

13671380
if name is None:
13681381
name = async_fn
1369-
if isinstance(name, functools.partial):
1382+
if isinstance(name, partial):
13701383
name = name.func
13711384
if not isinstance(name, str):
13721385
try:
@@ -1432,11 +1445,7 @@ def task_exited(self, task, outcome):
14321445

14331446
task._activate_cancel_status(None)
14341447
self.tasks.remove(task)
1435-
if task is self.main_task:
1436-
self.main_task_outcome = outcome
1437-
self.system_nursery.cancel_scope.cancel()
1438-
self.system_nursery._child_finished(task, Value(None))
1439-
elif task is self.init_task:
1448+
if task is self.init_task:
14401449
# If the init task crashed, then something is very wrong and we
14411450
# let the error propagate. (It'll eventually be wrapped in a
14421451
# TrioInternalError.)
@@ -1446,11 +1455,120 @@ def task_exited(self, task, outcome):
14461455
if self.tasks: # pragma: no cover
14471456
raise TrioInternalError
14481457
else:
1458+
if task is self.main_task:
1459+
self.main_task_outcome = outcome
1460+
outcome = Value(None)
14491461
task._parent_nursery._child_finished(task, outcome)
14501462

14511463
if self.instruments:
14521464
self.instrument("task_exited", task)
14531465

1466+
################
1467+
# Async generator finalization support
1468+
################
1469+
1470+
async def finalize_asyncgen(self, agen, name, *, check_running):
1471+
if check_running and agen.ag_running:
1472+
# Another async generator is iterating this one, which is
1473+
# suspended at an event loop trap. Add it back to the
1474+
# asyncgens set and we'll get it on the next round. Note
1475+
# that this is only possible during end-of-run
1476+
# finalization; in GC-directed finalization, no one has a
1477+
# reference to agen anymore, so no one can be iterating it.
1478+
#
1479+
# This field is only reliable on 3.8+ due to
1480+
# https://bugs.python.org/issue32526. Pythons below
1481+
# 3.8 use a workaround in finalize_remaining_asyncgens.
1482+
self.asyncgens.add(agen)
1483+
return
1484+
1485+
try:
1486+
# This shield ensures that finalize_asyncgen never exits
1487+
# with an exception, not even a Cancelled. The inside
1488+
# is cancelled so there's no deadlock risk.
1489+
with CancelScope(shield=True) as cancel_scope:
1490+
cancel_scope.cancel()
1491+
await agen.aclose()
1492+
except BaseException as exc:
1493+
ASYNCGEN_LOGGER.exception(
1494+
"Exception ignored during finalization of async generator %r -- "
1495+
"surround your use of the generator in 'async with aclosing(...):' "
1496+
"to raise exceptions like this in the context where they're generated",
1497+
name,
1498+
)
1499+
1500+
async def finalize_remaining_asyncgens(self):
1501+
# At the time this function is called, there are exactly two
1502+
# tasks running: init and the run_sync_soon task. (And we've
1503+
# shut down the system nursery, so no more can appear.)
1504+
# Neither one uses async generators, so every async generator
1505+
# must be suspended at a yield point -- there's no one to be
1506+
# doing the iteration. However, once we start aclose() of one
1507+
# async generator, it might start fetching the next value from
1508+
# another, thus preventing us from closing that other.
1509+
#
1510+
# On 3.8+, we can detect this condition by looking at
1511+
# ag_running. On earlier versions, ag_running doesn't provide
1512+
# useful information. We could look at ag_await, but that
1513+
# would fail in case of shenanigans like
1514+
# https://github.com/python-trio/async_generator/pull/16.
1515+
# It's easier to just not parallelize the shutdowns.
1516+
finalize_in_parallel = sys.version_info >= (3, 8)
1517+
1518+
# It's possible that the cleanup code will itself create
1519+
# more async generators, so we iterate repeatedly until
1520+
# all are gone.
1521+
while self.asyncgens:
1522+
batch = self.asyncgens
1523+
self.asyncgens = set()
1524+
1525+
if finalize_in_parallel:
1526+
async with open_nursery() as kill_them_all:
1527+
# This shield is needed to avoid the checkpoint
1528+
# in Nursery.__aexit__ raising Cancelled if we're
1529+
# in a cancelled scope. (Which can happen if
1530+
# a run_sync_soon callback raises an exception.)
1531+
kill_them_all.cancel_scope.shield = True
1532+
for agen in batch:
1533+
name = name_asyncgen(agen)
1534+
kill_them_all.start_soon(
1535+
partial(self.finalize_asyncgen, agen, name, check_running=True),
1536+
name="close asyncgen {} (outlived run)".format(name),
1537+
)
1538+
1539+
if self.asyncgens == batch: # pragma: no cover
1540+
# Something about the running-detection seems
1541+
# to have failed; fall back to one-at-a-time mode
1542+
# instead of looping forever
1543+
finalize_in_parallel = False
1544+
else:
1545+
for agen in batch:
1546+
await self.finalize_asyncgen(agen, name_asyncgen(agen), check_running=False)
1547+
1548+
def setup_asyncgen_hooks(self):
1549+
def firstiter(agen):
1550+
self.asyncgens.add(agen)
1551+
1552+
def finalizer(agen):
1553+
agen_name = name_asyncgen(agen)
1554+
warnings.warn(
1555+
f"Async generator {agen_name!r} was garbage collected before it had "
1556+
f"been exhausted. Surround its use in 'async with aclosing(...):' "
1557+
f"to ensure that it gets cleaned up as soon as you're done using it.",
1558+
ResourceWarning,
1559+
stacklevel=2,
1560+
)
1561+
self.entry_queue.run_sync_soon(
1562+
partial(
1563+
self.spawn_system_task,
1564+
partial(self.finalize_asyncgen, agen, agen_name, check_running=False),
1565+
name=f"close asyncgen {agen_name} (abandoned)",
1566+
),
1567+
)
1568+
1569+
self.prev_asyncgen_hooks = sys.get_asyncgen_hooks()
1570+
sys.set_asyncgen_hooks(firstiter=firstiter, finalizer=finalizer)
1571+
14541572
################
14551573
# System tasks and init
14561574
################
@@ -1500,14 +1618,51 @@ def spawn_system_task(self, async_fn, *args, name=None):
15001618
)
15011619

15021620
async def init(self, async_fn, args):
1503-
async with open_nursery() as system_nursery:
1504-
self.system_nursery = system_nursery
1505-
try:
1506-
self.main_task = self.spawn_impl(async_fn, args, system_nursery, None)
1507-
except BaseException as exc:
1508-
self.main_task_outcome = Error(exc)
1509-
system_nursery.cancel_scope.cancel()
1510-
self.entry_queue.spawn()
1621+
# run_sync_soon task runs here:
1622+
async with open_nursery() as run_sync_soon_nursery:
1623+
# All other system tasks run here:
1624+
async with open_nursery() as self.system_nursery:
1625+
# Only the main task runs here:
1626+
async with open_nursery() as main_task_nursery:
1627+
try:
1628+
self.main_task = self.spawn_impl(
1629+
async_fn, args, main_task_nursery, None
1630+
)
1631+
except BaseException as exc:
1632+
self.main_task_outcome = Error(exc)
1633+
return
1634+
self.spawn_impl(
1635+
self.entry_queue.task,
1636+
(),
1637+
run_sync_soon_nursery,
1638+
"<TrioToken.run_sync_soon task>",
1639+
system_task=True,
1640+
)
1641+
1642+
# Main task is done. We should be exiting soon, so
1643+
# we're going to shut down GC-mediated async generator
1644+
# finalization by turning the asyncgens WeakSet into a
1645+
# regular set. We must do that before closing the system
1646+
# nursery, since finalization spawns new system tasks.
1647+
self.asyncgens = set(self.asyncgens)
1648+
1649+
# Process all pending run_sync_soon callbacks, in case one of
1650+
# them was an asyncgen finalizer.
1651+
self.entry_queue.run_sync_soon(self.reschedule, self.init_task)
1652+
await wait_task_rescheduled(lambda _: Abort.FAILED)
1653+
1654+
# Now it's safe to proceed with shutting down system tasks
1655+
self.system_nursery.cancel_scope.cancel()
1656+
1657+
# System tasks are gone and no more will be appearing.
1658+
# The only async-colored user code left to run is the
1659+
# finalizers for the async generators that remain alive.
1660+
await self.finalize_remaining_asyncgens()
1661+
1662+
# There are no more asyncgens, which means no more user-provided
1663+
# code except possibly run_sync_soon callbacks. It's finally safe
1664+
# to stop the run_sync_soon task and exit run().
1665+
run_sync_soon_nursery.cancel_scope.cancel()
15111666

15121667
################
15131668
# Outside context problems
@@ -1989,6 +2144,10 @@ def unrolled_run(runner, async_fn, args, host_uses_signal_set_wakeup_fd=False):
19892144
if not host_uses_signal_set_wakeup_fd:
19902145
runner.entry_queue.wakeup.wakeup_on_signals()
19912146

2147+
# Do this before before_run in case before_run wants to override
2148+
# our hooks
2149+
runner.setup_asyncgen_hooks()
2150+
19922151
if runner.instruments:
19932152
runner.instrument("before_run")
19942153
runner.clock.start_clock()

0 commit comments

Comments
 (0)