-
-
Notifications
You must be signed in to change notification settings - Fork 27
Closed
Description
What is wrong
I am manually managing the lifecycle of a nursery in a class that exposes it's own async context manager API (this might be a bad idea but I think it's extraneous to the error I've hit).
I had a bug in my __aenter__
implementation which resulted in the method erroring out after having already started the nursery. This results in the Nursery.__aexit__
never being called which I believe is why the error happens.
Here's a minimal example that demonstrates the problem:
import pytest
import trio
class BadWrapper:
def __init__(self):
self._trigger_stop = trio.Event()
async def start(self, nursery):
raise Exception("Something fails!")
nursery.start_soon(self.run)
async def run(self):
# long running
await self._trigger_stop.wait()
async def stop(self):
self._trigger_stop.set()
async def __aenter__(self):
nursery_manager = trio.open_nursery()
nursery = await nursery_manager.__aenter__()
await self.start(nursery)
self._nursery_manager = nursery_manager
async def __aexit__(self, exc_type, exc_value, exc_tb):
try:
await self.stop()
finally:
await self._nursery_manager.__aexit__(exc_type, exc_value, exc_tb)
async def do_failure():
async with BadWrapper():
pass
@pytest.mark.trio
async def test_trio_internal_failure():
await do_failure()
if __name__ == '__main__':
trio.run(do_failure)
If I run this directly from the shell using $ python test_example.py
the output is clearly from my exception:
$ python tests/test_trio_internal_failure.py
Traceback (most recent call last):
File "tests/test_trio_internal_failure.py", line 44, in <module>
trio.run(do_failure)
File "/home/piper/python-environments/lahja/lib/python3.6/site-packages/trio/_core/_run.py", line 1444, in run
raise runner.main_task_outcome.error
File "tests/test_trio_internal_failure.py", line 34, in do_failure
async with BadWrapper():
File "tests/test_trio_internal_failure.py", line 23, in __aenter__
await self.start(nursery)
File "tests/test_trio_internal_failure.py", line 10, in start
raise Exception("Something fails!")
Exception: Something fails!
However, run through pytest this is what happens:
$ pytest tests/test_trio_internal_failure.py
=========================================================================================== test session starts ===========================================================================================
platform linux -- Python 3.6.5, pytest-4.0.2, py-1.8.0, pluggy-0.9.0 -- /home/piper/python-environments/lahja/bin/python
cachedir: .pytest_cache
rootdir: /home/piper/projects/lahja, inifile: pytest.ini
plugins: xdist-1.25.0, trio-0.5.2, forked-1.0.2, asyncio-0.9.0
collected 1 item
tests/test_trio_internal_failure.py::test_trio_internal_failure PASSED [100%]
======================================================================================== slowest 10 test durations ========================================================================================
(0.00 durations hidden. Use -vv to show these durations.)
======================================================================================== 1 passed in 0.01 seconds =========================================================================================
(lahja) zazoo ~/projects/lahja piper/mirrored-trio-endpoint X
$ pytest tests/test_trio_internal_failure.py
=========================================================================================== test session starts ===========================================================================================
platform linux -- Python 3.6.5, pytest-4.0.2, py-1.8.0, pluggy-0.9.0 -- /home/piper/python-environments/lahja/bin/python
cachedir: .pytest_cache
rootdir: /home/piper/projects/lahja, inifile: pytest.ini
plugins: xdist-1.25.0, trio-0.5.2, forked-1.0.2, asyncio-0.9.0
collected 1 item
tests/test_trio_internal_failure.py::test_trio_internal_failure FAILED [100%]
================================================================================================ FAILURES =================================================================================================
_______________________________________________________________________________________ test_trio_internal_failure ________________________________________________________________________________________
async_fn = functools.partial(<function _trio_test_runner_factory.<locals>._bootstrap_fixtures_and_run_test at 0x7f4427417e18>), clock = SystemClock(offset=80277.06512981786), instruments = []
restrict_keyboard_interrupt_to_checkpoints = False, args = ()
def run(
async_fn,
*args,
clock=None,
instruments=(),
restrict_keyboard_interrupt_to_checkpoints=False
):
"""Run a trio-flavored async function, and return the result.
Calling::
run(async_fn, *args)
is the equivalent of::
await async_fn(*args)
except that :func:`run` can (and must) be called from a synchronous
context.
This is trio's main entry point. Almost every other function in trio
requires that you be inside a call to :func:`run`.
Args:
async_fn: An async function.
args: Positional arguments to be passed to *async_fn*. If you need to
pass keyword arguments, then use :func:`functools.partial`.
clock: ``None`` to use the default system-specific monotonic clock;
otherwise, an object implementing the :class:`trio.abc.Clock`
interface, like (for example) a :class:`trio.testing.MockClock`
instance.
instruments (list of :class:`trio.abc.Instrument` objects): Any
instrumentation you want to apply to this run. This can also be
modified during the run; see :ref:`instrumentation`.
restrict_keyboard_interrupt_to_checkpoints (bool): What happens if the
user hits control-C while :func:`run` is running? If this argument
is False (the default), then you get the standard Python behavior: a
:exc:`KeyboardInterrupt` exception will immediately interrupt
whatever task is running (or if no task is running, then trio will
wake up a task to be interrupted). Alternatively, if you set this
argument to True, then :exc:`KeyboardInterrupt` delivery will be
delayed: it will be *only* be raised at :ref:`checkpoints
<checkpoints>`, like a :exc:`Cancelled` exception.
The default behavior is nice because it means that even if you
accidentally write an infinite loop that never executes any
checkpoints, then you can still break out of it using control-C.
The alternative behavior is nice if you're paranoid about a
:exc:`KeyboardInterrupt` at just the wrong place leaving your
program in an inconsistent state, because it means that you only
have to worry about :exc:`KeyboardInterrupt` at the exact same
places where you already have to worry about :exc:`Cancelled`.
This setting has no effect if your program has registered a custom
SIGINT handler, or if :func:`run` is called from anywhere but the
main thread (this is a Python limitation), or if you use
:func:`open_signal_receiver` to catch SIGINT.
Returns:
Whatever ``async_fn`` returns.
Raises:
TrioInternalError: if an unexpected error is encountered inside trio's
internal machinery. This is a bug and you should `let us know
<https://github.com/python-trio/trio/issues>`__.
Anything else: if ``async_fn`` raises an exception, then :func:`run`
propagates it.
"""
__tracebackhide__ = True
# Do error-checking up front, before we enter the TrioInternalError
# try/catch
#
# It wouldn't be *hard* to support nested calls to run(), but I can't
# think of a single good reason for it, so let's be conservative for
# now:
if hasattr(GLOBAL_RUN_CONTEXT, "runner"):
raise RuntimeError("Attempted to call run() from inside a run()")
if clock is None:
clock = SystemClock()
instruments = list(instruments)
io_manager = TheIOManager()
system_context = copy_context()
system_context.run(current_async_library_cvar.set, "trio")
runner = Runner(
clock=clock,
instruments=instruments,
io_manager=io_manager,
system_context=system_context,
)
GLOBAL_RUN_CONTEXT.runner = runner
locals()[LOCALS_KEY_KI_PROTECTION_ENABLED] = True
# KI handling goes outside the core try/except/finally to avoid a window
# where KeyboardInterrupt would be allowed and converted into an
# TrioInternalError:
try:
with ki_manager(
runner.deliver_ki, restrict_keyboard_interrupt_to_checkpoints
):
try:
with closing(runner):
# The main reason this is split off into its own function
# is just to get rid of this extra indentation.
> run_impl(runner, async_fn, args)
__tracebackhide__ = True
args = ()
async_fn = functools.partial(<function _trio_test_runner_factory.<locals>._bootstrap_fixtures_and_run_test at 0x7f4427417e18>)
clock = SystemClock(offset=80277.06512981786)
instruments = []
io_manager = EpollIOManager(_epoll=<select.epoll object at 0x7f44297128b8>, _registered={10: EpollWaiters(read_task=<Task '<TrioToken.run_sync_soon task>' at 0x7f4427403470>, write_task=None)})
restrict_keyboard_interrupt_to_checkpoints = False
runner = Runner(clock=SystemClock(offset=80277.06512981786), instruments=[], io_manager=EpollIOManager(_epoll=<select.epoll obj...d.RLock object owner=0 count=0 at 0x7f4427436420>), trio_token=None, ki_pending=False, waiting_for_idle=SortedDict({}))
system_context = <contextvars.Context object at 0x7f44274032b0>
../../python-environments/lahja/lib/python3.6/site-packages/trio/_core/_run.py:1430:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
runner = Runner(clock=SystemClock(offset=80277.06512981786), instruments=[], io_manager=EpollIOManager(_epoll=<select.epoll obj...d.RLock object owner=0 count=0 at 0x7f4427436420>), trio_token=None, ki_pending=False, waiting_for_idle=SortedDict({}))
async_fn = functools.partial(<function _trio_test_runner_factory.<locals>._bootstrap_fixtures_and_run_test at 0x7f4427417e18>), args = ()
def run_impl(runner, async_fn, args):
__tracebackhide__ = True
if runner.instruments:
runner.instrument("before_run")
runner.clock.start_clock()
runner.init_task = runner.spawn_impl(
runner.init,
(async_fn, args),
None,
"<init>",
system_task=True,
)
# You know how people talk about "event loops"? This 'while' loop right
# here is our event loop:
while runner.tasks:
if runner.runq:
timeout = 0
elif runner.deadlines:
deadline, _ = runner.deadlines.keys()[0]
timeout = runner.clock.deadline_to_sleep_time(deadline)
else:
timeout = _MAX_TIMEOUT
timeout = min(max(0, timeout), _MAX_TIMEOUT)
idle_primed = False
if runner.waiting_for_idle:
cushion, tiebreaker, _ = runner.waiting_for_idle.keys()[0]
if cushion < timeout:
timeout = cushion
idle_primed = True
if runner.instruments:
runner.instrument("before_io_wait", timeout)
runner.io_manager.handle_io(timeout)
if runner.instruments:
runner.instrument("after_io_wait", timeout)
# Process cancellations due to deadline expiry
now = runner.clock.current_time()
while runner.deadlines:
(deadline, _), cancel_scope = runner.deadlines.peekitem(0)
if deadline <= now:
# This removes the given scope from runner.deadlines:
cancel_scope.cancel()
idle_primed = False
else:
break
if not runner.runq and idle_primed:
while runner.waiting_for_idle:
key, task = runner.waiting_for_idle.peekitem(0)
if key[:2] == (cushion, tiebreaker):
del runner.waiting_for_idle[key]
runner.reschedule(task)
else:
break
# Process all runnable tasks, but only the ones that are already
# runnable now. Anything that becomes runnable during this cycle needs
# to wait until the next pass. This avoids various starvation issues
# by ensuring that there's never an unbounded delay between successive
# checks for I/O.
#
# Also, we randomize the order of each batch to avoid assumptions
# about scheduling order sneaking in. In the long run, I suspect we'll
# either (a) use strict FIFO ordering and document that for
# predictability/determinism, or (b) implement a more sophisticated
# scheduler (e.g. some variant of fair queueing), for better behavior
# under load. For now, this is the worst of both worlds - but it keeps
# our options open. (If we do decide to go all in on deterministic
# scheduling, then there are other things that will probably need to
# change too, like the deadlines tie-breaker and the non-deterministic
# ordering of task._notify_queues.)
batch = list(runner.runq)
if _ALLOW_DETERMINISTIC_SCHEDULING:
# We're running under Hypothesis, and pytest-trio has patched this
# in to make the scheduler deterministic and avoid flaky tests.
# It's not worth the (small) performance cost in normal operation,
# since we'll shuffle the list and _r is only seeded for tests.
batch.sort(key=lambda t: t._counter)
runner.runq.clear()
_r.shuffle(batch)
while batch:
task = batch.pop()
GLOBAL_RUN_CONTEXT.task = task
if runner.instruments:
runner.instrument("before_task_step", task)
next_send = task._next_send
task._next_send = None
final_outcome = None
try:
# We used to unwrap the Outcome object here and send/throw its
# contents in directly, but it turns out that .throw() is
# buggy, at least on CPython 3.6 and earlier:
# https://bugs.python.org/issue29587
# https://bugs.python.org/issue29590
# So now we send in the Outcome object and unwrap it on the
# other side.
msg = task.context.run(task.coro.send, next_send)
except StopIteration as stop_iteration:
final_outcome = Value(stop_iteration.value)
except BaseException as task_exc:
# Store for later, removing uninteresting top frames: 1 frame
# we always remove, because it's this function catching it,
# and then in addition we remove however many more Context.run
# adds.
tb = task_exc.__traceback__.tb_next
for _ in range(CONTEXT_RUN_TB_FRAMES):
tb = tb.tb_next
final_outcome = Error(task_exc.with_traceback(tb))
if final_outcome is not None:
# We can't call this directly inside the except: blocks above,
# because then the exceptions end up attaching themselves to
# other exceptions as __context__ in unwanted ways.
> runner.task_exited(task, final_outcome)
__tracebackhide__ = True
args = ()
async_fn = functools.partial(<function _trio_test_runner_factory.<locals>._bootstrap_fixtures_and_run_test at 0x7f4427417e18>)
batch = []
final_outcome = Value(None)
idle_primed = False
msg = WaitTaskRescheduled(abort_func=<function ParkingLot.park.<locals>.abort_fn at 0x7f442742a730>)
next_send = Error(Cancelled())
now = 202206.66995797885
runner = Runner(clock=SystemClock(offset=80277.06512981786), instruments=[], io_manager=EpollIOManager(_epoll=<select.epoll obj...d.RLock object owner=0 count=0 at 0x7f4427436420>), trio_token=None, ki_pending=False, waiting_for_idle=SortedDict({}))
task = <Task "<test 'test_trio_internal_failure'>" at 0x7f44274037b8>
timeout = 0
../../python-environments/lahja/lib/python3.6/site-packages/trio/_core/_run.py:1579:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = Runner(clock=SystemClock(offset=80277.06512981786), instruments=[], io_manager=EpollIOManager(_epoll=<select.epoll obj...d.RLock object owner=0 count=0 at 0x7f4427436420>), trio_token=None, ki_pending=False, waiting_for_idle=SortedDict({}))
task = <Task "<test 'test_trio_internal_failure'>" at 0x7f44274037b8>, outcome = Value(None)
def task_exited(self, task, outcome):
while task._cancel_stack:
> task._cancel_stack[-1]._remove_task(task)
outcome = Value(None)
self = Runner(clock=SystemClock(offset=80277.06512981786), instruments=[], io_manager=EpollIOManager(_epoll=<select.epoll obj...d.RLock object owner=0 count=0 at 0x7f4427436420>), trio_token=None, ki_pending=False, waiting_for_idle=SortedDict({}))
task = <Task "<test 'test_trio_internal_failure'>" at 0x7f44274037b8>
../../python-environments/lahja/lib/python3.6/site-packages/trio/_core/_run.py:1050:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <trio.CancelScope at 0x7f4427430ce0, bound to 1 task, cancelled>, task = <Task "<test 'test_trio_internal_failure'>" at 0x7f44274037b8>
def _remove_task(self, task):
> self._tasks.remove(task)
E KeyError: <Task "<test 'test_trio_internal_failure'>" at 0x7f44274037b8>
self = <trio.CancelScope at 0x7f4427430ce0, bound to 1 task, cancelled>
task = <Task "<test 'test_trio_internal_failure'>" at 0x7f44274037b8>
../../python-environments/lahja/lib/python3.6/site-packages/trio/_core/_run.py:347: KeyError
The above exception was the direct cause of the following exception:
async_fn = functools.partial(<function _trio_test_runner_factory.<locals>._bootstrap_fixtures_and_run_test at 0x7f4427417e18>), clock = SystemClock(offset=80277.06512981786), instruments = []
restrict_keyboard_interrupt_to_checkpoints = False, args = ()
def run(
async_fn,
*args,
clock=None,
instruments=(),
restrict_keyboard_interrupt_to_checkpoints=False
):
"""Run a trio-flavored async function, and return the result.
Calling::
run(async_fn, *args)
is the equivalent of::
await async_fn(*args)
except that :func:`run` can (and must) be called from a synchronous
context.
This is trio's main entry point. Almost every other function in trio
requires that you be inside a call to :func:`run`.
Args:
async_fn: An async function.
args: Positional arguments to be passed to *async_fn*. If you need to
pass keyword arguments, then use :func:`functools.partial`.
clock: ``None`` to use the default system-specific monotonic clock;
otherwise, an object implementing the :class:`trio.abc.Clock`
interface, like (for example) a :class:`trio.testing.MockClock`
instance.
instruments (list of :class:`trio.abc.Instrument` objects): Any
instrumentation you want to apply to this run. This can also be
modified during the run; see :ref:`instrumentation`.
restrict_keyboard_interrupt_to_checkpoints (bool): What happens if the
user hits control-C while :func:`run` is running? If this argument
is False (the default), then you get the standard Python behavior: a
:exc:`KeyboardInterrupt` exception will immediately interrupt
whatever task is running (or if no task is running, then trio will
wake up a task to be interrupted). Alternatively, if you set this
argument to True, then :exc:`KeyboardInterrupt` delivery will be
delayed: it will be *only* be raised at :ref:`checkpoints
<checkpoints>`, like a :exc:`Cancelled` exception.
The default behavior is nice because it means that even if you
accidentally write an infinite loop that never executes any
checkpoints, then you can still break out of it using control-C.
The alternative behavior is nice if you're paranoid about a
:exc:`KeyboardInterrupt` at just the wrong place leaving your
program in an inconsistent state, because it means that you only
have to worry about :exc:`KeyboardInterrupt` at the exact same
places where you already have to worry about :exc:`Cancelled`.
This setting has no effect if your program has registered a custom
SIGINT handler, or if :func:`run` is called from anywhere but the
main thread (this is a Python limitation), or if you use
:func:`open_signal_receiver` to catch SIGINT.
Returns:
Whatever ``async_fn`` returns.
Raises:
TrioInternalError: if an unexpected error is encountered inside trio's
internal machinery. This is a bug and you should `let us know
<https://github.com/python-trio/trio/issues>`__.
Anything else: if ``async_fn`` raises an exception, then :func:`run`
propagates it.
"""
__tracebackhide__ = True
# Do error-checking up front, before we enter the TrioInternalError
# try/catch
#
# It wouldn't be *hard* to support nested calls to run(), but I can't
# think of a single good reason for it, so let's be conservative for
# now:
if hasattr(GLOBAL_RUN_CONTEXT, "runner"):
raise RuntimeError("Attempted to call run() from inside a run()")
if clock is None:
clock = SystemClock()
instruments = list(instruments)
io_manager = TheIOManager()
system_context = copy_context()
system_context.run(current_async_library_cvar.set, "trio")
runner = Runner(
clock=clock,
instruments=instruments,
io_manager=io_manager,
system_context=system_context,
)
GLOBAL_RUN_CONTEXT.runner = runner
locals()[LOCALS_KEY_KI_PROTECTION_ENABLED] = True
# KI handling goes outside the core try/except/finally to avoid a window
# where KeyboardInterrupt would be allowed and converted into an
# TrioInternalError:
try:
with ki_manager(
runner.deliver_ki, restrict_keyboard_interrupt_to_checkpoints
):
try:
with closing(runner):
# The main reason this is split off into its own function
# is just to get rid of this extra indentation.
run_impl(runner, async_fn, args)
except TrioInternalError:
raise
except BaseException as exc:
raise TrioInternalError(
"internal error in trio - please file a bug!"
> ) from exc
E trio.TrioInternalError: internal error in trio - please file a bug!
__tracebackhide__ = True
args = ()
async_fn = functools.partial(<function _trio_test_runner_factory.<locals>._bootstrap_fixtures_and_run_test at 0x7f4427417e18>)
clock = SystemClock(offset=80277.06512981786)
instruments = []
io_manager = EpollIOManager(_epoll=<select.epoll object at 0x7f44297128b8>, _registered={10: EpollWaiters(read_task=<Task '<TrioToken.run_sync_soon task>' at 0x7f4427403470>, write_task=None)})
restrict_keyboard_interrupt_to_checkpoints = False
runner = Runner(clock=SystemClock(offset=80277.06512981786), instruments=[], io_manager=EpollIOManager(_epoll=<select.epoll obj...d.RLock object owner=0 count=0 at 0x7f4427436420>), trio_token=None, ki_pending=False, waiting_for_idle=SortedDict({}))
system_context = <contextvars.Context object at 0x7f44274032b0>
../../python-environments/lahja/lib/python3.6/site-packages/trio/_core/_run.py:1436: TrioInternalError
======================================================================================== slowest 10 test durations ========================================================================================
(0.00 durations hidden. Use -vv to show these durations.)
======================================================================================== 1 failed in 0.08 seconds =========================================================================================
Exception ignored in: <bound method Nursery.__del__ of <trio._core._run.Nursery object at 0x7f4427403908>>
Traceback (most recent call last):
File "/home/piper/python-environments/lahja/lib/python3.6/site-packages/trio/_core/_run.py", line 638, in __del__
AssertionError:
Exception ignored in: <bound method Nursery.__del__ of <trio._core._run.Nursery object at 0x7f44274035f8>>
Traceback (most recent call last):
File "/home/piper/python-environments/lahja/lib/python3.6/site-packages/trio/_core/_run.py", line 638, in __del__
AssertionError:
Exception ignored in: <bound method Nursery.__del__ of <trio._core._run.Nursery object at 0x7f4427403e80>>
Traceback (most recent call last):
File "/home/piper/python-environments/lahja/lib/python3.6/site-packages/trio/_core/_run.py", line 638, in __del__
AssertionError:
Not clear what needs to be done to fix this or whether it is something that should be fixed.
Metadata
Metadata
Assignees
Labels
No labels