Test suite explodes if a nursery's __aexit__ doesn't get called #82

@pipermerriam

What is wrong

I am manually managing the lifecycle of a nursery in a class that exposes its own async context manager API (this might be a bad idea, but I think it's incidental to the error I've hit).

I had a bug in my __aenter__ implementation that caused the method to raise after it had already entered the nursery. As a result, the nursery's __aexit__ is never called, which I believe is why the error happens.

Here's a minimal example that demonstrates the problem:

import pytest
import trio


class BadWrapper:
    def __init__(self):
        self._trigger_stop = trio.Event()

    async def start(self, nursery):
        raise Exception("Something fails!")
        nursery.start_soon(self.run)

    async def run(self):
        # long running
        await self._trigger_stop.wait()

    async def stop(self):
        self._trigger_stop.set()

    async def __aenter__(self):
        nursery_manager = trio.open_nursery()
        nursery = await nursery_manager.__aenter__()
        await self.start(nursery)
        self._nursery_manager = nursery_manager

    async def __aexit__(self, exc_type, exc_value, exc_tb):
        try:
            await self.stop()
        finally:
            await self._nursery_manager.__aexit__(exc_type, exc_value, exc_tb)


async def do_failure():
    async with BadWrapper():
        pass


@pytest.mark.trio
async def test_trio_internal_failure():
    await do_failure()


if __name__ == '__main__':
    trio.run(do_failure)

If I run this file directly from the shell with python, the output clearly shows my exception:

$ python tests/test_trio_internal_failure.py
Traceback (most recent call last):
  File "tests/test_trio_internal_failure.py", line 44, in <module>
    trio.run(do_failure)
  File "/home/piper/python-environments/lahja/lib/python3.6/site-packages/trio/_core/_run.py", line 1444, in run
    raise runner.main_task_outcome.error
  File "tests/test_trio_internal_failure.py", line 34, in do_failure
    async with BadWrapper():
  File "tests/test_trio_internal_failure.py", line 23, in __aenter__
    await self.start(nursery)
  File "tests/test_trio_internal_failure.py", line 10, in start
    raise Exception("Something fails!")
Exception: Something fails!

However, when it is run through pytest, this is what happens:

$ pytest tests/test_trio_internal_failure.py
=========================================================================================== test session starts ===========================================================================================
platform linux -- Python 3.6.5, pytest-4.0.2, py-1.8.0, pluggy-0.9.0 -- /home/piper/python-environments/lahja/bin/python
cachedir: .pytest_cache
rootdir: /home/piper/projects/lahja, inifile: pytest.ini
plugins: xdist-1.25.0, trio-0.5.2, forked-1.0.2, asyncio-0.9.0
collected 1 item

tests/test_trio_internal_failure.py::test_trio_internal_failure PASSED                                                                                                                              [100%]

======================================================================================== slowest 10 test durations ========================================================================================

(0.00 durations hidden.  Use -vv to show these durations.)
======================================================================================== 1 passed in 0.01 seconds =========================================================================================
(lahja) zazoo ~/projects/lahja piper/mirrored-trio-endpoint X
$ pytest tests/test_trio_internal_failure.py
=========================================================================================== test session starts ===========================================================================================
platform linux -- Python 3.6.5, pytest-4.0.2, py-1.8.0, pluggy-0.9.0 -- /home/piper/python-environments/lahja/bin/python
cachedir: .pytest_cache
rootdir: /home/piper/projects/lahja, inifile: pytest.ini
plugins: xdist-1.25.0, trio-0.5.2, forked-1.0.2, asyncio-0.9.0
collected 1 item

tests/test_trio_internal_failure.py::test_trio_internal_failure FAILED                                                                                                                              [100%]

================================================================================================ FAILURES =================================================================================================
_______________________________________________________________________________________ test_trio_internal_failure ________________________________________________________________________________________

async_fn = functools.partial(<function _trio_test_runner_factory.<locals>._bootstrap_fixtures_and_run_test at 0x7f4427417e18>), clock = SystemClock(offset=80277.06512981786), instruments = []
restrict_keyboard_interrupt_to_checkpoints = False, args = ()

    def run(
        async_fn,
        *args,
        clock=None,
        instruments=(),
        restrict_keyboard_interrupt_to_checkpoints=False
    ):
        """Run a trio-flavored async function, and return the result.

        Calling::

           run(async_fn, *args)

        is the equivalent of::

           await async_fn(*args)

        except that :func:`run` can (and must) be called from a synchronous
        context.

        This is trio's main entry point. Almost every other function in trio
        requires that you be inside a call to :func:`run`.

        Args:
          async_fn: An async function.

          args: Positional arguments to be passed to *async_fn*. If you need to
              pass keyword arguments, then use :func:`functools.partial`.

          clock: ``None`` to use the default system-specific monotonic clock;
              otherwise, an object implementing the :class:`trio.abc.Clock`
              interface, like (for example) a :class:`trio.testing.MockClock`
              instance.

          instruments (list of :class:`trio.abc.Instrument` objects): Any
              instrumentation you want to apply to this run. This can also be
              modified during the run; see :ref:`instrumentation`.

          restrict_keyboard_interrupt_to_checkpoints (bool): What happens if the
              user hits control-C while :func:`run` is running? If this argument
              is False (the default), then you get the standard Python behavior: a
              :exc:`KeyboardInterrupt` exception will immediately interrupt
              whatever task is running (or if no task is running, then trio will
              wake up a task to be interrupted). Alternatively, if you set this
              argument to True, then :exc:`KeyboardInterrupt` delivery will be
              delayed: it will be *only* be raised at :ref:`checkpoints
              <checkpoints>`, like a :exc:`Cancelled` exception.

              The default behavior is nice because it means that even if you
              accidentally write an infinite loop that never executes any
              checkpoints, then you can still break out of it using control-C.
              The alternative behavior is nice if you're paranoid about a
              :exc:`KeyboardInterrupt` at just the wrong place leaving your
              program in an inconsistent state, because it means that you only
              have to worry about :exc:`KeyboardInterrupt` at the exact same
              places where you already have to worry about :exc:`Cancelled`.

              This setting has no effect if your program has registered a custom
              SIGINT handler, or if :func:`run` is called from anywhere but the
              main thread (this is a Python limitation), or if you use
              :func:`open_signal_receiver` to catch SIGINT.

        Returns:
          Whatever ``async_fn`` returns.

        Raises:
          TrioInternalError: if an unexpected error is encountered inside trio's
              internal machinery. This is a bug and you should `let us know
              <https://github.com/python-trio/trio/issues>`__.

          Anything else: if ``async_fn`` raises an exception, then :func:`run`
              propagates it.

        """

        __tracebackhide__ = True

        # Do error-checking up front, before we enter the TrioInternalError
        # try/catch
        #
        # It wouldn't be *hard* to support nested calls to run(), but I can't
        # think of a single good reason for it, so let's be conservative for
        # now:
        if hasattr(GLOBAL_RUN_CONTEXT, "runner"):
            raise RuntimeError("Attempted to call run() from inside a run()")

        if clock is None:
            clock = SystemClock()
        instruments = list(instruments)
        io_manager = TheIOManager()
        system_context = copy_context()
        system_context.run(current_async_library_cvar.set, "trio")
        runner = Runner(
            clock=clock,
            instruments=instruments,
            io_manager=io_manager,
            system_context=system_context,
        )
        GLOBAL_RUN_CONTEXT.runner = runner
        locals()[LOCALS_KEY_KI_PROTECTION_ENABLED] = True

        # KI handling goes outside the core try/except/finally to avoid a window
        # where KeyboardInterrupt would be allowed and converted into an
        # TrioInternalError:
        try:
            with ki_manager(
                runner.deliver_ki, restrict_keyboard_interrupt_to_checkpoints
            ):
                try:
                    with closing(runner):
                        # The main reason this is split off into its own function
                        # is just to get rid of this extra indentation.
>                       run_impl(runner, async_fn, args)

__tracebackhide__ = True
args       = ()
async_fn   = functools.partial(<function _trio_test_runner_factory.<locals>._bootstrap_fixtures_and_run_test at 0x7f4427417e18>)
clock      = SystemClock(offset=80277.06512981786)
instruments = []
io_manager = EpollIOManager(_epoll=<select.epoll object at 0x7f44297128b8>, _registered={10: EpollWaiters(read_task=<Task '<TrioToken.run_sync_soon task>' at 0x7f4427403470>, write_task=None)})
restrict_keyboard_interrupt_to_checkpoints = False
runner     = Runner(clock=SystemClock(offset=80277.06512981786), instruments=[], io_manager=EpollIOManager(_epoll=<select.epoll obj...d.RLock object owner=0 count=0 at 0x7f4427436420>), trio_token=None, ki_pending=False, waiting_for_idle=SortedDict({}))
system_context = <contextvars.Context object at 0x7f44274032b0>

../../python-environments/lahja/lib/python3.6/site-packages/trio/_core/_run.py:1430:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

runner = Runner(clock=SystemClock(offset=80277.06512981786), instruments=[], io_manager=EpollIOManager(_epoll=<select.epoll obj...d.RLock object owner=0 count=0 at 0x7f4427436420>), trio_token=None, ki_pending=False, waiting_for_idle=SortedDict({}))
async_fn = functools.partial(<function _trio_test_runner_factory.<locals>._bootstrap_fixtures_and_run_test at 0x7f4427417e18>), args = ()

    def run_impl(runner, async_fn, args):
        __tracebackhide__ = True

        if runner.instruments:
            runner.instrument("before_run")
        runner.clock.start_clock()
        runner.init_task = runner.spawn_impl(
            runner.init,
            (async_fn, args),
            None,
            "<init>",
            system_task=True,
        )

        # You know how people talk about "event loops"? This 'while' loop right
        # here is our event loop:
        while runner.tasks:
            if runner.runq:
                timeout = 0
            elif runner.deadlines:
                deadline, _ = runner.deadlines.keys()[0]
                timeout = runner.clock.deadline_to_sleep_time(deadline)
            else:
                timeout = _MAX_TIMEOUT
            timeout = min(max(0, timeout), _MAX_TIMEOUT)

            idle_primed = False
            if runner.waiting_for_idle:
                cushion, tiebreaker, _ = runner.waiting_for_idle.keys()[0]
                if cushion < timeout:
                    timeout = cushion
                    idle_primed = True

            if runner.instruments:
                runner.instrument("before_io_wait", timeout)

            runner.io_manager.handle_io(timeout)

            if runner.instruments:
                runner.instrument("after_io_wait", timeout)

            # Process cancellations due to deadline expiry
            now = runner.clock.current_time()
            while runner.deadlines:
                (deadline, _), cancel_scope = runner.deadlines.peekitem(0)
                if deadline <= now:
                    # This removes the given scope from runner.deadlines:
                    cancel_scope.cancel()
                    idle_primed = False
                else:
                    break

            if not runner.runq and idle_primed:
                while runner.waiting_for_idle:
                    key, task = runner.waiting_for_idle.peekitem(0)
                    if key[:2] == (cushion, tiebreaker):
                        del runner.waiting_for_idle[key]
                        runner.reschedule(task)
                    else:
                        break

            # Process all runnable tasks, but only the ones that are already
            # runnable now. Anything that becomes runnable during this cycle needs
            # to wait until the next pass. This avoids various starvation issues
            # by ensuring that there's never an unbounded delay between successive
            # checks for I/O.
            #
            # Also, we randomize the order of each batch to avoid assumptions
            # about scheduling order sneaking in. In the long run, I suspect we'll
            # either (a) use strict FIFO ordering and document that for
            # predictability/determinism, or (b) implement a more sophisticated
            # scheduler (e.g. some variant of fair queueing), for better behavior
            # under load. For now, this is the worst of both worlds - but it keeps
            # our options open. (If we do decide to go all in on deterministic
            # scheduling, then there are other things that will probably need to
            # change too, like the deadlines tie-breaker and the non-deterministic
            # ordering of task._notify_queues.)
            batch = list(runner.runq)
            if _ALLOW_DETERMINISTIC_SCHEDULING:
                # We're running under Hypothesis, and pytest-trio has patched this
                # in to make the scheduler deterministic and avoid flaky tests.
                # It's not worth the (small) performance cost in normal operation,
                # since we'll shuffle the list and _r is only seeded for tests.
                batch.sort(key=lambda t: t._counter)
            runner.runq.clear()
            _r.shuffle(batch)
            while batch:
                task = batch.pop()
                GLOBAL_RUN_CONTEXT.task = task

                if runner.instruments:
                    runner.instrument("before_task_step", task)

                next_send = task._next_send
                task._next_send = None
                final_outcome = None
                try:
                    # We used to unwrap the Outcome object here and send/throw its
                    # contents in directly, but it turns out that .throw() is
                    # buggy, at least on CPython 3.6 and earlier:
                    #   https://bugs.python.org/issue29587
                    #   https://bugs.python.org/issue29590
                    # So now we send in the Outcome object and unwrap it on the
                    # other side.
                    msg = task.context.run(task.coro.send, next_send)
                except StopIteration as stop_iteration:
                    final_outcome = Value(stop_iteration.value)
                except BaseException as task_exc:
                    # Store for later, removing uninteresting top frames: 1 frame
                    # we always remove, because it's this function catching it,
                    # and then in addition we remove however many more Context.run
                    # adds.
                    tb = task_exc.__traceback__.tb_next
                    for _ in range(CONTEXT_RUN_TB_FRAMES):
                        tb = tb.tb_next
                    final_outcome = Error(task_exc.with_traceback(tb))

                if final_outcome is not None:
                    # We can't call this directly inside the except: blocks above,
                    # because then the exceptions end up attaching themselves to
                    # other exceptions as __context__ in unwanted ways.
>                   runner.task_exited(task, final_outcome)

__tracebackhide__ = True
args       = ()
async_fn   = functools.partial(<function _trio_test_runner_factory.<locals>._bootstrap_fixtures_and_run_test at 0x7f4427417e18>)
batch      = []
final_outcome = Value(None)
idle_primed = False
msg        = WaitTaskRescheduled(abort_func=<function ParkingLot.park.<locals>.abort_fn at 0x7f442742a730>)
next_send  = Error(Cancelled())
now        = 202206.66995797885
runner     = Runner(clock=SystemClock(offset=80277.06512981786), instruments=[], io_manager=EpollIOManager(_epoll=<select.epoll obj...d.RLock object owner=0 count=0 at 0x7f4427436420>), trio_token=None, ki_pending=False, waiting_for_idle=SortedDict({}))
task       = <Task "<test 'test_trio_internal_failure'>" at 0x7f44274037b8>
timeout    = 0

../../python-environments/lahja/lib/python3.6/site-packages/trio/_core/_run.py:1579:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = Runner(clock=SystemClock(offset=80277.06512981786), instruments=[], io_manager=EpollIOManager(_epoll=<select.epoll obj...d.RLock object owner=0 count=0 at 0x7f4427436420>), trio_token=None, ki_pending=False, waiting_for_idle=SortedDict({}))
task = <Task "<test 'test_trio_internal_failure'>" at 0x7f44274037b8>, outcome = Value(None)

    def task_exited(self, task, outcome):
        while task._cancel_stack:
>           task._cancel_stack[-1]._remove_task(task)

outcome    = Value(None)
self       = Runner(clock=SystemClock(offset=80277.06512981786), instruments=[], io_manager=EpollIOManager(_epoll=<select.epoll obj...d.RLock object owner=0 count=0 at 0x7f4427436420>), trio_token=None, ki_pending=False, waiting_for_idle=SortedDict({}))
task       = <Task "<test 'test_trio_internal_failure'>" at 0x7f44274037b8>

../../python-environments/lahja/lib/python3.6/site-packages/trio/_core/_run.py:1050:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <trio.CancelScope at 0x7f4427430ce0, bound to 1 task, cancelled>, task = <Task "<test 'test_trio_internal_failure'>" at 0x7f44274037b8>

    def _remove_task(self, task):
>       self._tasks.remove(task)
E       KeyError: <Task "<test 'test_trio_internal_failure'>" at 0x7f44274037b8>

self       = <trio.CancelScope at 0x7f4427430ce0, bound to 1 task, cancelled>
task       = <Task "<test 'test_trio_internal_failure'>" at 0x7f44274037b8>

../../python-environments/lahja/lib/python3.6/site-packages/trio/_core/_run.py:347: KeyError

The above exception was the direct cause of the following exception:

async_fn = functools.partial(<function _trio_test_runner_factory.<locals>._bootstrap_fixtures_and_run_test at 0x7f4427417e18>), clock = SystemClock(offset=80277.06512981786), instruments = []
restrict_keyboard_interrupt_to_checkpoints = False, args = ()

    def run(
        async_fn,
        *args,
        clock=None,
        instruments=(),
        restrict_keyboard_interrupt_to_checkpoints=False
    ):
        """Run a trio-flavored async function, and return the result.

        Calling::

           run(async_fn, *args)

        is the equivalent of::

           await async_fn(*args)

        except that :func:`run` can (and must) be called from a synchronous
        context.

        This is trio's main entry point. Almost every other function in trio
        requires that you be inside a call to :func:`run`.

        Args:
          async_fn: An async function.

          args: Positional arguments to be passed to *async_fn*. If you need to
              pass keyword arguments, then use :func:`functools.partial`.

          clock: ``None`` to use the default system-specific monotonic clock;
              otherwise, an object implementing the :class:`trio.abc.Clock`
              interface, like (for example) a :class:`trio.testing.MockClock`
              instance.

          instruments (list of :class:`trio.abc.Instrument` objects): Any
              instrumentation you want to apply to this run. This can also be
              modified during the run; see :ref:`instrumentation`.

          restrict_keyboard_interrupt_to_checkpoints (bool): What happens if the
              user hits control-C while :func:`run` is running? If this argument
              is False (the default), then you get the standard Python behavior: a
              :exc:`KeyboardInterrupt` exception will immediately interrupt
              whatever task is running (or if no task is running, then trio will
              wake up a task to be interrupted). Alternatively, if you set this
              argument to True, then :exc:`KeyboardInterrupt` delivery will be
              delayed: it will be *only* be raised at :ref:`checkpoints
              <checkpoints>`, like a :exc:`Cancelled` exception.

              The default behavior is nice because it means that even if you
              accidentally write an infinite loop that never executes any
              checkpoints, then you can still break out of it using control-C.
              The alternative behavior is nice if you're paranoid about a
              :exc:`KeyboardInterrupt` at just the wrong place leaving your
              program in an inconsistent state, because it means that you only
              have to worry about :exc:`KeyboardInterrupt` at the exact same
              places where you already have to worry about :exc:`Cancelled`.

              This setting has no effect if your program has registered a custom
              SIGINT handler, or if :func:`run` is called from anywhere but the
              main thread (this is a Python limitation), or if you use
              :func:`open_signal_receiver` to catch SIGINT.

        Returns:
          Whatever ``async_fn`` returns.

        Raises:
          TrioInternalError: if an unexpected error is encountered inside trio's
              internal machinery. This is a bug and you should `let us know
              <https://github.com/python-trio/trio/issues>`__.

          Anything else: if ``async_fn`` raises an exception, then :func:`run`
              propagates it.

        """

        __tracebackhide__ = True

        # Do error-checking up front, before we enter the TrioInternalError
        # try/catch
        #
        # It wouldn't be *hard* to support nested calls to run(), but I can't
        # think of a single good reason for it, so let's be conservative for
        # now:
        if hasattr(GLOBAL_RUN_CONTEXT, "runner"):
            raise RuntimeError("Attempted to call run() from inside a run()")

        if clock is None:
            clock = SystemClock()
        instruments = list(instruments)
        io_manager = TheIOManager()
        system_context = copy_context()
        system_context.run(current_async_library_cvar.set, "trio")
        runner = Runner(
            clock=clock,
            instruments=instruments,
            io_manager=io_manager,
            system_context=system_context,
        )
        GLOBAL_RUN_CONTEXT.runner = runner
        locals()[LOCALS_KEY_KI_PROTECTION_ENABLED] = True

        # KI handling goes outside the core try/except/finally to avoid a window
        # where KeyboardInterrupt would be allowed and converted into an
        # TrioInternalError:
        try:
            with ki_manager(
                runner.deliver_ki, restrict_keyboard_interrupt_to_checkpoints
            ):
                try:
                    with closing(runner):
                        # The main reason this is split off into its own function
                        # is just to get rid of this extra indentation.
                        run_impl(runner, async_fn, args)
                except TrioInternalError:
                    raise
                except BaseException as exc:
                    raise TrioInternalError(
                        "internal error in trio - please file a bug!"
>                   ) from exc
E                   trio.TrioInternalError: internal error in trio - please file a bug!

__tracebackhide__ = True
args       = ()
async_fn   = functools.partial(<function _trio_test_runner_factory.<locals>._bootstrap_fixtures_and_run_test at 0x7f4427417e18>)
clock      = SystemClock(offset=80277.06512981786)
instruments = []
io_manager = EpollIOManager(_epoll=<select.epoll object at 0x7f44297128b8>, _registered={10: EpollWaiters(read_task=<Task '<TrioToken.run_sync_soon task>' at 0x7f4427403470>, write_task=None)})
restrict_keyboard_interrupt_to_checkpoints = False
runner     = Runner(clock=SystemClock(offset=80277.06512981786), instruments=[], io_manager=EpollIOManager(_epoll=<select.epoll obj...d.RLock object owner=0 count=0 at 0x7f4427436420>), trio_token=None, ki_pending=False, waiting_for_idle=SortedDict({}))
system_context = <contextvars.Context object at 0x7f44274032b0>

../../python-environments/lahja/lib/python3.6/site-packages/trio/_core/_run.py:1436: TrioInternalError
======================================================================================== slowest 10 test durations ========================================================================================

(0.00 durations hidden.  Use -vv to show these durations.)
======================================================================================== 1 failed in 0.08 seconds =========================================================================================
Exception ignored in: <bound method Nursery.__del__ of <trio._core._run.Nursery object at 0x7f4427403908>>
Traceback (most recent call last):
  File "/home/piper/python-environments/lahja/lib/python3.6/site-packages/trio/_core/_run.py", line 638, in __del__
AssertionError:
Exception ignored in: <bound method Nursery.__del__ of <trio._core._run.Nursery object at 0x7f44274035f8>>
Traceback (most recent call last):
  File "/home/piper/python-environments/lahja/lib/python3.6/site-packages/trio/_core/_run.py", line 638, in __del__
AssertionError:
Exception ignored in: <bound method Nursery.__del__ of <trio._core._run.Nursery object at 0x7f4427403e80>>
Traceback (most recent call last):
  File "/home/piper/python-environments/lahja/lib/python3.6/site-packages/trio/_core/_run.py", line 638, in __del__
AssertionError:

It's not clear to me what needs to be done to fix this, or whether it is something that should be fixed at all.
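For reference, here is a minimal sketch of how the wrapper's __aenter__ could unwind the inner context manager when start() fails, so that the inner __aexit__ always runs. It is only an illustration of the pattern, not a confirmed fix: FakeNursery is a hypothetical stand-in for the manager returned by trio.open_nursery(), SaferWrapper and demo are names made up for this example, and it runs under asyncio purely so that it has no dependencies. I have not verified how a real trio nursery behaves when unwound this way.

```python
import asyncio
import sys


class FakeNursery:
    """Hypothetical stand-in for the manager returned by trio.open_nursery()."""

    def __init__(self):
        self.entered = False
        self.exited = False

    async def __aenter__(self):
        self.entered = True
        return self

    async def __aexit__(self, exc_type, exc_value, exc_tb):
        self.exited = True
        return False  # never swallow the exception


class SaferWrapper:
    def __init__(self, manager):
        self._manager = manager

    async def start(self, nursery):
        # Same deliberate failure as in the report above.
        raise Exception("Something fails!")

    async def __aenter__(self):
        nursery = await self._manager.__aenter__()
        try:
            await self.start(nursery)
        except BaseException:
            # The inner manager was already entered, so exit it before
            # letting the original error propagate.
            await self._manager.__aexit__(*sys.exc_info())
            raise
        return self

    async def __aexit__(self, exc_type, exc_value, exc_tb):
        return await self._manager.__aexit__(exc_type, exc_value, exc_tb)


async def demo():
    """Enter the wrapper and report whether the inner manager was exited."""
    manager = FakeNursery()
    try:
        async with SaferWrapper(manager):
            pass
    except Exception as exc:
        return manager, str(exc)
    return manager, None


if __name__ == "__main__":
    manager, message = asyncio.run(demo())
    print(manager.entered, manager.exited, message)
```

The point of the try/except in __aenter__ is that Python does not call the wrapper's own __aexit__ when its __aenter__ raises, so any manager entered by hand has to be exited by hand on that path.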
