Skip to content

Commit

Permalink
Add anyio 4.x support
Browse files Browse the repository at this point in the history
  • Loading branch information
florimondmanca committed Dec 10, 2023
1 parent 6fbc908 commit 7f79fb1
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 18 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ classifiers = [
"Programming Language :: Python :: 3.11",
]
dependencies = [
"anyio~=3.2",
"anyio>=3.2",
"typing-extensions; python_version<'3.8'",
]
dynamic = ["version", "readme"]
Expand Down
3 changes: 3 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
-e .

# Compatibility testing.
anyio~=3.2; python_version<'3.11'

# Packaging.
twine
wheel
Expand Down
23 changes: 23 additions & 0 deletions src/aiometer/_compat.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import sys
from contextlib import contextmanager
from typing import Generator

has_exceptiongroups = True

if sys.version_info < (3, 11): # pragma: no cover
try:
from exceptiongroup import BaseExceptionGroup
except ImportError:
has_exceptiongroups = False


@contextmanager
def collapse_excgroups() -> Generator[None, None, None]:
try:
yield
except BaseException as exc:
if has_exceptiongroups:
while isinstance(exc, BaseExceptionGroup) and len(exc.exceptions) == 1:
exc = exc.exceptions[0]

raise exc
39 changes: 22 additions & 17 deletions src/aiometer/_impl/amap.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@
)

import anyio
from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream

from .._compat import collapse_excgroups
from .run_on_each import run_on_each
from .types import T, U

Expand Down Expand Up @@ -61,27 +63,30 @@ def amap(
) -> AsyncContextManager[AsyncIterable]:
@asynccontextmanager
async def _amap() -> AsyncIterator[AsyncIterable]:
send_channel, receive_channel = anyio.create_memory_object_stream(
max_buffer_size=len(args)
)
channels: Tuple[
MemoryObjectSendStream, MemoryObjectReceiveStream
] = anyio.create_memory_object_stream(max_buffer_size=len(args))

send_channel, receive_channel = channels

with send_channel, receive_channel:
async with anyio.create_task_group() as task_group:
with collapse_excgroups():
async with anyio.create_task_group() as task_group:

async def sender() -> None:
# Make any `async for ... in results: ...` terminate.
with send_channel:
await run_on_each(
async_fn,
args,
max_at_once=max_at_once,
max_per_second=max_per_second,
_include_index=_include_index,
_send_to=send_channel,
)
async def sender() -> None:
# Make any `async for ... in results: ...` terminate.
with send_channel:
await run_on_each(
async_fn,
args,
max_at_once=max_at_once,
max_per_second=max_per_second,
_include_index=_include_index,
_send_to=send_channel,
)

task_group.start_soon(sender)
task_group.start_soon(sender)

yield receive_channel
yield receive_channel

return _amap()

0 comments on commit 7f79fb1

Please sign in to comment.