Skip to content

Commit 3268114

Browse files
authored
Merge pull request #1564 from oremanj/asyncgenhooks-basic
Add support for async generator finalization
2 parents 163cfaa + 0441952 commit 3268114

File tree

12 files changed

+844
-20
lines changed

12 files changed

+844
-20
lines changed

docs/source/conf.py

100644100755
File mode changed.

docs/source/reference-core.rst

Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1465,6 +1465,182 @@ don't have any special access to Trio's internals.)
14651465
:members:
14661466

14671467

1468+
.. _async-generators:
1469+
1470+
Notes on async generators
1471+
-------------------------
1472+
1473+
Python 3.6 added support for *async generators*, which can use
1474+
``await``, ``async for``, and ``async with`` in between their ``yield``
1475+
statements. As you might expect, you use ``async for`` to iterate
1476+
over them. :pep:`525` has many more details if you want them.
1477+
1478+
For example, the following is a roundabout way to print
1479+
the numbers 0 through 9 with a 1-second delay before each one::
1480+
1481+
async def range_slowly(*args):
1482+
"""Like range(), but adds a 1-second sleep before each value."""
1483+
for value in range(*args):
1484+
await trio.sleep(1)
1485+
yield value
1486+
1487+
async def use_it():
1488+
async for value in range_slowly(10):
1489+
print(value)
1490+
1491+
trio.run(use_it)
1492+
1493+
Trio supports async generators, with some caveats described in this section.
1494+
1495+
Finalization
1496+
~~~~~~~~~~~~
1497+
1498+
If you iterate over an async generator in its entirety, like the
1499+
example above does, then the execution of the async generator will
1500+
occur completely in the context of the code that's iterating over it,
1501+
and there aren't too many surprises.
1502+
1503+
If you abandon a partially-completed async generator, though, such as
1504+
by ``break``\ing out of the iteration, things aren't so simple. The
1505+
async generator iterator object is still alive, waiting for you to
1506+
resume iterating it so it can produce more values. At some point,
1507+
Python will realize that you've dropped all references to the
1508+
iterator, and will call on Trio to throw in a `GeneratorExit` exception
1509+
so that any remaining cleanup code inside the generator has a chance
1510+
to run: ``finally`` blocks, ``__aexit__`` handlers, and so on.
1511+
1512+
So far, so good. Unfortunately, Python provides no guarantees about
1513+
*when* this happens. It could be as soon as you break out of the
1514+
``async for`` loop, or an arbitrary amount of time later. It could
1515+
even be after the entire Trio run has finished! Just about the only
1516+
guarantee is that it *won't* happen in the task that was using the
1517+
generator. That task will continue on with whatever else it's doing,
1518+
and the async generator cleanup will happen "sometime later,
1519+
somewhere else": potentially with different context variables,
1520+
not subject to timeouts, and/or after any nurseries you're using have
1521+
been closed.
1522+
1523+
If you don't like that ambiguity, and you want to ensure that a
1524+
generator's ``finally`` blocks and ``__aexit__`` handlers execute as
1525+
soon as you're done using it, then you'll need to wrap your use of the
1526+
generator in something like `async_generator.aclosing()
1527+
<https://async-generator.readthedocs.io/en/latest/reference.html#context-managers>`__::
1528+
1529+
# Instead of this:
1530+
async for value in my_generator():
1531+
if value == 42:
1532+
break
1533+
1534+
# Do this:
1535+
async with aclosing(my_generator()) as aiter:
1536+
async for value in aiter:
1537+
if value == 42:
1538+
break
1539+
1540+
This is cumbersome, but Python unfortunately doesn't provide any other
1541+
reliable options. If you use ``aclosing()``, then
1542+
your generator's cleanup code executes in the same context as the
1543+
rest of its iterations, so timeouts, exceptions, and context
1544+
variables work like you'd expect.
1545+
1546+
If you don't use ``aclosing()``, then Trio will do
1547+
its best anyway, but you'll have to contend with the following semantics:
1548+
1549+
* The cleanup of the generator occurs in a cancelled context, i.e.,
1550+
all blocking calls executed during cleanup will raise `Cancelled`.
1551+
This is to compensate for the fact that any timeouts surrounding
1552+
the original use of the generator have been long since forgotten.
1553+
1554+
* The cleanup runs without access to any :ref:`context variables
1555+
<task-local-storage>` that may have been present when the generator
1556+
was originally being used.
1557+
1558+
* If the generator raises an exception during cleanup, then it's
1559+
printed to the ``trio.async_generator_errors`` logger and otherwise
1560+
ignored.
1561+
1562+
* If an async generator is still alive at the end of the whole
1563+
call to :func:`trio.run`, then it will be cleaned up after all
1564+
tasks have exited and before :func:`trio.run` returns.
1565+
Since the "system nursery" has already been closed at this point,
1566+
Trio isn't able to support any new calls to
1567+
:func:`trio.lowlevel.spawn_system_task`.
1568+
1569+
If you plan to run your code on PyPy to take advantage of its better
1570+
performance, you should be aware that PyPy is *far more likely* than
1571+
CPython to perform async generator cleanup at a time well after the
1572+
last use of the generator. (This is a consequence of the fact that
1573+
PyPy does not use reference counting to manage memory.) To help catch
1574+
issues like this, Trio will issue a `ResourceWarning` (ignored by
1575+
default, but enabled when running under ``python -X dev`` for example)
1576+
for each async generator that needs to be handled through the fallback
1577+
finalization path.
1578+
1579+
Cancel scopes and nurseries
1580+
~~~~~~~~~~~~~~~~~~~~~~~~~~~
1581+
1582+
.. warning:: You may not write a ``yield`` statement that suspends an async generator
1583+
inside a `CancelScope` or `Nursery` that was entered within the generator.
1584+
1585+
That is, this is OK::
1586+
1587+
async def some_agen():
1588+
with trio.move_on_after(1):
1589+
await long_operation()
1590+
yield "first"
1591+
async with trio.open_nursery() as nursery:
1592+
nursery.start_soon(task1)
1593+
nursery.start_soon(task2)
1594+
yield "second"
1595+
...
1596+
1597+
But this is not::
1598+
1599+
async def some_agen():
1600+
with trio.move_on_after(1):
1601+
yield "first"
1602+
async with trio.open_nursery() as nursery:
1603+
yield "second"
1604+
...
1605+
1606+
Async generators decorated with ``@asynccontextmanager`` to serve as
1607+
the template for an async context manager are *not* subject to this
1608+
constraint, because ``@asynccontextmanager`` uses them in a limited
1609+
way that doesn't create problems.
1610+
1611+
Violating the rule described in this section will sometimes get you a
1612+
useful error message, but Trio is not able to detect all such cases,
1613+
so sometimes you'll get an unhelpful `TrioInternalError`. (And
1614+
sometimes it will seem to work, which is probably the worst outcome of
1615+
all, since then you might not notice the issue until you perform some
1616+
minor refactoring of the generator or the code that's iterating it, or
1617+
just get unlucky. There is a `proposed Python enhancement
1618+
<https://discuss.python.org/t/preventing-yield-inside-certain-context-managers/1091>`__
1619+
that would at least make it fail consistently.)
1620+
1621+
The reason for the restriction on cancel scopes has to do with the
1622+
difficulty of noticing when a generator gets suspended and
1623+
resumed. The cancel scopes inside the generator shouldn't affect code
1624+
running outside the generator, but Trio isn't involved in the process
1625+
of exiting and reentering the generator, so it would be hard pressed
1626+
to keep its cancellation plumbing in the correct state. Nurseries
1627+
use a cancel scope internally, so they have all the problems of cancel
1628+
scopes plus a number of problems of their own: for example, when
1629+
the generator is suspended, what should the background tasks do?
1630+
There's no good way to suspend them, but if they keep running and throw
1631+
an exception, where can that exception be reraised?
1632+
1633+
If you have an async generator that wants to ``yield`` from within a nursery
1634+
or cancel scope, your best bet is to refactor it to be a separate task
1635+
that communicates over memory channels.
1636+
1637+
For more discussion and some experimental partial workarounds, see
1638+
Trio issues `264 <https://github.com/python-trio/trio/issues/264>`__
1639+
(especially `this comment
1640+
<https://github.com/python-trio/trio/issues/264#issuecomment-418989328>`__)
1641+
and `638 <https://github.com/python-trio/trio/issues/638>`__.
1642+
1643+
14681644
.. _threads:
14691645

14701646
Threads (if you must)

newsfragments/265.headline.rst

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
Trio now supports automatic :ref:`async generator finalization
2+
<async-generators>`, so more async generators will work even if you
3+
don't wrap them in ``async with async_generator.aclosing():``
4+
blocks. Please see the documentation for important caveats; in
5+
particular, yielding within a nursery or cancel scope remains
6+
unsupported.

trio/_core/_asyncgens.py

Lines changed: 193 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,193 @@
1+
import attr
2+
import logging
3+
import sys
4+
import warnings
5+
import weakref
6+
7+
from .._util import name_asyncgen
8+
from . import _run
9+
from .. import _core
10+
11+
# Used to log exceptions in async generator finalizers
12+
ASYNCGEN_LOGGER = logging.getLogger("trio.async_generator_errors")
13+
14+
15+
@attr.s(eq=False, slots=True)
16+
class AsyncGenerators:
17+
# Async generators are added to this set when first iterated. Any
18+
# left after the main task exits will be closed before trio.run()
19+
# returns. During most of the run, this is a WeakSet so GC works.
20+
# During shutdown, when we're finalizing all the remaining
21+
# asyncgens after the system nursery has been closed, it's a
22+
# regular set so we don't have to deal with GC firing at
23+
# unexpected times.
24+
alive = attr.ib(factory=weakref.WeakSet)
25+
26+
# This collects async generators that get garbage collected during
27+
# the one-tick window between the system nursery closing and the
28+
# init task starting end-of-run asyncgen finalization.
29+
trailing_needs_finalize = attr.ib(factory=set)
30+
31+
prev_hooks = attr.ib(init=False)
32+
33+
def install_hooks(self, runner):
34+
def firstiter(agen):
35+
if hasattr(_run.GLOBAL_RUN_CONTEXT, "task"):
36+
self.alive.add(agen)
37+
else:
38+
# An async generator first iterated outside of a Trio
39+
# task doesn't belong to Trio. Probably we're in guest
40+
# mode and the async generator belongs to our host.
41+
# The locals dictionary is the only good place to
42+
# remember this fact, at least until
43+
# https://bugs.python.org/issue40916 is implemented.
44+
agen.ag_frame.f_locals["@trio_foreign_asyncgen"] = True
45+
if self.prev_hooks.firstiter is not None:
46+
self.prev_hooks.firstiter(agen)
47+
48+
def finalize_in_trio_context(agen, agen_name):
49+
try:
50+
runner.spawn_system_task(
51+
self._finalize_one,
52+
agen,
53+
agen_name,
54+
name=f"close asyncgen {agen_name} (abandoned)",
55+
)
56+
except RuntimeError:
57+
# There is a one-tick window where the system nursery
58+
# is closed but the init task hasn't yet made
59+
# self.asyncgens a strong set to disable GC. We seem to
60+
# have hit it.
61+
self.trailing_needs_finalize.add(agen)
62+
63+
def finalizer(agen):
64+
agen_name = name_asyncgen(agen)
65+
try:
66+
is_ours = not agen.ag_frame.f_locals.get("@trio_foreign_asyncgen")
67+
except AttributeError: # pragma: no cover
68+
is_ours = True
69+
70+
if is_ours:
71+
runner.entry_queue.run_sync_soon(
72+
finalize_in_trio_context, agen, agen_name
73+
)
74+
75+
# Do this last, because it might raise an exception
76+
# depending on the user's warnings filter. (That
77+
# exception will be printed to the terminal and
78+
# ignored, since we're running in GC context.)
79+
warnings.warn(
80+
f"Async generator {agen_name!r} was garbage collected before it "
81+
f"had been exhausted. Surround its use in 'async with "
82+
f"aclosing(...):' to ensure that it gets cleaned up as soon as "
83+
f"you're done using it.",
84+
ResourceWarning,
85+
stacklevel=2,
86+
source=agen,
87+
)
88+
else:
89+
# Not ours -> forward to the host loop's async generator finalizer
90+
if self.prev_hooks.finalizer is not None:
91+
self.prev_hooks.finalizer(agen)
92+
else:
93+
# Host has no finalizer. Reimplement the default
94+
# Python behavior with no hooks installed: throw in
95+
# GeneratorExit, step once, raise RuntimeError if
96+
# it doesn't exit.
97+
closer = agen.aclose()
98+
try:
99+
# If the next thing is a yield, this will raise RuntimeError
100+
# which we allow to propagate
101+
closer.send(None)
102+
except StopIteration:
103+
pass
104+
else:
105+
# If the next thing is an await, we get here. Give a nicer
106+
# error than the default "async generator ignored GeneratorExit"
107+
raise RuntimeError(
108+
f"Non-Trio async generator {agen_name!r} awaited something "
109+
f"during finalization; install a finalization hook to "
110+
f"support this, or wrap it in 'async with aclosing(...):'"
111+
)
112+
113+
self.prev_hooks = sys.get_asyncgen_hooks()
114+
sys.set_asyncgen_hooks(firstiter=firstiter, finalizer=finalizer)
115+
116+
async def finalize_remaining(self, runner):
117+
# This is called from init after shutting down the system nursery.
118+
# The only tasks running at this point are init and
119+
# the run_sync_soon task, and since the system nursery is closed,
120+
# there's no way for user code to spawn more.
121+
assert _core.current_task() is runner.init_task
122+
assert len(runner.tasks) == 2
123+
124+
# To make async generator finalization easier to reason
125+
# about, we'll shut down asyncgen garbage collection by turning
126+
# the alive WeakSet into a regular set.
127+
self.alive = set(self.alive)
128+
129+
# Process all pending run_sync_soon callbacks, in case one of
130+
# them was an asyncgen finalizer that snuck in under the wire.
131+
runner.entry_queue.run_sync_soon(runner.reschedule, runner.init_task)
132+
await _core.wait_task_rescheduled(
133+
lambda _: _core.Abort.FAILED # pragma: no cover
134+
)
135+
self.alive.update(self.trailing_needs_finalize)
136+
self.trailing_needs_finalize.clear()
137+
138+
# None of the still-living tasks use async generators, so
139+
# every async generator must be suspended at a yield point --
140+
# there's no one to be doing the iteration. That's good,
141+
# because aclose() only works on an asyncgen that's suspended
142+
# at a yield point. (If it's suspended at an event loop trap,
143+
# because someone is in the middle of iterating it, then you
144+
# get a RuntimeError on 3.8+, and a nasty surprise on earlier
145+
# versions due to https://bugs.python.org/issue32526.)
146+
#
147+
# However, once we start aclose() of one async generator, it
148+
# might start fetching the next value from another, thus
149+
# preventing us from closing that other (at least until
150+
# aclose() of the first one is complete). This constraint
151+
# effectively requires us to finalize the remaining asyncgens
152+
# in arbitrary order, rather than doing all of them at the
153+
# same time. On 3.8+ we could defer any generator with
154+
# ag_running=True to a later batch, but that only catches
155+
# the case where our aclose() starts after the user's
156+
# asend()/etc. If our aclose() starts first, then the
157+
# user's asend()/etc will raise RuntimeError, since they're
158+
# probably not checking ag_running.
159+
#
160+
# It might be possible to allow some parallelized cleanup if
161+
# we can determine that a certain set of asyncgens have no
162+
# interdependencies, using gc.get_referents() and such.
163+
# But just doing one at a time will typically work well enough
164+
# (since each aclose() executes in a cancelled scope) and
165+
# is much easier to reason about.
166+
167+
# It's possible that that cleanup code will itself create
168+
# more async generators, so we iterate repeatedly until
169+
# all are gone.
170+
while self.alive:
171+
batch = self.alive
172+
self.alive = set()
173+
for agen in batch:
174+
await self._finalize_one(agen, name_asyncgen(agen))
175+
176+
def close(self):
177+
sys.set_asyncgen_hooks(*self.prev_hooks)
178+
179+
async def _finalize_one(self, agen, name):
180+
try:
181+
# This shield ensures that finalize_asyncgen never exits
182+
# with an exception, not even a Cancelled. The inside
183+
# is cancelled so there's no deadlock risk.
184+
with _core.CancelScope(shield=True) as cancel_scope:
185+
cancel_scope.cancel()
186+
await agen.aclose()
187+
except BaseException:
188+
ASYNCGEN_LOGGER.exception(
189+
"Exception ignored during finalization of async generator %r -- "
190+
"surround your use of the generator in 'async with aclosing(...):' "
191+
"to raise exceptions like this in the context where they're generated",
192+
name,
193+
)

0 commit comments

Comments
 (0)