Skip to content

Fix ReAwaitable to support concurrent await calls (#2108) #2150

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,13 @@ incremental in minor, bugfixes only are patches.
See [0Ver](https://0ver.org/).


## Unreleased

### Bugfixes

- Fixes that `ReAwaitable` does not support concurrent await calls. Issue #2108


## 0.25.0

### Features
Expand Down
11 changes: 11 additions & 0 deletions docs/pages/future.rst
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,17 @@ its result to ``IO``-based containers.
This helps a lot when separating pure and impure
(async functions are impure) code inside your app.

.. note::
``Future`` containers can be awaited multiple times and support concurrent
awaits from multiple async tasks. This is achieved through an internal
caching mechanism that ensures the underlying coroutine is only executed
once, while all subsequent or concurrent awaits receive the cached result.
This makes ``Future`` containers safe to use in complex async workflows
where the same future might be awaited from different parts of your code.

The implementation supports multiple async frameworks including asyncio,
trio, and anyio, with automatic framework detection and fallback support.


FutureResult
------------
Expand Down
25 changes: 22 additions & 3 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ pytest-mypy-plugins = "^3.1"
pytest-subtests = "^0.14"
pytest-shard = "^0.1"
covdefaults = "^2.3"
pytest-asyncio = "^1.0.0"

[tool.poetry.group.docs]
optional = true
Expand Down
2 changes: 1 addition & 1 deletion returns/contrib/mypy/_features/kind.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ def attribute_access(ctx: AttributeContext) -> MypyType:
is_lvalue=False,
is_super=False,
is_operator=False,
msg=ctx.api.msg,
msg=exprchecker.msg,
original_type=instance,
chk=ctx.api, # type: ignore
in_literal_context=exprchecker.is_literal_context(),
Expand Down
114 changes: 110 additions & 4 deletions returns/primitives/reawaitable.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import asyncio
import logging
import threading
from collections.abc import Awaitable, Callable, Generator
from functools import wraps
from typing import NewType, ParamSpec, TypeVar, cast, final
from typing import Any, NewType, ParamSpec, TypeVar, cast, final

_ValueType = TypeVar('_ValueType')
_AwaitableT = TypeVar('_AwaitableT', bound=Awaitable)
Expand All @@ -19,6 +22,23 @@ class ReAwaitable:
So, in reality we still ``await`` once,
but pretending to do it multiple times.

This class is thread-safe and supports concurrent awaits from multiple
async tasks. When multiple tasks await the same instance simultaneously,
only one will execute the underlying coroutine while others will wait
and receive the cached result.

**Async Framework Support and Lock Selection:**

The lock selection follows a strict priority order with automatic fallback:

1. **asyncio.Lock()** - Primary choice when asyncio event loop available
2. **trio.Lock()** - Used when asyncio fails and trio available
3. **anyio.Lock()** - Used when asyncio/trio fail, anyio available
4. **threading.Lock()** - Final fallback for unsupported frameworks

Lock selection happens lazily on first await and is logged at DEBUG level
for troubleshooting. The framework detection is automatic and transparent.

Why is that required? Because otherwise,
``Future`` containers would be unusable:

Expand Down Expand Up @@ -48,12 +68,13 @@ class ReAwaitable:

"""

__slots__ = ('_cache', '_coro')
__slots__ = ('_cache', '_coro', '_lock')

def __init__(self, coro: Awaitable[_ValueType]) -> None:
"""We need just an awaitable to work with."""
self._coro = coro
self._cache: _ValueType | _Sentinel = _sentinel
self._lock: Any | None = None

def __await__(self) -> Generator[None, None, _ValueType]:
"""
Expand Down Expand Up @@ -99,10 +120,95 @@ def __repr__(self) -> str:
"""
return repr(self._coro)

def _try_asyncio_lock(self, logger: logging.Logger) -> Any:
"""Try to create an asyncio lock."""
try:
asyncio_lock = asyncio.Lock()
except RuntimeError:
return None
logger.debug('ReAwaitable: Using asyncio.Lock for concurrency control')
return asyncio_lock

def _try_trio_lock(self, logger: logging.Logger) -> Any:
"""Try to create a trio lock."""
try:
import trio # noqa: PLC0415
except ImportError:
return None
trio_lock = trio.Lock()
logger.debug('ReAwaitable: Using trio.Lock for concurrency control')
return trio_lock

def _try_anyio_lock(self, logger: logging.Logger) -> Any:
"""Try to create an anyio lock."""
try:
import anyio # noqa: PLC0415
except ImportError:
return None
anyio_lock = anyio.Lock()
logger.debug('ReAwaitable: Using anyio.Lock for concurrency control')
return anyio_lock

def _create_lock(self) -> Any: # noqa: WPS320
"""Create appropriate lock for the current async framework.

Attempts framework detection: asyncio -> trio -> anyio -> threading.
Logs the selected framework at DEBUG level for troubleshooting.
"""
logger = logging.getLogger(__name__)

# Try asyncio first (most common)
asyncio_lock = self._try_asyncio_lock(logger)
if asyncio_lock is not None:
return asyncio_lock

logger.debug('ReAwaitable: asyncio.Lock unavailable, trying trio')

# Try trio
trio_lock = self._try_trio_lock(logger)
if trio_lock is not None:
return trio_lock

logger.debug('ReAwaitable: trio.Lock unavailable, trying anyio')

# Try anyio
anyio_lock = self._try_anyio_lock(logger)
if anyio_lock is not None:
return anyio_lock

logger.debug(
'ReAwaitable: anyio.Lock unavailable, '
'falling back to threading.Lock'
)

# Fallback to threading lock
threading_lock = threading.Lock()
logger.debug(
'ReAwaitable: Using threading.Lock fallback for concurrency control'
)
return threading_lock

async def _awaitable(self) -> _ValueType:
"""Caches the once awaited value forever."""
if self._cache is _sentinel:
self._cache = await self._coro
if self._cache is not _sentinel:
return self._cache # type: ignore

# Create lock on first use to detect the async framework
if self._lock is None:
self._lock = self._create_lock()

# Handle different lock types
if hasattr(self._lock, '__aenter__'):
# Async lock (asyncio, trio, anyio)
async with self._lock:
if self._cache is _sentinel:
self._cache = await self._coro
else:
# Threading lock fallback for unsupported frameworks
with self._lock:
if self._cache is _sentinel:
self._cache = await self._coro

return self._cache # type: ignore


Expand Down
6 changes: 6 additions & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,12 @@ addopts =
--cov-fail-under=100
# pytest-mypy-plugin:
--mypy-ini-file=setup.cfg
# pytest-asyncio:
--asyncio-mode=auto

# Registered markers:
markers =
asyncio: mark test as asynchronous

# Ignores some warnings inside:
filterwarnings =
Expand Down
1 change: 1 addition & 0 deletions tests/test_contrib/test_hypothesis/test_laws/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# Empty init file for test module
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
from hypothesis import HealthCheck
from hypothesis import strategies as st
from test_hypothesis.test_laws import test_custom_type_applicative

from returns.contrib.hypothesis.laws import check_all_laws

from . import test_custom_type_applicative # noqa: WPS300

container_type = test_custom_type_applicative._Wrapper # noqa: SLF001

check_all_laws(
container_type,
container_strategy=st.builds(container_type, st.integers()),
settings_kwargs={'suppress_health_check': [HealthCheck.too_slow]},
)
1 change: 1 addition & 0 deletions tests/test_primitives/test_reawaitable/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# Empty init file for test module
Loading