Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

gh-124309: Modernize the staggered_race implementation to support eager task factories #124390

Merged
Merged
127 changes: 58 additions & 69 deletions Lib/asyncio/staggered.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@

from . import events
from . import exceptions as exceptions_mod
from . import locks
from . import tasks
from . import taskgroups


async def staggered_race(coro_fns, delay, *, loop=None):
Expand Down Expand Up @@ -61,78 +61,67 @@ async def staggered_race(coro_fns, delay, *, loop=None):
coroutine's entry is ``None``.

"""
# TODO: when we have aiter() and anext(), allow async iterables in coro_fns.
# TODO: allow async iterables in coro_fns
loop = loop or events.get_running_loop()
enum_coro_fns = enumerate(coro_fns)
winner_result = None
winner_index = None
exceptions = []
running_tasks = []

async def run_one_coro(previous_failed) -> None:
# Wait for the previous task to finish, or for delay seconds
if previous_failed is not None:
with contextlib.suppress(exceptions_mod.TimeoutError):
# Use asyncio.wait_for() instead of asyncio.wait() here, so
# that if we get cancelled at this point, Event.wait() is also
# cancelled, otherwise there will be a "Task destroyed but it is
# pending" later.
await tasks.wait_for(previous_failed.wait(), delay)
# Get the next coroutine to run
try:
this_index, coro_fn = next(enum_coro_fns)
except StopIteration:
return
# Start task that will run the next coroutine
this_failed = locks.Event()
next_task = loop.create_task(run_one_coro(this_failed))
running_tasks.append(next_task)
assert len(running_tasks) == this_index + 2
# Prepare place to put this coroutine's exceptions if not won
exceptions.append(None)
assert len(exceptions) == this_index + 1

def future_callback(index, future, task_group):
assert future.done()

try:
result = await coro_fn()
except (SystemExit, KeyboardInterrupt):
raise
except BaseException as e:
exceptions[this_index] = e
this_failed.set() # Kickstart the next coroutine
error = future.exception()
exceptions[index] = error
except exceptions_mod.CancelledError as cancelled_error:
# If another task finishes first and cancels this task, it
# is propagated here.
exceptions[index] = cancelled_error
return
else:
# Store winner's results
nonlocal winner_index, winner_result
assert winner_index is None
winner_index = this_index
winner_result = result
# Cancel all other tasks. We take care to not cancel the current
# task as well. If we do so, then since there is no `await` after
# here and CancelledError are usually thrown at one, we will
# encounter a curious corner case where the current task will end
# up as done() == True, cancelled() == False, exception() ==
# asyncio.CancelledError. This behavior is specified in
# https://bugs.python.org/issue30048
for i, t in enumerate(running_tasks):
if i != this_index:
t.cancel()

first_task = loop.create_task(run_one_coro(None))
running_tasks.append(first_task)
try:
# Wait for a growing list of tasks to all finish: poor man's version of
# curio's TaskGroup or trio's nursery
done_count = 0
while done_count != len(running_tasks):
done, _ = await tasks.wait(running_tasks)
done_count = len(done)
# If run_one_coro raises an unhandled exception, it's probably a
# programming error, and I want to see it.
if __debug__:
for d in done:
if d.done() and not d.cancelled() and d.exception():
raise d.exception()
return winner_result, winner_index, exceptions
finally:
# Make sure no tasks are left running if we leave this function
for t in running_tasks:
t.cancel()
if error is not None:
return

nonlocal winner_result, winner_index
# If this is in an eager task factory, it's possible
# for multiple tasks to get here. In that case, we want
# only the first one to win and the rest to no-op before
# cancellation.
if winner_result is None and not task_group._aborting:
winner_result = future.result()
winner_index = index

# Cancel all other tasks, we win!
task_group._abort()
ZeroIntensity marked this conversation as resolved.
Show resolved Hide resolved

async with taskgroups.TaskGroup() as task_group:
for index, coro in enumerate(coro_fns):
if task_group._aborting:
ZeroIntensity marked this conversation as resolved.
Show resolved Hide resolved
break

exceptions.append(None)
task = loop.create_task(coro())
ZeroIntensity marked this conversation as resolved.
Show resolved Hide resolved

# We don't want the task group to propagate the error. Instead,
# we want to put it in our special exceptions list, so we manually
# create the task.
task.add_done_callback(task_group._on_task_done_without_propagation)
task_group._tasks.add(task)

# We need this extra wrapper here to stop the closure from having
# an incorrect index.
def wrapper(idx):
return lambda future: future_callback(idx, future, task_group)

task.add_done_callback(wrapper(index))

if delay is not None:
await tasks.sleep(delay or 0)
ZeroIntensity marked this conversation as resolved.
Show resolved Hide resolved
else:
# We don't care about exceptions here, the callback will
# deal with it.
with contextlib.suppress(BaseException):
# If there's no delay, we just wait until completion.
await task

return winner_result, winner_index, exceptions
8 changes: 6 additions & 2 deletions Lib/asyncio/taskgroups.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,8 @@ def _abort(self):
if not t.done():
t.cancel()

def _on_task_done(self, task):
def _on_task_done_without_propagation(self, task):
# For staggered_race()
self._tasks.discard(task)

if self._on_completed_fut is not None and not self._tasks:
Expand All @@ -209,7 +210,10 @@ def _on_task_done(self, task):
if task.cancelled():
return

exc = task.exception()
return task.exception()

def _on_task_done(self, task):
exc = self._on_task_done_without_propagation(task)
if exc is None:
return

Expand Down
45 changes: 45 additions & 0 deletions Lib/test/test_asyncio/test_eager_task_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,51 @@ async def run():

self.run_coro(run())

# See GH-124309 for both of these
def test_staggered_race_with_eager_tasks(self):
    # Regression test: staggered_race() must cooperate with an eager
    # task factory (see GH-124309).

    async def raise_value_error():
        await asyncio.sleep(0)  # Dummy coroutine
        raise ValueError("no good")

    async def driver():
        coro_factories = [
            lambda: asyncio.sleep(2),
            lambda: asyncio.sleep(1),
            lambda: raise_value_error(),
        ]
        winner, index, excs = await asyncio.staggered.staggered_race(
            coro_factories,
            delay=0.25,
        )
        # asyncio.sleep() returns None, so the winning result is None
        # even though a winner (index 1) was found.
        self.assertIsNone(winner)
        self.assertEqual(index, 1)
        self.assertIsNone(excs[index])
        self.assertIsInstance(excs[0], asyncio.CancelledError)
        self.assertIsInstance(excs[2], ValueError)

    self.run_coro(driver())

def test_staggered_race_with_eager_tasks_no_delay(self):
    # Regression test: with delay=None, staggered_race() runs the
    # coroutines strictly one at a time, so later factories are never
    # started once a winner is found (see GH-124309).

    async def raise_value_error():
        raise ValueError("no good")

    async def driver():
        coro_factories = [
            lambda: raise_value_error(),
            lambda: asyncio.sleep(1),
            lambda: asyncio.sleep(0),
        ]
        winner, index, excs = await asyncio.staggered.staggered_race(
            coro_factories,
            delay=None,
        )
        # asyncio.sleep() returns None, so the winning result is None
        # even though a winner (index 1) was found.
        self.assertIsNone(winner)
        self.assertEqual(index, 1)
        self.assertIsNone(excs[index])
        self.assertIsInstance(excs[0], ValueError)
        # The third factory never ran, so only two slots were recorded.
        self.assertEqual(len(excs), 2)

    self.run_coro(driver())



class PyEagerTaskFactoryLoopTests(EagerTaskFactoryLoopTests, test_utils.TestCase):
Task = tasks._PyTask
Expand Down
4 changes: 4 additions & 0 deletions Lib/test/test_asyncio/test_staggered.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,3 +95,7 @@ async def coro(index):
self.assertEqual(len(excs), 2)
self.assertIsInstance(excs[0], ValueError)
self.assertIsInstance(excs[1], ValueError)


# Allow running this test module directly (outside the regrtest driver).
if __name__ == "__main__":
    unittest.main()
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Modernize the :func:`!asyncio.staggered.staggered_race` function to now
ZeroIntensity marked this conversation as resolved.
Show resolved Hide resolved
support :attr:`asyncio.eager_task_factory` and use :class:`asyncio.TaskGroup` internally.
Loading