Skip to content

Proof of concept for async pipe #252

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions expression/extra/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
functions.
"""

from . import option, result
from . import option, result, pipe, async_result


__all__ = ["option", "result"]
__all__ = ["option", "result", "pipe", "async_result"]
6 changes: 6 additions & 0 deletions expression/extra/async_result/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
"""Extra async_result functions."""

from .async_result import bind


__all__ = ["bind"]
19 changes: 19 additions & 0 deletions expression/extra/async_result/async_result.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from typing import Callable, Coroutine, Any, TypeVar
from expression import Result
from expression.effect.async_result import async_result

_TSource = TypeVar("_TSource")
_TResult = TypeVar("_TResult")
_TError = TypeVar("_TError")


def bind(
mapper: Callable[[_TSource], Coroutine[Any, Any, Result[_TResult, Any]]],
) -> Callable[[Result[_TSource, _TError]], Coroutine[Any, Any, Result[_TResult, _TError]]]:
async def wrapped(result: Result[_TSource, _TError]) -> Result[_TResult, _TError]:
return await async_result.bind(result, mapper)

return wrapped


__all__ = ["bind"]
6 changes: 6 additions & 0 deletions expression/extra/pipe/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
"""Extra pipe functions."""

from .async_pipe import async_pipe


__all__ = ["async_pipe"]
168 changes: 168 additions & 0 deletions expression/extra/pipe/async_pipe.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
"""Async Pipe module

The pipe handles both synchronous and asynchronous functions; if a function
returns a Coroutine, it is awaited before its result is passed to the next function.
"""

from collections.abc import Callable
from typing import Any, TypeVar, overload, Union, Coroutine
import inspect

_A = TypeVar("_A")
_B = TypeVar("_B")
_C = TypeVar("_C")
_D = TypeVar("_D")
_E = TypeVar("_E")
_F = TypeVar("_F")
_G = TypeVar("_G")
_H = TypeVar("_H")
_T = TypeVar("_T")
_J = TypeVar("_J")

_X = TypeVar("_X")
_Y = TypeVar("_Y")

SyncCallable = Callable[[_X], _Y]
AsyncCallable = Callable[[_X], Coroutine[Any, Any, _Y]]
SyncOrAsyncCallable = Union[SyncCallable[_X, _Y], AsyncCallable[_X, _Y]]


@overload
async def async_pipe(value: _A, /) -> _A: ...


@overload
async def async_pipe(
value: _A,
fn1: SyncOrAsyncCallable[_A, _B],
/,
) -> _B: ...


@overload
async def async_pipe(
value: _A,
fn1: SyncOrAsyncCallable[_A, _B],
fn2: SyncOrAsyncCallable[_B, _C],
/,
) -> _C: ...


@overload
async def async_pipe(
value: _A,
fn1: SyncOrAsyncCallable[_A, _B],
fn2: SyncOrAsyncCallable[_B, _C],
fn3: SyncOrAsyncCallable[_C, _D],
/,
) -> _D: ...


@overload
async def async_pipe(
value: _A,
fn1: SyncOrAsyncCallable[_A, _B],
fn2: SyncOrAsyncCallable[_B, _C],
fn3: SyncOrAsyncCallable[_C, _D],
fn4: SyncOrAsyncCallable[_D, _E],
/,
) -> _E: ...


@overload
async def async_pipe(
value: _A,
fn1: SyncOrAsyncCallable[_A, _B],
fn2: SyncOrAsyncCallable[_B, _C],
fn3: SyncOrAsyncCallable[_C, _D],
fn4: SyncOrAsyncCallable[_D, _E],
fn5: SyncOrAsyncCallable[_E, _F],
/,
) -> _F: ...


@overload
async def async_pipe(
value: _A,
fn1: SyncOrAsyncCallable[_A, _B],
fn2: SyncOrAsyncCallable[_B, _C],
fn3: SyncOrAsyncCallable[_C, _D],
fn4: SyncOrAsyncCallable[_D, _E],
fn5: SyncOrAsyncCallable[_E, _F],
fn6: SyncOrAsyncCallable[_F, _G],
/,
) -> _G: ...


@overload
async def async_pipe(
value: _A,
fn1: SyncOrAsyncCallable[_A, _B],
fn2: SyncOrAsyncCallable[_B, _C],
fn3: SyncOrAsyncCallable[_C, _D],
fn4: SyncOrAsyncCallable[_D, _E],
fn5: SyncOrAsyncCallable[_E, _F],
fn6: SyncOrAsyncCallable[_F, _G],
fn7: SyncOrAsyncCallable[_G, _H],
/,
) -> _H: ...


@overload
async def async_pipe(
value: _A,
fn1: SyncOrAsyncCallable[_A, _B],
fn2: SyncOrAsyncCallable[_B, _C],
fn3: SyncOrAsyncCallable[_C, _D],
fn4: SyncOrAsyncCallable[_D, _E],
fn5: SyncOrAsyncCallable[_E, _F],
fn6: SyncOrAsyncCallable[_F, _G],
fn7: SyncOrAsyncCallable[_G, _H],
fn8: SyncOrAsyncCallable[_H, _T],
/,
) -> _T: ...


@overload
async def async_pipe(
value: _A,
fn1: SyncOrAsyncCallable[_A, _B],
fn2: SyncOrAsyncCallable[_B, _C],
fn3: SyncOrAsyncCallable[_C, _D],
fn4: SyncOrAsyncCallable[_D, _E],
fn5: SyncOrAsyncCallable[_E, _F],
fn6: SyncOrAsyncCallable[_F, _G],
fn7: SyncOrAsyncCallable[_G, _H],
fn8: SyncOrAsyncCallable[_H, _T],
fn9: SyncOrAsyncCallable[_T, _J],
/,
) -> _J: ...


async def async_pipe(value: Any, *functions: SyncOrAsyncCallable[Any, Any]) -> Any:
"""Functional async pipe (`|>`).

Passes the `value` to the first function in `functions`, then the result of that
to the second function, and so on, recursively. If any function in the sequence
returns a coroutine, it is `await`ed before its result is passed to the next function.

Args:
value: The initial value for the pipeline.
*functions: A sequence of functions to apply. Each function should
accept the output of the previous function (or the initial
`value` for the first function) as its input. Functions can be
synchronous or asynchronous.

Returns:
The result of passing the value through all functions in the sequence.
"""
if inspect.iscoroutine(value):
value = await value
if not functions:
return value
next_func, *remaining_functions = functions
result_or_awaitable = next_func(value)
return await async_pipe(result_or_awaitable, *remaining_functions)


__all__ = ["async_pipe"]
83 changes: 83 additions & 0 deletions tests/test_async_pipe.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
from collections.abc import Callable
import asyncio

from hypothesis import given
from hypothesis import strategies as st

from expression.extra.pipe import async_pipe
from expression.extra import async_result
from expression import Result, Ok, result


@given(st.integers())
def test_pipe_id(x: int):
value = asyncio.run(async_pipe(x))
assert value == x


@given(st.integers())
def test_pipe_awaitable_only(x: int):
async def awaitable_value() -> int:
return x

value = asyncio.run(async_pipe(awaitable_value()))
assert value == x


@given(st.integers())
def test_pipe_fn(x: int):
async def gn(x: int) -> int:
return x + 1

value = asyncio.run(async_pipe(x, gn))
assert value == asyncio.run(gn(x))


@given(st.integers(), st.integers(), st.integers())
def test_pipe_fn_gn(x: int, y: int, z: int):
fn: Callable[[int], int] = lambda x: x + z

async def gn(x: int) -> int:
return x * y

value = asyncio.run(async_pipe(x, fn, gn))

assert value == asyncio.run(gn(fn(x)))

value = asyncio.run(
async_pipe(
x,
gn,
fn,
)
)

assert value == fn(asyncio.run(gn(x)))


@given(st.integers(), st.integers(), st.integers())
def test_pipe_async_result(x: int, y: int, z: int):
fn: Callable[[int], Result[int, str]] = lambda x: Ok(x + z)

async def gn(x: int) -> Result[int, str]:
return Ok(x * y)

value = asyncio.run(
async_pipe(
Ok(x),
result.bind(fn),
async_result.bind(gn),
)
)

assert value == Ok((x + z) * y)

value = asyncio.run(
async_pipe(
Ok(x),
async_result.bind(gn),
result.bind(fn),
)
)

assert value == Ok((x * y) + z)