Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

gh-124309: Modernize the staggered_race implementation to support eager task factories #124390

Merged

Conversation

ZeroIntensity
Copy link
Member

@ZeroIntensity ZeroIntensity commented Sep 23, 2024

Lib/asyncio/staggered.py Outdated Show resolved Hide resolved
Lib/asyncio/staggered.py Outdated Show resolved Hide resolved
Copy link
Contributor

@graingert graingert left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're using private methods of TaskGroup and starting tasks on the loop rather than the TaskGroup

Lib/asyncio/staggered.py Outdated Show resolved Hide resolved
Lib/asyncio/staggered.py Outdated Show resolved Hide resolved
@ZeroIntensity
Copy link
Member Author

I think I'm just going to refactor this to not use TaskGroup. It's causing more problems than solutions.

@graingert
Copy link
Contributor

I think it's worth persevering with TaskGroup, you just need to write it without using add_done_callback or private attributes

@ZeroIntensity
Copy link
Member Author

I'll try it, but I'm worried that it isn't possible when considering an eager task factory. The previous implementation used a variation of a task group (a list containing tasks, since it predated asyncio.TaskGroup) but broke due to the reliance of tasks not starting before the event loop is called.

While we're here, staggered_race is undocumented -- might that be something worth addressing in this PR?

@graingert
Copy link
Contributor

A demo of what I mean wrt TaskGroup:

"""Support for running coroutines in parallel with staggered start times."""

__all__ = 'staggered_race',

from . import locks
from . import tasks
from . import taskgroups


async def staggered_race(coro_fns, delay, *, loop=None):
    """Run coroutines with staggered start times and take the first to finish.

    This method takes an iterable of coroutine functions. The first one is
    started immediately. From then on, whenever the immediately preceding one
    fails (raises an exception), or when *delay* seconds has passed, the next
    coroutine is started. This continues until one of the coroutines complete
    successfully, in which case all others are cancelled, or until all
    coroutines fail.

    The coroutines provided should be well-behaved in the following way:

    * They should only ``return`` if completed successfully.

    * They should always raise an exception if they did not complete
      successfully. In particular, if they handle cancellation, they should
      probably reraise, like this::

        try:
            # do work
        except asyncio.CancelledError:
            # undo partially completed work
            raise

    Args:
        coro_fns: an iterable of coroutine functions, i.e. callables that
            return a coroutine object when called. Use ``functools.partial`` or
            lambdas to pass arguments.

        delay: amount of time, in seconds, between starting coroutines. If
            ``None``, the coroutines will run sequentially.

        loop: the event loop to use.

    Returns:
        tuple *(winner_result, winner_index, exceptions)* where

        - *winner_result*: the result of the winning coroutine, or ``None``
          if no coroutines won.

        - *winner_index*: the index of the winning coroutine in
          ``coro_fns``, or ``None`` if no coroutines won. If the winning
          coroutine may return None on success, *winner_index* can be used
          to definitively determine whether any coroutine won.

        - *exceptions*: list of exceptions returned by the coroutines.
          ``len(exceptions)`` is equal to the number of coroutines actually
          started, and the order is the same as in ``coro_fns``. The winning
          coroutine's entry is ``None``.

    """
    # TODO: when we have aiter() and anext(), allow async iterables in coro_fns.
    winner_result = None
    winner_index = None
    exceptions = []

    class _Done(Exception):
        pass

    async def run_one_coro(this_index, coro_fn, this_failed):
        try:
            result = await coro_fn()
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as e:
            exceptions[this_index] = e
            this_failed.set()  # Kickstart the next coroutine
        else:
            # Store winner's results
            nonlocal winner_index, winner_result
            # There could be more than one winner
            winner_index = this_index
            winner_result = result
            raise _Done

    try:
        async with taskgroups.TaskGroup() as tg:
            for this_index, coro_fn in enumerate(coro_fns):
                this_failed = locks.Event()
                exceptions.append(None)
                tg.create_task(run_one_coro(this_index, coro_fn, this_failed))
                try:
                    await tasks.wait_for(this_failed.wait(), delay)
                except TimeoutError:
                    pass
    except* _Done:
        pass

    return winner_result, winner_index, exceptions

Lib/asyncio/staggered.py Show resolved Hide resolved
@ZeroIntensity
Copy link
Member Author

Thank you for the review, Thomas!

@graingert graingert added needs backport to 3.12 bug and security fixes needs backport to 3.13 bugs and security fixes labels Sep 24, 2024
Co-authored-by: Jelle Zijlstra <jelle.zijlstra@gmail.com>
Copy link
Contributor

@willingc willingc left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @ZeroIntensity.

Lib/asyncio/staggered.py Outdated Show resolved Hide resolved
ZeroIntensity and others added 4 commits September 25, 2024 20:53
Co-authored-by: Carol Willing <carolcode@willingconsulting.com>
@kumaraditya303 kumaraditya303 enabled auto-merge (squash) September 26, 2024 05:04
@kumaraditya303 kumaraditya303 merged commit de929f3 into python:main Sep 26, 2024
36 checks passed
@miss-islington-app
Copy link

Thanks @ZeroIntensity for the PR, and @kumaraditya303 for merging it 🌮🎉.. I'm working now to backport this PR to: 3.12, 3.13.
🐍🍒⛏🤖

miss-islington pushed a commit to miss-islington/cpython that referenced this pull request Sep 26, 2024
…port eager task factories (pythonGH-124390)

(cherry picked from commit de929f3)

Co-authored-by: Peter Bierma <zintensitydev@gmail.com>
Co-authored-by: Thomas Grainger <tagrain@gmail.com>
Co-authored-by: Jelle Zijlstra <jelle.zijlstra@gmail.com>
Co-authored-by: Carol Willing <carolcode@willingconsulting.com>
Co-authored-by: Kumar Aditya <kumaraditya@python.org>
@miss-islington-app
Copy link

Sorry, @ZeroIntensity and @kumaraditya303, I could not cleanly backport this to 3.12 due to a conflict.
Please backport using cherry_picker on command line.

cherry_picker de929f353c413459834a2a37b2d9b0240673d874 3.12

@bedevere-app
Copy link

bedevere-app bot commented Sep 26, 2024

GH-124573 is a backport of this pull request to the 3.13 branch.

@bedevere-app
Copy link

bedevere-app bot commented Sep 26, 2024

GH-124574 is a backport of this pull request to the 3.12 branch.

@bedevere-app bedevere-app bot removed the needs backport to 3.12 bug and security fixes label Sep 26, 2024
kumaraditya303 added a commit that referenced this pull request Sep 26, 2024
…pport e… (#124574)

gh-124309: Modernize the `staggered_race` implementation to support eager task factories (#124390)

Co-authored-by: Thomas Grainger <tagrain@gmail.com>
Co-authored-by: Jelle Zijlstra <jelle.zijlstra@gmail.com>
Co-authored-by: Carol Willing <carolcode@willingconsulting.com>
Co-authored-by: Kumar Aditya <kumaraditya@python.org>
(cherry picked from commit de929f3)

Co-authored-by: Peter Bierma <zintensitydev@gmail.com>
@ZeroIntensity ZeroIntensity deleted the fix-staggered-race-eager-task-factory branch September 26, 2024 10:05
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants