Skip to content
48 changes: 32 additions & 16 deletions web3/_utils/method_formatters.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
Collection,
Dict,
Iterable,
Iterator,
NoReturn,
Tuple,
TypeVar,
Expand Down Expand Up @@ -35,7 +36,6 @@
is_string,
to_checksum_address,
to_list,
to_tuple,
)
from eth_utils.toolz import (
complement,
Expand Down Expand Up @@ -120,6 +120,29 @@

TValue = TypeVar("TValue")

AnyFilter = Union[
AsyncBlockFilter,
AsyncTransactionFilter,
AsyncLogFilter,
BlockFilter,
TransactionFilter,
LogFilter,
]

TFilter = TypeVar(
"TFilter",
AsyncBlockFilter,
AsyncTransactionFilter,
AsyncLogFilter,
BlockFilter,
TransactionFilter,
LogFilter,
)

FilterResultFormatter = Callable[
[Union["AsyncEth", "Eth"], RPCEndpoint, TValue], TFilter
]


def bytes_to_ascii(value: bytes) -> str:
return codecs.decode(value, "ascii")
Expand Down Expand Up @@ -1094,11 +1117,10 @@ def subscription_formatter(value: Any) -> Union[HexBytes, HexStr, Dict[str, Any]
}


@to_tuple
def combine_formatters(
formatter_maps: Collection[Dict[RPCEndpoint, Callable[..., TReturn]]],
method_name: RPCEndpoint,
) -> Iterable[Callable[..., TReturn]]:
) -> Iterator[Callable[..., TReturn]]:
for formatter_map in formatter_maps:
if method_name in formatter_map:
yield formatter_map[method_name]
Expand Down Expand Up @@ -1194,14 +1216,7 @@ def filter_wrapper(
module: Union["AsyncEth", "Eth"],
method: RPCEndpoint,
filter_id: HexStr,
) -> Union[
AsyncBlockFilter,
AsyncTransactionFilter,
AsyncLogFilter,
BlockFilter,
TransactionFilter,
LogFilter,
]:
) -> AnyFilter:
if method == RPC.eth_newBlockFilter:
if module.is_async:
return AsyncBlockFilter(filter_id, eth_module=cast("AsyncEth", module))
Expand All @@ -1227,19 +1242,20 @@ def filter_wrapper(
)


FILTER_RESULT_FORMATTERS: Dict[RPCEndpoint, Callable[..., Any]] = {
FILTER_RESULT_FORMATTERS: Dict[
RPCEndpoint, FilterResultFormatter[HexStr, AnyFilter]
] = {
RPC.eth_newPendingTransactionFilter: filter_wrapper,
RPC.eth_newBlockFilter: filter_wrapper,
RPC.eth_newFilter: filter_wrapper,
}


@to_tuple
def apply_module_to_formatters(
formatters: Iterable[Callable[..., TReturn]],
formatters: Iterable[FilterResultFormatter[TValue, TFilter]],
module: "Module",
method_name: Union[RPCEndpoint, Callable[..., RPCEndpoint]],
) -> Iterable[Callable[..., TReturn]]:
method_name: RPCEndpoint,
) -> Iterator[Callable[[TValue], TFilter]]:
for f in formatters:
yield partial(f, module, method_name)

Expand Down