From dc5691f6389a069db10a28ff739cfe96e101de97 Mon Sep 17 00:00:00 2001 From: Joshua Oreman Date: Wed, 7 Feb 2024 16:50:41 -0700 Subject: [PATCH 1/7] Change how we locate the parent greenlet + perf improvements --- README.rst | 27 ++- docs/source/index.rst | 10 ++ docs/source/principle.rst | 22 +-- greenback/_impl.py | 330 ++++++++++++++++++++++------------ greenback/_tests/test_impl.py | 50 +++++- newsfragments/22.bugfix.rst | 4 + newsfragments/26.bugfix.rst | 7 + newsfragments/26.feature.rst | 7 + 8 files changed, 318 insertions(+), 139 deletions(-) create mode 100644 newsfragments/22.bugfix.rst create mode 100644 newsfragments/26.bugfix.rst create mode 100644 newsfragments/26.feature.rst diff --git a/README.rst b/README.rst index 53da61b..ce911a6 100644 --- a/README.rst +++ b/README.rst @@ -78,6 +78,15 @@ Quickstart * Later, use ``greenback.await_(foo())`` as a replacement for ``await foo()`` in places where you can't write ``await``. +* If all of the places where you want to use + ``greenback.await_()`` are indirectly within a single function, you can + eschew the ``await greenback.ensure_portal()`` and instead write a wrapper + around calls to that function: ``await greenback.with_portal_run(...)`` + for an async function, or ``await greenback.with_portal_run_sync(...)`` + for a synchronous function. These have the advantage of cleaning up the + portal (and its associated minor performance impact) as soon as the + function returns, rather than leaving it open until the task terminates. + * For more details and additional helper methods, see the `documentation `__. @@ -141,25 +150,25 @@ FAQ something. **How does it work?** After you run ``await greenback.ensure_portal()`` -in a certain task, each step of that task will run inside a greenlet. +in a certain task, that task will run inside a greenlet. (This is achieved by interposing a "shim" coroutine in between the event loop and the coroutine for your task; see the source code for details.) Calls to ``greenback.await_()`` are then able to switch from that greenlet back to the parent greenlet, which can easily perform the necessary ``await`` since it has direct access to the async environment. The -per-task-step greenlet is then resumed with the value or exception +task greenlet is then resumed with the value or exception produced by the ``await``. **Should I trust this in production?** Maybe; try it and see. The -technique is in some ways an awful hack, and has some performance -implications (any task in which you call ``await -greenback.ensure_portal()`` will run somewhat slower), but we're in +technique is rather low-level, and has some minor +`performance implications `__ (any task in which you call ``await +greenback.ensure_portal()`` will run a bit slower), but we're in good company: SQLAlchemy's async ORM support is implemented in much the same way. ``greenback`` itself is a fairly small amount of -pure-Python code on top of ``greenlet``. (There is one reasonably -safe ctypes hack that is necessary to work around a knob that's not -exposed by the asyncio acceleration extension module on CPython.) -``greenlet`` is a C module full of arcane platform-specific hacks, but +pure-Python code on top of ``greenlet``. (There is one small usage of +``ctypes`` to work around a knob that's not exposed by the asyncio +acceleration extension module on CPython.) +``greenlet`` is a C module full of platform-specific arcana, but it's been around for a very long time and popular production-quality concurrency systems such as ``gevent`` rely heavily on it. diff --git a/docs/source/index.rst b/docs/source/index.rst index 2b63b18..a659f9a 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -61,6 +61,16 @@ Quickstart * Later, use ``greenback.await_(foo())`` as a replacement for ``await foo()`` in places where you can't write ``await``. +* If all of the places where you want to use + ``greenback.await_()`` are indirectly within a single function, you can + eschew the ``await greenback.ensure_portal()`` and instead write a wrapper + around calls to that function: ``await greenback.with_portal_run(...)`` + for an async function, or ``await greenback.with_portal_run_sync(...)`` + for a synchronous function. These have the advantage of cleaning up the + portal (and its associated minor :ref:`performance impact `) + as soon as the function returns, rather than leaving it open until the task + terminates. + * For more details and additional helpers, read the rest of this documentation! Detailed documentation diff --git a/docs/source/principle.rst b/docs/source/principle.rst index fe8c52c..7fa3a98 100644 --- a/docs/source/principle.rst +++ b/docs/source/principle.rst @@ -187,30 +187,30 @@ The slowdown due to `greenback` is mostly proportional to the number of times you yield to the event loop with a portal active, as well as the number of portal creations and :func:`await_` calls you perform. You can run the ``microbenchmark.py`` script from the Git repository -to see the numbers on your machine. On a 2020 MacBook Pro (x86), with -CPython 3.9, greenlet 1.1.2, and Trio 0.19.0, I get: +to see the numbers on your machine. On a 2023 MacBook Pro (ARM64), with +CPython 3.12, greenlet 3.0.3, and Trio 0.24.0, I get: * Baseline: The simplest possible async operation is what Trio calls a *checkpoint*: yield to the event loop and ask to immediately be - rescheduled again. This takes about **31.5 microseconds** on Trio and - **28 microseconds** on asyncio. (asyncio is able to take advantage + rescheduled again. This takes about **13.6 microseconds** on Trio and + **12.9 microseconds** on asyncio. (asyncio is able to take advantage of some C acceleration here.) * Adding the greenback portal, without making any :func:`await_` calls - yet, adds about **4 microseconds** per checkpoint. + yet, adds about **1 microsecond** per checkpoint. * Executing each of those checkpoints through a separate - :func:`await_` adds another **10 microseconds** per :func:`await_` on - Trio, or **8 microseconds** on asyncio. (Surrounding - the entire checkpoint loop in a single :func:`await_`, by contrast, - has negligible impact.) + :func:`await_` adds about another **2 microseconds** per :func:`await_`. + (Surrounding the entire checkpoint loop in a single :func:`await_`, by + contrast, has negligible impact.) * Creating a new portal for each of those ``await_(checkpoint())`` - invocations adds another **12 microseconds** or so per portal + invocations adds another **16 microseconds** or so per portal creation. If you don't execute any checkpoints while the portal is active, you can create and destroy it in more like **5 microseconds**. If you use :func:`with_portal_run_sync`, portal - creation gets about **3 microseconds** faster. + creation gets about **10 microseconds** faster (so the portal is only + adding about 6 microseconds of overhead). Keep in mind that these are microbenchmarks: your actual program is probably not executing checkpoints in a tight loop! The more work diff --git a/greenback/_impl.py b/greenback/_impl.py index 6fcf260..0be171d 100644 --- a/greenback/_impl.py +++ b/greenback/_impl.py @@ -13,7 +13,7 @@ Coroutine, Dict, Generator, - MutableSet, + MutableMapping, Optional, TypeVar, Union, @@ -35,11 +35,19 @@ T = TypeVar("T") -# Set of tasks (trio.lowlevel.Task or asyncio.Task) that have a "greenback -# portal" installed, via any of the *_portal() functions. When running, -# these tasks can send event loop traps to greenlet.getcurrent().parent -# in order to yield them to the event loop. -task_has_portal: MutableSet[object] = weakref.WeakSet() +# Dictionary whose keys are tasks (trio.lowlevel.Task or asyncio.Task) +# that have a "greenback portal" installed, via any of the *_portal() +# functions, and whose values are the corresponding greenlets that implement +# the portal. When running, these tasks can send event loop traps to their +# portal greenlet in order to yield them to the event loop. +# +# Immediately after a task has been portalized, but before its first tick +# runs, its value in this mapping will be None, because we don't yet know +# which greenlet is running its greenback_shim coroutine. That's fine because +# we can't reach an await_ in the new task until its first tick runs. +task_portals: MutableMapping[ + object, Optional[greenlet.greenlet] +] = weakref.WeakKeyDictionary() # The offset of asyncio.Task._coro in the Task object memory layout, if # asyncio.Task is implemented in C (which it generally is on CPython 3.6+). @@ -57,19 +65,64 @@ ) -async def greenback_shim(orig_coro: Coroutine[Any, Any, Any]) -> Any: +def trampoline( + portal_greenlet: greenlet.greenlet, + orig_coro: Coroutine[Any, Any, Any], + next_send: outcome.Outcome[Any], +) -> Any: + """Smooths over the interface differences between an event loop trap + encountered during an await_() and one encountered during a native await. + Run this function as the target of a greenlet; it will send (event loop + trap, greenlet to resume) tuples to the `portal_greenlet` and resume the + `orig_coro` with the outcomes sent back in reply. + """ + this_greenlet = greenlet.getcurrent() + while True: + # StopIteration or other exceptions will escape from this function + next_yield = next_send.send(orig_coro) + next_send = portal_greenlet.switch((next_yield, this_greenlet)) + + +@types.coroutine +def async_yield_ready(): + return (yield "ready") + + +async def greenback_shim(task: object, orig_coro: Coroutine[Any, Any, Any]) -> Any: """When a task has called ensure_portal(), its coroutine object is a coroutine for this function. This function then invokes each step of the task's original coroutine in a context that allows suspending via greenlet. """ - # This wrapper ensures that the top-level task coroutine is actually a coroutine, - # not a generator. Some Trio introspection tools care about the difference, as - # does anyio. - return await _greenback_shim(orig_coro) # type: ignore + + # This wrapper serves two purposes: + # + # - It ensures that the top-level task coroutine is actually a coroutine, + # not a generator. Some Trio introspection tools care about the + # difference, as does anyio. + # + # - It yields a sentinel value as its first action, so that it is then + # ready to be resumed with something meaningful. This resolves an + # impedance mismatch: the first send into a new coroutine object must + # be None, but the next send into an already-running task might not + # want to be None. + # + # Portalizing an already-running task (bestow_portal(), ensure_portal()) + # uses this wrapper. Creating a portal around a new function + # (with_portal_run(), with_portal_run_tree()) uses the inner + # _greenback_shim directly. + + next_send = await outcome.acapture(async_yield_ready) + task_portals[task] = greenlet.getcurrent() + try: + return await _greenback_shim(orig_coro, next_send) # type: ignore + finally: + del task_portals[task] @types.coroutine -def _greenback_shim(orig_coro: Coroutine[Any, Any, Any]) -> Generator[Any, Any, Any]: +def _greenback_shim( + orig_coro: Coroutine[Any, Any, Any], next_send: outcome.Outcome[Any] +) -> Generator[Any, Any, Any]: # In theory this could be written as a simpler function that uses # _greenback_shim_sync(): # @@ -88,77 +141,80 @@ def _greenback_shim(orig_coro: Coroutine[Any, Any, Any]) -> Generator[Any, Any, # has a hard time raising StopIteration, because it's a generator, # and unrolling it into a non-generator iterable makes it slower. # So we'll accept a bit of code duplication. - parent_greenlet: greenlet.greenlet - # The greenlet in which each send() or throw() call will occur. - child_greenlet: Optional[greenlet.greenlet] = None + # The greenlet running this _greenback_shim function, which implements + # the portal. We receive tuples (event loop trap, greenlet to resume + # with result of yielding this trap) and pass them through our parent + # event loop / coroutine runner. + portal_greenlet = greenlet.getcurrent() + + # The greenlet running orig_coro, which uses the portal. We wrap it + # in a trampoline() helper function, defined above, in order to simplify + # the control flow. This way we only have to create one greenlet per + # portal, instead of one per event loop trap that uses the portal; this + # improves efficiency. + child_greenlet = greenlet.greenlet( + partial(trampoline, portal_greenlet, orig_coro) + ) + + # The greenlet to which we will send the next thing our event loop + # sends us. This is initially the child_greenlet, but if the child_greenlet + # starts its own nested child greenlets and those want to use await_(), + # they'll be able to. Distinguishing resume_greenlet from child_greenlet + # is important for interoperability with other greenback-like systems, + # such as sqlalchemy's async ORM support. + resume_greenlet = child_greenlet # The contextvars.Context that we have most recently seen as active # for this task and propagated to child_greenlet curr_ctx: Optional[contextvars.Context] = None - # The next thing we plan to yield to the event loop. (The first yield - # goes to ensure_portal() rather than to the event loop, so we use a - # string that is unlikely to be a valid event loop trap.) - next_yield: Any = "ready" - - # The next thing we plan to send to the original coroutine. This is an - # outcome representing the value or error that the event loop resumed - # us with. - next_send: outcome.Outcome[Any] while True: + if ( + greenlet_needs_context_fixup + and portal_greenlet.gr_context is not curr_ctx + and child_greenlet.gr_context is curr_ctx + ): + # Make sure the child greenlet's contextvars context + # is the same as our own, even if our own context + # changes (such as via trio.Task.context assignment), + # unless the child greenlet appears to have changed + # its context privately through a call to Context.run(). + # + # We only fix up our immediate child. If it spawns its own + # grandchild greenlet(s), it's responsible for propagating + # contextvars to those. + # + # Note 'portal_greenlet.gr_context' here is just a + # portable way of getting the current contextvars + # context, which is not exposed by the contextvars + # module directly (copy_context() returns a copy, not + # a new reference to the original). Upon initial + # creation of child_greenlet, curr_ctx and + # child_greenlet.gr_context will both be None, so this + # condition works for that case too. + child_greenlet.gr_context = curr_ctx = portal_greenlet.gr_context + + try: + # Forward the event loop's resumption message into our child. + # It will proceed until the next event loop trap and send us + # that trap + the identity of the greenlet that reached the trap; + # the latter is so we can resume the correct greenlet next time + # in case of nested child greenlets. + next_yield, resume_greenlet = resume_greenlet.switch(next_send) + except StopIteration as ex: + # The underlying coroutine completed, so we forward its return value. + return ex.value + # If the underlying coroutine raises any other exception, it will + # propagate out of _greenback_shim, which is what we want. + try: # Normally we send to orig_coro whatever the event loop sent us next_send = outcome.Value((yield next_yield)) except BaseException as ex: # If the event loop resumed us with an error, we forward that error next_send = outcome.Error(ex) - try: - if not child_greenlet: - # It is important that we delay the parent_greenlet fetch until - # we actually create the child_greenlet. Otherwise, we will inherit - # whatever greenlet is active when bestow_portal() is called, which - # messes up the context fixup below. - parent_greenlet = greenlet.getcurrent() - # Start a new send() or throw() call on the original coroutine. - child_greenlet = greenlet.greenlet(next_send.send) - switch_arg: Any = orig_coro - else: - # Resume the previous send() or throw() call, which is currently - # at a simulated yield point in a greenback.await_() call. - switch_arg = next_send - - if ( - greenlet_needs_context_fixup - and parent_greenlet.gr_context is not curr_ctx - and child_greenlet.gr_context is curr_ctx - ): - # Make sure the child greenlet's contextvars context - # is the same as our own, even if our own context - # changes (such as via trio.Task.context assignment), - # unless the child greenlet appears to have changed - # its context privately through a call to Context.run(). - # - # Note 'parent_greenlet.gr_context' here is just a - # portable way of getting the current contextvars - # context, which is not exposed by the contextvars - # module directly (copy_context() returns a copy, not - # a new reference to the original). Upon initial - # creation of child_greenlet, curr_ctx and - # child_greenlet.gr_context will both be None, so this - # condition works for that case too. - child_greenlet.gr_context = curr_ctx = parent_greenlet.gr_context - - next_yield = child_greenlet.switch(switch_arg) - if child_greenlet.dead: - # The send() or throw() call completed so we need to - # create a new greenlet for the next one. - child_greenlet = curr_ctx = None - except StopIteration as ex: - # The underlying coroutine completed, so we forward its return value. - return ex.value - # If the underlying coroutine terminated with an exception, it will - # propagate out of greenback_shim, which is what we want. + @types.coroutine @@ -166,20 +222,24 @@ def _greenback_shim_sync(target: Callable[[], Any]) -> Generator[Any, Any, Any]: """Run target(), forwarding the event loop traps and responses necessary to implement any await_() calls that it makes. - This is only a little bit faster than using greenback_shim() plus a - sync-to-async wrapper -- maybe 2us faster for the entire call, - so it only matters when you're scoping the portal to a very small - range. We ship it anyway because it's easier to understand than + This gives a nice speed boost over using greenback_shim() plus a + sync-to-async wrapper (6 microseconds to create a sync portal versus + 16 for async, on the author's machine), though that's probably only + relevant when you're scoping the portal to a very small range. + We ship it anyway because it's easier to understand than the async-compatible _greenback_shim(), and helps with understanding the latter. """ - parent_greenlet = greenlet.getcurrent() + portal_greenlet = greenlet.getcurrent() curr_ctx = None # The greenlet in which we run target(). child_greenlet = greenlet.greenlet(target) + # The greenlet currently suspended in await_() + resume_greenlet = child_greenlet + # The next thing we plan to yield to the event loop. next_yield: Any @@ -191,7 +251,7 @@ def _greenback_shim_sync(target: Callable[[], Any]) -> Generator[Any, Any, Any]: while True: if ( greenlet_needs_context_fixup - and parent_greenlet.gr_context is not curr_ctx + and portal_greenlet.gr_context is not curr_ctx and child_greenlet.gr_context is curr_ctx ): # Make sure the child greenlet's contextvars context @@ -200,7 +260,11 @@ def _greenback_shim_sync(target: Callable[[], Any]) -> Generator[Any, Any, Any]: # unless the child greenlet appears to have changed # its context privately through a call to Context.run(). # - # Note 'parent_greenlet.gr_context' here is just a + # We only fix up our immediate child. If it spawns its own + # grandchild greenlet(s), it's responsible for propagating + # contextvars to those. + # + # Note 'portal_greenlet.gr_context' here is just a # portable way of getting the current contextvars # context, which is not exposed by the contextvars # module directly (copy_context() returns a copy, not @@ -208,20 +272,29 @@ def _greenback_shim_sync(target: Callable[[], Any]) -> Generator[Any, Any, Any]: # creation of child_greenlet, curr_ctx and # child_greenlet.gr_context will both be None, so this # condition works for that case too. - child_greenlet.gr_context = curr_ctx = parent_greenlet.gr_context + child_greenlet.gr_context = curr_ctx = portal_greenlet.gr_context + # Forward the event loop's resumption message into our child. + # It will proceed until the next event loop trap and send us + # that trap + the identity of the greenlet that reached the trap; + # the latter is so we can resume the correct greenlet next time + # in case of nested child greenlets. if next_send is None: - next_yield = child_greenlet.switch() + request = resume_greenlet.switch() else: - next_yield = child_greenlet.switch(next_send) + request = resume_greenlet.switch(next_send) + if child_greenlet.dead: - # target() returned, so next_yield is its return value, not an - # event loop trap. (If it exits with an exception, that exception - # will propagate out of switch() and thus out of the loop, which - # is what we want.) - return next_yield + # target() returned, so `request` is its return value, rather than + # a (next_yield, resume_greenlet) tuple. (If target() exits with an + # exception, that exception will propagate out of switch() and thus + # out of the loop, which is what we want.) + return request + + next_yield, resume_greenlet = request + try: - # Normally we send to orig_coro whatever the event loop sent us + # Normally we send target() whatever the event loop sent us next_send = outcome.Value((yield next_yield)) except BaseException as ex: # If the event loop resumed us with an error, we forward that error @@ -328,7 +401,7 @@ def bestow_portal(task: Union["trio.lowlevel.Task", "asyncio.Task[Any]"]) -> Non control to the event loop. """ - if task in task_has_portal: + if task in task_portals: # This task already has a greenback shim; nothing to do. return @@ -341,13 +414,13 @@ def bestow_portal(task: Union["trio.lowlevel.Task", "asyncio.Task[Any]"]) -> Non from trio.hazmat import Task assert isinstance(task, Task) - shim_coro = greenback_shim(task.coro) + shim_coro = greenback_shim(task, task.coro) commit: Callable[[], None] = partial(setattr, task, "coro", shim_coro) else: import asyncio assert isinstance(task, asyncio.Task) - shim_coro = greenback_shim(get_aio_task_coro(task)) + shim_coro = greenback_shim(task, get_aio_task_coro(task)) commit = partial(set_aio_task_coro, task, shim_coro) # Step it once so it's ready to get resumed by the event loop @@ -358,9 +431,10 @@ def bestow_portal(task: Union["trio.lowlevel.Task", "asyncio.Task[Any]"]) -> Non # original task coroutine commit() - # Enable greenback.await_() in this task, since all of its future steps - # will run under the greenback shim - task_has_portal.add(task) + # Note that this task has been portalized so we don't try to do it again. + # Its parent greenlet (the value in this mapping) will be set on its + # next tick, enabling greenback.await_() for this task. + task_portals[task] = None async def ensure_portal() -> None: @@ -388,7 +462,7 @@ async def ensure_portal() -> None: """ this_task = current_task() - if this_task not in task_has_portal: + if this_task not in task_portals: bestow_portal(this_task) # Execute a checkpoint so that we're now running inside the shim coroutine. @@ -405,12 +479,30 @@ def has_portal( :func:`greenback.await_`, false otherwise. If no *task* is specified, query the currently executing task. """ + if task is not None and task_portals.get(task) is not None: + return True + + try: + this_task = current_task() + except (RuntimeError, sniffio.AsyncLibraryNotFoundError): + this_task = None + if task is None: - try: - task = current_task() - except (RuntimeError, sniffio.AsyncLibraryNotFoundError): + task = this_task + if task is None: return False - return task in task_has_portal + + if task is this_task: + # For the currently running task, an entry in task_portals + # with value None means a portal has been bestowed but won't + # be active until the next checkpoint. The answer to "can I run + # await_()" in that case is therefore "no". + return task_portals.get(task) is not None + else: + # For a task that's not currently running, even an inactive portal + # will be activated before the task can do anything, so we return + # True here. + return task in task_portals async def with_portal_run( @@ -437,16 +529,16 @@ async def with_portal_run( """ this_task = current_task() - if this_task in task_has_portal: + if this_task in task_portals: return await async_fn(*args, **kwds) - shim_coro = _greenback_shim(async_fn(*args, **kwds)) # type: ignore - assert shim_coro.send(None) == "ready" - task_has_portal.add(this_task) + task_portals[this_task] = greenlet.getcurrent() try: - res: T = await shim_coro + res: T = await _greenback_shim( + async_fn(*args, **kwds), outcome.Value(None) + ) # type: ignore return res finally: - task_has_portal.remove(this_task) + del task_portals[this_task] async def with_portal_run_sync(sync_fn: Callable[..., T], *args: Any, **kwds: Any) -> T: @@ -468,14 +560,14 @@ async def with_portal_run_sync(sync_fn: Callable[..., T], *args: Any, **kwds: An """ this_task = current_task() - if this_task in task_has_portal: + if this_task in task_portals: return sync_fn(*args, **kwds) - task_has_portal.add(this_task) + task_portals[this_task] = greenlet.getcurrent() try: res: T = await _greenback_shim_sync(partial(sync_fn, *args, **kwds)) return res finally: - task_has_portal.remove(this_task) + del task_portals[this_task] if TYPE_CHECKING: @@ -570,7 +662,7 @@ async def with_portal_run_tree( elif this_task in instrument.tasks: # We're already inside another call to with_portal_run_tree(), so nothing # more needs to be done - assert this_task in task_has_portal + assert this_task in task_portals return await async_fn(*args, **kwds) # Store our current nursery depth. This allows the instrument to @@ -607,11 +699,22 @@ def await_(aw: Awaitable[T]) -> T: """ try: task = current_task() - if task not in task_has_portal: - raise RuntimeError( - "you must 'await greenback.ensure_portal()' in this task first" - ) from None - gr = greenlet.getcurrent().parent + portal_greenlet = task_portals.get(task, None) + if portal_greenlet is None: + if task in task_portals: + library = sniffio.current_async_library() + raise RuntimeError( + f"You must yield to the event loop (try 'await {library}." + "sleep(0)') after calling 'greenback.bestow_portal()' on " + "the current task before using the portal" + ) + else: + raise RuntimeError( + "You must create a greenback portal for this task in order " + "to use 'greenback.await_()'. Try 'await " + "greenback.ensure_portal()' or " + "'await greenback.with_portal_run(some_fn, *args)'." + ) except BaseException: if isinstance(aw, collections.abc.Coroutine): # Suppress the "coroutine was never awaited" warning @@ -628,6 +731,7 @@ def await_(aw: Awaitable[T]) -> T: # Step through the coroutine until it's exhausted, sending each trap # into the portal for the event loop to process. + this_greenlet = greenlet.getcurrent() next_send: outcome.Outcome[Any] = outcome.Value(None) while True: try: @@ -658,4 +762,4 @@ def await_(aw: Awaitable[T]) -> T: # next_send is an outcome.Outcome representing the value or error # with which the event loop wants to resume the task - next_send = gr.switch(next_yield) + next_send = portal_greenlet.switch((next_yield, this_greenlet)) diff --git a/greenback/_tests/test_impl.py b/greenback/_tests/test_impl.py index 6cf7bc7..28e49b1 100644 --- a/greenback/_tests/test_impl.py +++ b/greenback/_tests/test_impl.py @@ -24,9 +24,7 @@ async def test_simple(library): async def one_task(*, have_portal=False): if not have_portal: assert not has_portal() - with pytest.raises( - RuntimeError, match="you must 'await greenback.ensure_portal" - ): + with pytest.raises(RuntimeError, match="create a greenback portal"): await_(anyio.sleep(0)) await ensure_portal() await ensure_portal() @@ -141,6 +139,8 @@ async def task_fn(): task = greenback._impl.current_task() task_started.set() await portal_installed.wait() + assert has_portal(task) + assert has_portal() await_(anyio.sleep(0)) async with anyio.create_task_group() as tg: @@ -152,9 +152,21 @@ async def task_fn(): greenback.bestow_portal(task) portal_installed.set() - with pytest.raises(RuntimeError): + with pytest.raises(RuntimeError, match="must create a greenback portal"): await_(anyio.sleep(0)) + # bestow_portal() on self doesn't work until the next checkpoint: + greenback.bestow_portal(greenback._impl.current_task()) + assert not has_portal() + assert not has_portal(greenback._impl.current_task()) + with pytest.raises(RuntimeError, match="must yield to the event loop"): + await_(anyio.sleep(0)) + # after a checkpoint, it works + await anyio.sleep(0) + assert has_portal() + assert has_portal(greenback._impl.current_task()) + await_(anyio.sleep(0)) + async def test_no_context_leakage(): # Regression test for issue 17 @@ -260,7 +272,7 @@ def test_misuse(): @trio.run async def wrong_library(): - sniffio.current_async_library_cvar.set("tokio") + sniffio.thread_local.name = "tokio" with pytest.raises(RuntimeError, match="greenback does not support tokio"): greenback.await_(trio.sleep(1)) @@ -320,7 +332,7 @@ async def test_portal_map_does_not_leak(library): for _ in range(4): gc.collect() - assert not greenback._impl.task_has_portal + assert not greenback._impl.task_portals async def test_awaitable(library): @@ -409,3 +421,29 @@ def close(self): # One frame for the call here, one frame where await_ calls send(), # one where outcome.send() discovers it can't call our send() assert len(info.traceback) == 3 + + +async def test_double_greenlet(library): + # Make sure await_ works even if you run it inside some other greenlet. + # Regression test for https://github.com/oremanj/greenback/issues/22 + + async def inner_fn(middle_gr): + for i in range(10): + await anyio.sleep(0) + assert f"resume {i + 1}" == middle_gr.switch(i) + return "success" + + def middle_fn(): + inner_gr = greenlet.greenlet(greenback.await_) + assert 0 == inner_gr.switch(inner_fn(greenlet.getcurrent())) + for i in range(1, 10): + assert i == inner_gr.switch(f"resume {i}") + assert not inner_gr.dead + assert "success" == inner_gr.switch("resume 10") + assert inner_gr.dead + + async def middle_fn_async(): + middle_fn() + + await greenback.with_portal_run(middle_fn_async) + await greenback.with_portal_run_sync(middle_fn) diff --git a/newsfragments/22.bugfix.rst b/newsfragments/22.bugfix.rst new file mode 100644 index 0000000..5bffe1c --- /dev/null +++ b/newsfragments/22.bugfix.rst @@ -0,0 +1,4 @@ +greenback now properly handles cases where a task spawns another greenlet +(not managed by greenback) that in turn calls :func:`greenback.await_`. +This improves interoperability with other greenback-like systems that do not +use the greenback library, such as SQLAlchemy's async ORM support. diff --git a/newsfragments/26.bugfix.rst b/newsfragments/26.bugfix.rst new file mode 100644 index 0000000..e0ae7f0 --- /dev/null +++ b/newsfragments/26.bugfix.rst @@ -0,0 +1,7 @@ +:func:`greenback.has_portal` now returns False if run in a task that has called +:func:`greenback.bestow_portal` on itself but has not yet made the portal +usable by executing a checkpoint. This reflects the fact that +:func:`greenback.await_` in such a task will fail. The exception message for +such an :func:`~greenback.await_` failure has also been updated to more +precisely describe the problem, rather than the previous generic "you must +create a portal first". diff --git a/newsfragments/26.feature.rst b/newsfragments/26.feature.rst new file mode 100644 index 0000000..031f6d0 --- /dev/null +++ b/newsfragments/26.feature.rst @@ -0,0 +1,7 @@ +greenback's internals have been reorganized to improve the performance of +executing ordinary checkpoints (``await`` statements, approximately) in +a task that has a greenback portal active. On the author's laptop with +CPython 3.12, the overhead is only about one microsecond compared to the +performance without greenback involved, versus four microseconds before +this change. For comparison, the non-greenback cost of executing a +checkpoint is 12-13 microseconds. From c6d4f5026c48277983f75b1194f65bd334ad60d6 Mon Sep 17 00:00:00 2001 From: Joshua Oreman Date: Wed, 7 Feb 2024 16:58:39 -0700 Subject: [PATCH 2/7] blacken + doc fix --- docs/source/principle.rst | 4 +--- greenback/_impl.py | 11 ++++------- greenback/_util.py | 6 ++---- 3 files changed, 7 insertions(+), 14 deletions(-) diff --git a/docs/source/principle.rst b/docs/source/principle.rst index 7fa3a98..c06ae42 100644 --- a/docs/source/principle.rst +++ b/docs/source/principle.rst @@ -206,9 +206,7 @@ CPython 3.12, greenlet 3.0.3, and Trio 0.24.0, I get: * Creating a new portal for each of those ``await_(checkpoint())`` invocations adds another **16 microseconds** or so per portal - creation. If you don't execute any checkpoints while the portal is - active, you can create and destroy it in more like **5 - microseconds**. If you use :func:`with_portal_run_sync`, portal + creation. If you use :func:`with_portal_run_sync`, portal creation gets about **10 microseconds** faster (so the portal is only adding about 6 microseconds of overhead). diff --git a/greenback/_impl.py b/greenback/_impl.py index 0be171d..c5bb2ff 100644 --- a/greenback/_impl.py +++ b/greenback/_impl.py @@ -45,9 +45,9 @@ # runs, its value in this mapping will be None, because we don't yet know # which greenlet is running its greenback_shim coroutine. That's fine because # we can't reach an await_ in the new task until its first tick runs. -task_portals: MutableMapping[ - object, Optional[greenlet.greenlet] -] = weakref.WeakKeyDictionary() +task_portals: MutableMapping[object, Optional[greenlet.greenlet]] = ( + weakref.WeakKeyDictionary() +) # The offset of asyncio.Task._coro in the Task object memory layout, if # asyncio.Task is implemented in C (which it generally is on CPython 3.6+). @@ -153,9 +153,7 @@ def _greenback_shim( # the control flow. This way we only have to create one greenlet per # portal, instead of one per event loop trap that uses the portal; this # improves efficiency. - child_greenlet = greenlet.greenlet( - partial(trampoline, portal_greenlet, orig_coro) - ) + child_greenlet = greenlet.greenlet(partial(trampoline, portal_greenlet, orig_coro)) # The greenlet to which we will send the next thing our event loop # sends us. This is initially the child_greenlet, but if the child_greenlet @@ -216,7 +214,6 @@ def _greenback_shim( next_send = outcome.Error(ex) - @types.coroutine def _greenback_shim_sync(target: Callable[[], Any]) -> Generator[Any, Any, Any]: """Run target(), forwarding the event loop traps and responses necessary diff --git a/greenback/_util.py b/greenback/_util.py index 7e2ca9e..46d37cf 100644 --- a/greenback/_util.py +++ b/greenback/_util.py @@ -38,8 +38,7 @@ def wrapper(*args: Any, **kw: Any) -> T: # For signature-preserving decorators we can declare the result as # signature-preserving too, and catch the case where the inner function isn't async @overload -def decorate_as_sync(decorator: Callable[[F], F]) -> Callable[[AF], AF]: - ... +def decorate_as_sync(decorator: Callable[[F], F]) -> Callable[[AF], AF]: ... # For non-signature-preserving, all we can do is say the inner function and @@ -48,8 +47,7 @@ def decorate_as_sync(decorator: Callable[[F], F]) -> Callable[[AF], AF]: @overload def decorate_as_sync( decorator: Callable[..., Any] -) -> Callable[[Callable[..., Awaitable[Any]]], Callable[..., Awaitable[Any]]]: - ... +) -> Callable[[Callable[..., Awaitable[Any]]], Callable[..., Awaitable[Any]]]: ... def decorate_as_sync(decorator: Any) -> Any: From c4b9b55eee82a53741f23184431b94fffb73424c Mon Sep 17 00:00:00 2001 From: Joshua Oreman Date: Wed, 7 Feb 2024 17:00:31 -0700 Subject: [PATCH 3/7] Use newer Trio, avoid sniffio thread-local leakage from test --- greenback/_tests/test_impl.py | 9 ++++++--- test-requirements.txt | 7 +++---- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/greenback/_tests/test_impl.py b/greenback/_tests/test_impl.py index 28e49b1..61ddd9e 100644 --- a/greenback/_tests/test_impl.py +++ b/greenback/_tests/test_impl.py @@ -272,9 +272,12 @@ def test_misuse(): @trio.run async def wrong_library(): - sniffio.thread_local.name = "tokio" - with pytest.raises(RuntimeError, match="greenback does not support tokio"): - greenback.await_(trio.sleep(1)) + old_name, sniffio.thread_local.name = sniffio.thread_local.name, "tokio" + try: + with pytest.raises(RuntimeError, match="greenback does not support tokio"): + greenback.await_(trio.sleep(1)) + finally: + sniffio.thread_local.name = old_name @trio.run async def not_awaitable(): diff --git a/test-requirements.txt b/test-requirements.txt index 6b4466f..6933856 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -3,12 +3,11 @@ pytest pytest-cov pytest-trio -# We run tests on both asyncio and Trio. anyio 4.0 (not yet released) and trio 0.22.0 must be used together. -anyio < 4.0 -trio < 0.22.0 +# We run tests on both asyncio and Trio. anyio 4.0+ (not yet released) and trio 0.22.0+ must be used together. +anyio >= 4.0 +trio >= 0.23.0 # Tools black >= 19.10b0; implementation_name == "cpython" mypy >= 0.750; implementation_name == "cpython" flake8 -trio-typing >= 0.5.0 From 22948a43db216a5f620c6d50bb00504e9b40c0f3 Mon Sep 17 00:00:00 2001 From: Joshua Oreman Date: Wed, 7 Feb 2024 17:01:51 -0700 Subject: [PATCH 4/7] Drop async_generator dependency --- greenback/_tests/test_util.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/greenback/_tests/test_util.py b/greenback/_tests/test_util.py index 89fa042..24ebcc0 100644 --- a/greenback/_tests/test_util.py +++ b/greenback/_tests/test_util.py @@ -3,7 +3,7 @@ import pytest import sys import trio -from async_generator import asynccontextmanager +from contextlib import asynccontextmanager from .._impl import ensure_portal, has_portal from .._util import autoawait, async_context, async_iter, decorate_as_sync From 7ed8c0cc0d6b64bbfb490347eada40d6883ec170 Mon Sep 17 00:00:00 2001 From: Joshua Oreman Date: Wed, 7 Feb 2024 17:06:34 -0700 Subject: [PATCH 5/7] Drop 3.6-3.7 support, as our dependencies now require 3.8+ --- .github/workflows/ci.yml | 16 ++++++++-------- .travis.yml | 22 ---------------------- README.rst | 2 +- docs/source/index.rst | 2 +- setup.py | 2 +- 5 files changed, 11 insertions(+), 33 deletions(-) delete mode 100644 .travis.yml diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index a65a1bd..816d0ec 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -14,12 +14,12 @@ jobs: strategy: fail-fast: false matrix: - python: ['3.7', '3.8', '3.9', '3.10', '3.11'] + python: ['3.8', '3.9', '3.10', '3.11', '3.12'] arch: ['x86', 'x64'] extra_name: [''] old_greenlet: ['0'] include: - - python: '3.7' + - python: '3.8' arch: 'x86' old_greenlet: '1' extra_name: ', older greenlet package' @@ -58,7 +58,7 @@ jobs: strategy: fail-fast: false matrix: - python: ['pypy-3.8', 'pypy-3.9', '3.7', '3.8', '3.9', '3.10', '3.11', '3.10-dev', '3.11-dev'] + python: ['pypy-3.9', '3.8', '3.9', '3.10', '3.11', '3.12'] check_lint: ['0'] extra_name: [''] old_greenlet: ['0'] @@ -66,15 +66,15 @@ jobs: - python: '3.9' check_lint: '1' extra_name: ', formatting and linting' - - python: '3.7' + - python: '3.8' old_greenlet: '1' extra_name: ', older greenlet package' - python: '3.9' # NB: old greenlet not supported on >=3.10 old_greenlet: '1' extra_name: ', older greenlet package' - - python: '3.9' # <- not actually used - pypy_nightly_branch: 'py3.9' - extra_name: ', pypy 3.9 nightly' + - python: '3.10' # <- not actually used + pypy_nightly_branch: 'py3.10' + extra_name: ', pypy 3.10 nightly' steps: - name: Checkout uses: actions/checkout@v2 @@ -104,7 +104,7 @@ jobs: strategy: fail-fast: false matrix: - python: ['3.7', '3.8', '3.9', '3.10', '3.11'] + python: ['3.8', '3.9', '3.10', '3.11', '3.12'] steps: - name: Checkout uses: actions/checkout@v2 diff --git a/.travis.yml b/.travis.yml deleted file mode 100644 index 6d109a8..0000000 --- a/.travis.yml +++ /dev/null @@ -1,22 +0,0 @@ -language: python -dist: bionic - -matrix: - include: - # The pypy tests are slow, so we list them first - - python: pypy3.6-7.2.0 - - language: generic - env: PYPY_NIGHTLY_BRANCH=py3.6 - - python: 3.6-dev - - python: 3.7-dev - - python: 3.8-dev - - python: 3.9-dev - - python: nightly - -script: - - ./ci.sh - -branches: - only: - - master - diff --git a/README.rst b/README.rst index ce911a6..faa2211 100644 --- a/README.rst +++ b/README.rst @@ -63,7 +63,7 @@ below. This is potentially useful in a number of different situations: * You can (cautiously) design async APIs that block in places where you can't write ``await``, such as on attribute accesses. -``greenback`` requires Python 3.6 or later and an implementation that +``greenback`` requires Python 3.8 or later and an implementation that supports the ``greenlet`` library. Either CPython or PyPy should work. There are no known OS dependencies. diff --git a/docs/source/index.rst b/docs/source/index.rst index a659f9a..3dfb697 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -45,7 +45,7 @@ below. This is potentially useful in a number of different situations: * You can (cautiously) design async APIs that block in places where you can't write ``await``, such as on attribute accesses. -``greenback`` requires Python 3.6 or later and an implementation that +``greenback`` requires Python 3.8 or later and an implementation that supports the ``greenlet`` library. Either CPython or PyPy should work. There are no known OS dependencies. diff --git a/setup.py b/setup.py index 9fcf2c8..e9581a1 100644 --- a/setup.py +++ b/setup.py @@ -17,7 +17,7 @@ include_package_data=True, install_requires=["greenlet != 0.4.17", "sniffio", "outcome"], keywords=["async", "io", "trio", "asyncio"], - python_requires=">=3.6", + python_requires=">=3.8", classifiers=[ "License :: OSI Approved :: MIT License", "License :: OSI Approved :: Apache Software License", From e958502f1e6562388baeda189be7e7fe91271496 Mon Sep 17 00:00:00 2001 From: Joshua Oreman Date: Wed, 7 Feb 2024 17:11:14 -0700 Subject: [PATCH 6/7] mypy fixes --- .github/workflows/ci.yml | 2 +- greenback/_impl.py | 13 +++++++------ greenback/_util.py | 2 +- 3 files changed, 9 insertions(+), 8 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 816d0ec..56ba29e 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -63,7 +63,7 @@ jobs: extra_name: [''] old_greenlet: ['0'] include: - - python: '3.9' + - python: '3.12' check_lint: '1' extra_name: ', formatting and linting' - python: '3.8' diff --git a/greenback/_impl.py b/greenback/_impl.py index c5bb2ff..406434a 100644 --- a/greenback/_impl.py +++ b/greenback/_impl.py @@ -79,12 +79,12 @@ def trampoline( this_greenlet = greenlet.getcurrent() while True: # StopIteration or other exceptions will escape from this function - next_yield = next_send.send(orig_coro) + next_yield: Any = next_send.send(orig_coro) # type: ignore next_send = portal_greenlet.switch((next_yield, this_greenlet)) @types.coroutine -def async_yield_ready(): +def async_yield_ready() -> Generator[Any, Any, Any]: return (yield "ready") @@ -325,7 +325,7 @@ def current_task() -> Union["trio.lowlevel.Task", "asyncio.Task[Any]"]: def get_aio_task_coro(task: "asyncio.Task[Any]") -> Coroutine[Any, Any, Any]: try: # Public API in 3.8+ - return task.get_coro() # type: ignore # (defined as returning Any) + return task.get_coro() # (defined as returning Any) except AttributeError: return task._coro # type: ignore # (not in typeshed) @@ -530,9 +530,10 @@ async def with_portal_run( return await async_fn(*args, **kwds) task_portals[this_task] = greenlet.getcurrent() try: - res: T = await _greenback_shim( - async_fn(*args, **kwds), outcome.Value(None) - ) # type: ignore + coro = async_fn(*args, **kwds) + if not isinstance(coro, collections.abc.Coroutine): + coro = adapt_awaitable(coro) + res: T = await _greenback_shim(coro, outcome.Value(None)) return res finally: del task_portals[this_task] diff --git a/greenback/_util.py b/greenback/_util.py index 46d37cf..7bf9a74 100644 --- a/greenback/_util.py +++ b/greenback/_util.py @@ -122,7 +122,7 @@ def __enter__(self) -> T: f"{type(self._cm).__name__!r} object does not support the " "asynchronous context manager protocol (missed __aexit__ method)" ) from None - return await_(aenter(self._cm)) + return await_(aenter(self._cm)) # type: ignore else: From e7134b7790cff0a6f3831cfb8d4fd0b295d50190 Mon Sep 17 00:00:00 2001 From: Joshua Oreman Date: Wed, 7 Feb 2024 17:17:23 -0700 Subject: [PATCH 7/7] Coverage fixes --- greenback/_impl.py | 12 ++---------- greenback/_tests/test_impl.py | 9 ++++++++- 2 files changed, 10 insertions(+), 11 deletions(-) diff --git a/greenback/_impl.py b/greenback/_impl.py index 406434a..ba0ee3c 100644 --- a/greenback/_impl.py +++ b/greenback/_impl.py @@ -322,14 +322,6 @@ def current_task() -> Union["trio.lowlevel.Task", "asyncio.Task[Any]"]: raise RuntimeError(f"greenback does not support {library}") -def get_aio_task_coro(task: "asyncio.Task[Any]") -> Coroutine[Any, Any, Any]: - try: - # Public API in 3.8+ - return task.get_coro() # (defined as returning Any) - except AttributeError: - return task._coro # type: ignore # (not in typeshed) - - def _aligned_ptr_offset_in_object(obj: object, referent: object) -> Optional[int]: """Return the byte offset in the C representation of *obj* (an arbitrary Python object) at which is found a naturally-aligned @@ -365,7 +357,7 @@ def set_aio_task_coro( global aio_task_coro_c_offset import ctypes - old_coro = get_aio_task_coro(task) + old_coro = task.get_coro() if aio_task_coro_c_offset is None: # Deduce the offset by scanning the task object representation @@ -417,7 +409,7 @@ def bestow_portal(task: Union["trio.lowlevel.Task", "asyncio.Task[Any]"]) -> Non import asyncio assert isinstance(task, asyncio.Task) - shim_coro = greenback_shim(task, get_aio_task_coro(task)) + shim_coro = greenback_shim(task, task.get_coro()) commit = partial(set_aio_task_coro, task, shim_coro) # Step it once so it's ready to get resumed by the event loop diff --git a/greenback/_tests/test_impl.py b/greenback/_tests/test_impl.py index 61ddd9e..584417e 100644 --- a/greenback/_tests/test_impl.py +++ b/greenback/_tests/test_impl.py @@ -70,7 +70,14 @@ async def serve_echo(): # pragma: no cover async def test_with_portal_run(library): - for test in (test_simple, test_complex): + class Awaitable: + def __init__(self, library): + self.library = library + + def __await__(self): + return test_simple(self.library).__await__() + + for test in (test_simple, test_complex, Awaitable): await greenback.with_portal_run(test, library) await greenback.with_portal_run(greenback.with_portal_run, test, library) with pytest.raises(RuntimeError, match="greenback.ensure_portal"):