Skip to content

Commit

Permalink
reduce
Browse files Browse the repository at this point in the history
  • Loading branch information
bdraco committed Sep 30, 2024
1 parent faf5353 commit 39746b6
Showing 1 changed file with 8 additions and 24 deletions.
32 changes: 8 additions & 24 deletions src/aiohappyeyeballs/_staggered.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
import contextlib
from typing import (
TYPE_CHECKING,
Any,
Awaitable,
Callable,
Iterable,
Expand All @@ -23,27 +22,6 @@ def _set_result(wait_next: "asyncio.Future[None]") -> None:
wait_next.set_result(None)


async def _wait_one(
futures: "Iterable[asyncio.Future[Any]]",
loop: asyncio.AbstractEventLoop,
) -> _T:
"""Wait for the first future to complete."""
wait_next = loop.create_future()

def _on_completion(fut: "asyncio.Future[Any]") -> None:
if not wait_next.done():
wait_next.set_result(fut)

for f in futures:
f.add_done_callback(_on_completion)

try:
return await wait_next
finally:
for f in futures:
f.remove_done_callback(_on_completion)


async def staggered_race(
coro_fns: Iterable[Callable[[], Awaitable[_T]]],
delay: Optional[float],
Expand Down Expand Up @@ -138,6 +116,9 @@ async def run_one_coro(
start_next: Optional[asyncio.Future[None]]
task: asyncio.Task[Optional[Tuple[_T, int]]]
done: Union[asyncio.Future[None], asyncio.Task[Optional[Tuple[_T, int]]]]
waiters: Iterable[
Union[asyncio.Future[None], asyncio.Task[Optional[Tuple[_T, int]]]]
]
coro_iter = iter(coro_fns)
this_index = -1
try:
Expand All @@ -155,9 +136,12 @@ async def run_one_coro(
break

while tasks:
done = await _wait_one(
[*tasks, start_next] if start_next else tasks, loop
waiters = [*tasks, start_next] if start_next else tasks
dones, _ = await asyncio.wait(
waiters,
return_when=asyncio.FIRST_COMPLETED,
)
done = dones.pop()
if done is start_next:
# The current task has failed or the timer has expired
# so we need to start the next task.
Expand Down

0 comments on commit 39746b6

Please sign in to comment.