diff --git a/p2p/_utils.py b/p2p/_utils.py index 2aa714d6c3..e3c5db740f 100644 --- a/p2p/_utils.py +++ b/p2p/_utils.py @@ -1,5 +1,5 @@ import collections -from typing import Hashable, Sequence, Tuple, TypeVar +from typing import Hashable, Sequence, Tuple, TypeVar, AsyncGenerator, Any import rlp @@ -93,3 +93,18 @@ def duplicates(elements: Sequence[TValue]) -> Tuple[TValue, ...]: collections.Counter(elements).items() if count > 1 ) + + +TCo = TypeVar('TCo') +TContra = TypeVar('TContra') + + +class aclosing: + def __init__(self, aiter: AsyncGenerator[TCo, TContra]) -> None: + self._aiter = aiter + + async def __aenter__(self) -> AsyncGenerator[TCo, TContra]: + return self._aiter + + async def __aexit__(self, *args: Any) -> None: + await self._aiter.aclose() diff --git a/p2p/behaviors.py b/p2p/behaviors.py index 7a1fd556f7..575feb2114 100644 --- a/p2p/behaviors.py +++ b/p2p/behaviors.py @@ -1,9 +1,8 @@ +import contextlib from typing import ( AsyncIterator, ) -from async_generator import asynccontextmanager - from eth_utils import ValidationError from p2p.abc import ( @@ -36,7 +35,7 @@ def should_apply_to(self, connection: 'ConnectionAPI') -> bool: # mypy bug: https://github.com/python/mypy/issues/708 return self.qualifier(connection, self.logic) # type: ignore - @asynccontextmanager + @contextlib.asynccontextmanager async def apply(self, connection: ConnectionAPI) -> AsyncIterator[None]: if self._applied_to is not None: raise ValidationError( diff --git a/p2p/discovery.py b/p2p/discovery.py index 40a0a01af5..b5fa471eaf 100644 --- a/p2p/discovery.py +++ b/p2p/discovery.py @@ -29,8 +29,6 @@ TypeVar, ) -from async_generator import aclosing - import trio import eth_utils.toolz @@ -82,7 +80,7 @@ ) from p2p.kademlia import ( Address, Node, check_relayed_addr, create_stub_enr, sort_by_distance, KademliaRoutingTable) -from p2p._utils import get_logger +from p2p._utils import get_logger, aclosing from p2p import trio_utils # V4 handler are async methods that take a Node, payload and msg_hash as arguments. diff --git a/p2p/discv5/message_dispatcher.py b/p2p/discv5/message_dispatcher.py index 29a1bf6714..cea58292f1 100644 --- a/p2p/discv5/message_dispatcher.py +++ b/p2p/discv5/message_dispatcher.py @@ -1,3 +1,4 @@ +import contextlib import logging import random from types import ( @@ -14,8 +15,6 @@ TypeVar, ) -from async_generator import asynccontextmanager - import trio from trio.abc import ( ReceiveChannel, @@ -289,7 +288,7 @@ async def get_endpoint_from_node_db(self, receiver_node_id: NodeID) -> Endpoint: return Endpoint(ip_address, udp_port) - @asynccontextmanager + @contextlib.asynccontextmanager async def request_response_subscription(self, receiver_node_id: NodeID, message: BaseMessage, diff --git a/p2p/exchange/exchange.py b/p2p/exchange/exchange.py index 1a3663b72d..06b20bdc71 100644 --- a/p2p/exchange/exchange.py +++ b/p2p/exchange/exchange.py @@ -1,11 +1,11 @@ from functools import partial +import contextlib from typing import ( AsyncIterator, Callable, Type, ) -from async_generator import asynccontextmanager from async_service import background_asyncio_service from p2p.abc import ConnectionAPI @@ -25,7 +25,7 @@ class BaseExchange(ExchangeAPI[TRequestCommand, TResponseCommand, TResult]): def __init__(self) -> None: self.tracker = self.tracker_class() - @asynccontextmanager + @contextlib.asynccontextmanager async def run_exchange(self, connection: ConnectionAPI) -> AsyncIterator[None]: protocol = connection.get_protocol_for_command_type(self.get_request_cmd_type()) diff --git a/p2p/exchange/logic.py b/p2p/exchange/logic.py index 3e8bdf29d2..c08c6396d8 100644 --- a/p2p/exchange/logic.py +++ b/p2p/exchange/logic.py @@ -1,7 +1,6 @@ +import contextlib from typing import Any, AsyncIterator -from async_generator import asynccontextmanager - from p2p.abc import ConnectionAPI, LogicAPI from p2p.exceptions import UnknownProtocol from p2p.logic import BaseLogic @@ -29,7 +28,7 @@ def qualifier(self, connection: ConnectionAPI, logic: LogicAPI) -> bool: else: return protocol.supports_command(self.exchange.get_response_cmd_type()) - @asynccontextmanager + @contextlib.asynccontextmanager async def apply(self, connection: ConnectionAPI) -> AsyncIterator[None]: async with self.exchange.run_exchange(connection): yield diff --git a/p2p/logic.py b/p2p/logic.py index 68db8492c8..05f1b26095 100644 --- a/p2p/logic.py +++ b/p2p/logic.py @@ -1,4 +1,5 @@ from abc import abstractmethod +import contextlib from typing import ( cast, AsyncIterator, @@ -7,9 +8,6 @@ Type, ) -from async_generator import asynccontextmanager -from async_exit_stack import AsyncExitStack - from p2p.abc import ( BehaviorAPI, ConnectionAPI, @@ -49,7 +47,7 @@ class CommandHandler(BaseLogic, Generic[TCommand]): def qualifier(self) -> QualifierFn: # type: ignore return HasCommand(self.command_type) - @asynccontextmanager + @contextlib.asynccontextmanager async def apply(self, connection: ConnectionAPI) -> AsyncIterator[None]: self.connection = connection @@ -76,11 +74,11 @@ class Application(BaseLogic): def add_child_behavior(self, behavior: BehaviorAPI) -> None: self._behaviors += (behavior,) - @asynccontextmanager + @contextlib.asynccontextmanager async def apply(self, connection: ConnectionAPI) -> AsyncIterator[None]: self.connection = connection - async with AsyncExitStack() as stack: + async with contextlib.AsyncExitStack() as stack: # First apply all the child behaviors for behavior in self._behaviors: if behavior.should_apply_to(connection): diff --git a/p2p/multiplexer.py b/p2p/multiplexer.py index 52259678ad..d1f9d5965f 100644 --- a/p2p/multiplexer.py +++ b/p2p/multiplexer.py @@ -1,5 +1,6 @@ import asyncio import collections +import contextlib import time from typing import ( Any, @@ -15,8 +16,6 @@ from cached_property import cached_property -from async_generator import asynccontextmanager - from eth_utils import ValidationError from eth_utils.toolz import cons import rlp @@ -291,7 +290,7 @@ async def _stream_protocol_messages(self, # # Message reading and streaming API # - @asynccontextmanager + @contextlib.asynccontextmanager async def multiplex(self) -> AsyncIterator[None]: """ API for running the background task that feeds individual protocol diff --git a/p2p/p2p_api.py b/p2p/p2p_api.py index 43ed17f69c..ea2e440b4d 100644 --- a/p2p/p2p_api.py +++ b/p2p/p2p_api.py @@ -1,8 +1,7 @@ import asyncio +import contextlib from typing import Any, AsyncIterator, cast -from async_generator import asynccontextmanager - from cached_property import cached_property from async_service import ( @@ -80,7 +79,7 @@ class DisconnectIfIdle(BaseLogic): def __init__(self, idle_timeout: float) -> None: self.idle_timeout = idle_timeout - @asynccontextmanager + @contextlib.asynccontextmanager async def apply(self, connection: ConnectionAPI) -> AsyncIterator[None]: service = PingAndDisconnectIfIdle(connection, self.idle_timeout) async with background_asyncio_service(service): diff --git a/p2p/peer.py b/p2p/peer.py index cb003f6362..e1f5c2be83 100644 --- a/p2p/peer.py +++ b/p2p/peer.py @@ -18,8 +18,6 @@ TYPE_CHECKING, ) -from async_exit_stack import AsyncExitStack - from async_service import Service from lahja import EndpointAPI @@ -261,7 +259,7 @@ async def run(self) -> None: self._start_time = time.monotonic() self.connection.add_command_handler(Disconnect, cast(HandlerFn, self._handle_disconnect)) try: - async with AsyncExitStack() as stack: + async with contextlib.AsyncExitStack() as stack: await stack.enter_async_context(P2PAPI().as_behavior().apply(self.connection)) self.p2p_api = self.connection.get_logic('p2p', P2PAPI) diff --git a/p2p/resource_lock.py b/p2p/resource_lock.py index 2a0321e9c2..5d7e9265c5 100644 --- a/p2p/resource_lock.py +++ b/p2p/resource_lock.py @@ -1,10 +1,9 @@ import asyncio from collections import defaultdict from collections.abc import Hashable +import contextlib from typing import AsyncIterator, DefaultDict, Dict, Generic, TypeVar -from async_generator import asynccontextmanager - TResource = TypeVar('TResource', bound=Hashable) @@ -20,7 +19,7 @@ def __init__(self) -> None: self._locks = {} self._reference_counts = defaultdict(int) - @asynccontextmanager + @contextlib.asynccontextmanager async def lock(self, resource: TResource) -> AsyncIterator[None]: if resource not in self._locks: self._locks[resource] = asyncio.Lock() diff --git a/p2p/service.py b/p2p/service.py index 8b2dd24733..851d9d23b7 100644 --- a/p2p/service.py +++ b/p2p/service.py @@ -1,6 +1,7 @@ from abc import abstractmethod import asyncio import concurrent +import contextlib import functools import time from typing import ( @@ -15,7 +16,6 @@ from weakref import WeakSet from async_service import Service, ServiceAPI -from async_generator import asynccontextmanager from cancel_token import CancelToken, OperationCancelled from eth_utils import ( @@ -423,7 +423,7 @@ async def _cleanup(self) -> None: TService = TypeVar('TService', bound=AsyncioServiceAPI) -@asynccontextmanager +@contextlib.asynccontextmanager async def run_service(service: TService) -> AsyncIterator[TService]: task = asyncio.ensure_future(service.run()) await service.events.started.wait() diff --git a/p2p/tools/factories/connection.py b/p2p/tools/factories/connection.py index 4610e3ae43..943b5cf2a5 100644 --- a/p2p/tools/factories/connection.py +++ b/p2p/tools/factories/connection.py @@ -1,8 +1,7 @@ import asyncio +import contextlib from typing import AsyncIterator, Tuple -from async_generator import asynccontextmanager - from async_service import background_asyncio_service from eth_keys import keys @@ -20,7 +19,7 @@ from .transport import MemoryTransportPairFactory -@asynccontextmanager +@contextlib.asynccontextmanager async def ConnectionPairFactory(*, alice_handshakers: Tuple[HandshakerAPI[ProtocolAPI], ...] = (), bob_handshakers: Tuple[HandshakerAPI[ProtocolAPI], ...] = (), diff --git a/p2p/tools/factories/peer.py b/p2p/tools/factories/peer.py index e588d38798..18dbe50ad1 100644 --- a/p2p/tools/factories/peer.py +++ b/p2p/tools/factories/peer.py @@ -1,8 +1,7 @@ import asyncio +import contextlib from typing import cast, AsyncContextManager, AsyncIterator, Tuple, Type -from async_generator import asynccontextmanager - from lahja import EndpointAPI from eth_keys import keys @@ -15,7 +14,7 @@ from .connection import ConnectionPairFactory -@asynccontextmanager +@contextlib.asynccontextmanager async def PeerPairFactory(*, alice_peer_context: BasePeerContext, alice_peer_factory_class: Type[BasePeerFactory], diff --git a/setup.py b/setup.py index a5de4b65f9..69dd71ca54 100644 --- a/setup.py +++ b/setup.py @@ -9,8 +9,6 @@ deps = { 'p2p': [ - "async-exit-stack==1.0.1", - "async-generator==1.10", "async-service==0.1.0a7", "asyncio-cancel-token>=0.2,<0.3", "async_lru>=0.1.0,<1.0.0", @@ -34,7 +32,6 @@ "bloom-filter==1.3", "cachetools>=3.1.0,<4.0.0", "coincurve>=10.0.0,<11.0.0", - "dataclasses>=0.6, <1;python_version<'3.7'", "eth-utils>=1.8.4,<2", # Fixing this dependency due to: requests 2.20.1 has requirement # idna<2.8,>=2.5, but you'll have idna 2.8 which is incompatible. diff --git a/tests-trio/p2p-trio/conftest.py b/tests-trio/p2p-trio/conftest.py index b61a9e796a..767a045df1 100644 --- a/tests-trio/p2p-trio/conftest.py +++ b/tests-trio/p2p-trio/conftest.py @@ -1,10 +1,9 @@ +import contextlib import logging import trio import pytest_trio -from async_generator import asynccontextmanager - from async_service import background_trio_service from eth_hash.auto import keccak @@ -38,7 +37,7 @@ async def socket_pair(): return sending_socket, receiving_socket -@asynccontextmanager +@contextlib.asynccontextmanager async def _manually_driven_discovery(seed, socket, nursery): _, port = socket.getsockname() discovery = ManuallyDrivenDiscoveryService( diff --git a/tests/conftest.py b/tests/conftest.py index d1a9b04f11..3e17522e76 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,12 +1,10 @@ import asyncio +import contextlib import os from pathlib import Path import tempfile import uuid -from async_generator import ( - asynccontextmanager, -) import pytest from async_service import background_asyncio_service @@ -97,7 +95,7 @@ def event_loop(): loop.close() -@asynccontextmanager +@contextlib.asynccontextmanager async def make_networking_event_bus(): # Tests run concurrently, therefore we need unique IPC paths ipc_path = Path(f"networking-{uuid.uuid4()}.ipc") diff --git a/tests/core/integration_test_helpers.py b/tests/core/integration_test_helpers.py index 63d61d620d..f7d86bd810 100644 --- a/tests/core/integration_test_helpers.py +++ b/tests/core/integration_test_helpers.py @@ -1,12 +1,10 @@ import asyncio +import contextlib from enum import Enum from pathlib import Path from tempfile import TemporaryDirectory from zipfile import ZipFile -from async_generator import ( - asynccontextmanager, -) from async_service import background_asyncio_service from cancel_token import OperationCancelled from eth_keys import keys @@ -93,7 +91,7 @@ def load_fixture_db(db_fixture, db_class=LevelDB): yield db_class(Path(tmpdir) / db_fixture.value) -@asynccontextmanager +@contextlib.asynccontextmanager async def run_peer_pool_event_server(event_bus, peer_pool, handler_type=None): handler_type = DefaultPeerPoolEventServer if handler_type is None else handler_type diff --git a/tests/core/json-rpc/test_rpc_during_beam_sync.py b/tests/core/json-rpc/test_rpc_during_beam_sync.py index bd980e8fc0..92e845be52 100644 --- a/tests/core/json-rpc/test_rpc_during_beam_sync.py +++ b/tests/core/json-rpc/test_rpc_during_beam_sync.py @@ -1,12 +1,11 @@ import asyncio +import contextlib import json import os import pytest import time from typing import Dict -from async_generator import asynccontextmanager - from eth_hash.auto import keccak from eth_utils.toolz import ( assoc, @@ -158,7 +157,7 @@ async def make_request(*args): @pytest.fixture def fake_beam_syncer(chain, event_bus): - @asynccontextmanager + @contextlib.asynccontextmanager async def fake_beam_sync(removed_nodes: Dict): # beam sync starts, it fetches requested nodes from remote peers diff --git a/tests/core/p2p-proto/test_requests.py b/tests/core/p2p-proto/test_requests.py index 1a9cb8c7d9..c6453ccb38 100644 --- a/tests/core/p2p-proto/test_requests.py +++ b/tests/core/p2p-proto/test_requests.py @@ -1,8 +1,8 @@ import asyncio +import contextlib import pytest -from async_exit_stack import AsyncExitStack from async_service import background_asyncio_service from eth_utils import decode_hex @@ -64,7 +64,7 @@ async def test_proxy_peer_requests(request, client_peer_pool = MockPeerPoolWithConnectedPeers([client_peer], event_bus=client_event_bus) server_peer_pool = MockPeerPoolWithConnectedPeers([server_peer], event_bus=server_event_bus) - async with AsyncExitStack() as stack: + async with contextlib.AsyncExitStack() as stack: await stack.enter_async_context(run_peer_pool_event_server( client_event_bus, client_peer_pool, handler_type=ETHPeerPoolEventServer )) @@ -124,7 +124,7 @@ async def test_get_pooled_transactions_request(request, client_peer_pool = MockPeerPoolWithConnectedPeers([client_peer], event_bus=client_event_bus) server_peer_pool = MockPeerPoolWithConnectedPeers([server_peer], event_bus=server_event_bus) - async with AsyncExitStack() as stack: + async with contextlib.AsyncExitStack() as stack: await stack.enter_async_context(run_peer_pool_event_server( client_event_bus, client_peer_pool, handler_type=ETHPeerPoolEventServer )) @@ -171,7 +171,7 @@ async def test_proxy_peer_requests_with_timeouts(request, client_peer_pool = MockPeerPoolWithConnectedPeers([client_peer], event_bus=client_event_bus) server_peer_pool = MockPeerPoolWithConnectedPeers([server_peer], event_bus=server_event_bus) - async with AsyncExitStack() as stack: + async with contextlib.AsyncExitStack() as stack: await stack.enter_async_context(run_peer_pool_event_server( client_event_bus, client_peer_pool, handler_type=ETHPeerPoolEventServer )) @@ -221,7 +221,7 @@ async def test_requests_when_peer_in_client_vanishs(request, client_peer_pool = MockPeerPoolWithConnectedPeers([client_peer], event_bus=client_event_bus) server_peer_pool = MockPeerPoolWithConnectedPeers([server_peer], event_bus=server_event_bus) - async with AsyncExitStack() as stack: + async with contextlib.AsyncExitStack() as stack: await stack.enter_async_context(run_peer_pool_event_server( client_event_bus, client_peer_pool, handler_type=ETHPeerPoolEventServer )) diff --git a/tests/core/test_contextgroup.py b/tests/core/test_contextgroup.py index c771dfb8b5..42448593ed 100644 --- a/tests/core/test_contextgroup.py +++ b/tests/core/test_contextgroup.py @@ -1,11 +1,10 @@ import asyncio +import contextlib import pytest from trio import MultiError -from async_generator import asynccontextmanager - from trinity.contextgroup import AsyncContextGroup @@ -13,7 +12,7 @@ async def test_basic(): exit_count = 0 - @asynccontextmanager + @contextlib.asynccontextmanager async def ctx(v): nonlocal exit_count await asyncio.sleep(0) @@ -32,7 +31,7 @@ async def ctx(v): async def test_exception_entering_context(): exit_count = 0 - @asynccontextmanager + @contextlib.asynccontextmanager async def ctx(should_raise=False): nonlocal exit_count await asyncio.sleep(0) @@ -64,7 +63,7 @@ async def f(should_raise): if should_raise: raise ValueError() - @asynccontextmanager + @contextlib.asynccontextmanager async def ctx(should_raise=False): nonlocal exit_count await asyncio.sleep(0) @@ -87,7 +86,7 @@ async def ctx(should_raise=False): async def test_exception_exiting(): exit_count = 0 - @asynccontextmanager + @contextlib.asynccontextmanager async def ctx(should_raise=False): nonlocal exit_count await asyncio.sleep(0) diff --git a/tests/core/tx-pool/test_tx_pool.py b/tests/core/tx-pool/test_tx_pool.py index 135798fcf6..21192cd84a 100644 --- a/tests/core/tx-pool/test_tx_pool.py +++ b/tests/core/tx-pool/test_tx_pool.py @@ -1,8 +1,8 @@ import asyncio +import contextlib import pytest import uuid -from async_exit_stack import AsyncExitStack from async_service import background_asyncio_service from eth._utils.address import ( force_bytes_to_address @@ -74,7 +74,7 @@ async def two_connected_tx_pools(event_bus, bob_peer_pool = MockPeerPoolWithConnectedPeers([bob], event_bus=bob_event_bus) alice_peer_pool = MockPeerPoolWithConnectedPeers([alice], event_bus=alice_event_bus) - async with AsyncExitStack() as stack: + async with contextlib.AsyncExitStack() as stack: await stack.enter_async_context(run_peer_pool_event_server( bob_event_bus, bob_peer_pool, handler_type=ETHPeerPoolEventServer )) diff --git a/tests/libp2p/bcc/test_syncing.py b/tests/libp2p/bcc/test_syncing.py index 25256dd6d8..50794c0511 100644 --- a/tests/libp2p/bcc/test_syncing.py +++ b/tests/libp2p/bcc/test_syncing.py @@ -1,6 +1,6 @@ import asyncio +import contextlib -from async_generator import asynccontextmanager import pytest from trinity.tools.bcc_factories import ( @@ -11,7 +11,7 @@ ) -@asynccontextmanager +@contextlib.asynccontextmanager async def get_sync_setup( request, event_loop, diff --git a/tests/p2p/test_behavior_and_logic_api.py b/tests/p2p/test_behavior_and_logic_api.py index d19815c29b..d5d03b8f36 100644 --- a/tests/p2p/test_behavior_and_logic_api.py +++ b/tests/p2p/test_behavior_and_logic_api.py @@ -1,8 +1,7 @@ import asyncio +import contextlib import pytest -from async_generator import asynccontextmanager - from eth_utils import ValidationError from p2p.p2p_proto import Ping @@ -18,7 +17,7 @@ class SimpleLogic(BaseLogic): - @asynccontextmanager + @contextlib.asynccontextmanager async def apply(self, connection): yield diff --git a/tests/p2p/test_qualifiers.py b/tests/p2p/test_qualifiers.py index e1733ec495..882fdc7b6a 100644 --- a/tests/p2p/test_qualifiers.py +++ b/tests/p2p/test_qualifiers.py @@ -1,7 +1,6 @@ +import contextlib import pytest -from async_generator import asynccontextmanager - from p2p.abc import ConnectionAPI, LogicAPI from p2p.logic import BaseLogic from p2p.qualifiers import ( @@ -18,7 +17,7 @@ class SimpleLogic(BaseLogic): - @asynccontextmanager + @contextlib.asynccontextmanager async def apply(self, connection): yield diff --git a/trinity/components/builtin/json_rpc/component.py b/trinity/components/builtin/json_rpc/component.py index 63062b2730..cadf298185 100644 --- a/trinity/components/builtin/json_rpc/component.py +++ b/trinity/components/builtin/json_rpc/component.py @@ -6,7 +6,6 @@ from typing import Iterator, Tuple, Union from async_service import background_asyncio_service, Service -from async_exit_stack import AsyncExitStack from lahja import EndpointAPI @@ -147,7 +146,7 @@ async def do_run(cls, boot_info: BootInfo, event_bus: EndpointAPI) -> None: ) services_to_exit += (http_server,) - async with AsyncExitStack() as stack: + async with contextlib.AsyncExitStack() as stack: managers = tuple([ await stack.enter_async_context(background_asyncio_service(service)) for service in services_to_exit diff --git a/trinity/components/builtin/metrics/component.py b/trinity/components/builtin/metrics/component.py index 18ba383b22..52a845b471 100644 --- a/trinity/components/builtin/metrics/component.py +++ b/trinity/components/builtin/metrics/component.py @@ -4,9 +4,9 @@ Namespace, _SubParsersAction, ) +import contextlib from typing import Type -from async_exit_stack import AsyncExitStack from async_service import background_trio_service from eth_utils import ValidationError @@ -164,7 +164,7 @@ async def do_run(cls, boot_info: BootInfo, event_bus: EndpointAPI) -> None: blockchain_metrics_collector, ) - async with AsyncExitStack() as stack: + async with contextlib.AsyncExitStack() as stack: managers = tuple([ await stack.enter_async_context(background_trio_service(service)) for service in services_to_exit diff --git a/trinity/components/builtin/network_db/component.py b/trinity/components/builtin/network_db/component.py index 788e9cc6db..f0dad4367a 100644 --- a/trinity/components/builtin/network_db/component.py +++ b/trinity/components/builtin/network_db/component.py @@ -4,10 +4,10 @@ _SubParsersAction, ) import asyncio +import contextlib import logging from typing import ClassVar, Iterable -from async_exit_stack import AsyncExitStack from async_service import background_asyncio_service, Service from eth_utils import ValidationError, to_tuple from lahja import EndpointAPI @@ -240,7 +240,7 @@ async def do_run(cls, boot_info: BootInfo, event_bus: EndpointAPI) -> None: except BadDatabaseError as err: cls.logger.exception(f"Unrecoverable error in Network Component: {err}") - async with AsyncExitStack() as stack: + async with contextlib.AsyncExitStack() as stack: tracker_managers = tuple([ await stack.enter_async_context(background_asyncio_service(service)) for service in tracker_services diff --git a/trinity/components/eth2/beacon/component.py b/trinity/components/eth2/beacon/component.py index 04bb4af2b1..3a10d589cf 100644 --- a/trinity/components/eth2/beacon/component.py +++ b/trinity/components/eth2/beacon/component.py @@ -1,9 +1,9 @@ from argparse import ArgumentParser, _SubParsersAction import asyncio +import contextlib import logging from typing import Set, Tuple -from async_exit_stack import AsyncExitStack from lahja import EndpointAPI from libp2p.crypto.keys import KeyPair from libp2p.crypto.secp256k1 import create_new_key_pair @@ -174,7 +174,7 @@ async def do_run(cls, boot_info: BootInfo, event_bus: EndpointAPI) -> None: if boot_info.args.bn_only: services += (chain_maintainer, validator_handler) - async with AsyncExitStack() as stack: + async with contextlib.AsyncExitStack() as stack: for service in services: await stack.enter_async_context(run_service(service)) diff --git a/trinity/extensibility/component.py b/trinity/extensibility/component.py index 79b51e144b..064a6e59ec 100644 --- a/trinity/extensibility/component.py +++ b/trinity/extensibility/component.py @@ -7,12 +7,12 @@ _SubParsersAction, ) import asyncio +import contextlib import logging import pathlib import signal from typing import AsyncIterator, Type, TYPE_CHECKING, Union -from async_generator import asynccontextmanager from lahja import AsyncioEndpoint, ConnectionConfig, EndpointAPI, TrioEndpoint @@ -119,7 +119,7 @@ async def _cleanup_component_task(component_name: str, task: "asyncio.Future[Non logger.debug("Stopped component: %s", component_name) -@asynccontextmanager +@contextlib.asynccontextmanager async def run_component(component: ComponentAPI) -> AsyncIterator[None]: task = asyncio.ensure_future(component.run()) logger.debug("Starting component: %s", component.name) @@ -129,7 +129,7 @@ async def run_component(component: ComponentAPI) -> AsyncIterator[None]: await _cleanup_component_task(component.name, task) -@asynccontextmanager +@contextlib.asynccontextmanager async def _run_asyncio_component_in_proc( component_type: Type['AsyncioIsolatedComponent'], event_bus: EndpointAPI, @@ -147,7 +147,7 @@ async def _run_asyncio_component_in_proc( await _cleanup_component_task(component.name, task) -@asynccontextmanager +@contextlib.asynccontextmanager async def _run_trio_component_in_proc( component_type: Type['TrioIsolatedComponent'], event_bus: EndpointAPI, @@ -165,7 +165,7 @@ async def _run_trio_component_in_proc( logger.debug("Stopped component: %s", component_type.name) -@asynccontextmanager +@contextlib.asynccontextmanager async def _run_standalone_component( component_type: Union[Type['TrioIsolatedComponent'], Type['AsyncioIsolatedComponent']], app_identifier: str, diff --git a/trinity/protocol/bcc_libp2p/node.py b/trinity/protocol/bcc_libp2p/node.py index 8596a53a78..556d5fcee1 100644 --- a/trinity/protocol/bcc_libp2p/node.py +++ b/trinity/protocol/bcc_libp2p/node.py @@ -1,4 +1,5 @@ import asyncio +import contextlib from dataclasses import dataclass import logging import operator @@ -152,7 +153,6 @@ get_requested_beacon_blocks, get_beacon_blocks_by_root, ) -from async_generator import asynccontextmanager from trinity.metrics.events import ( Libp2pPeersRequest, @@ -547,7 +547,7 @@ def _register_rpc_handlers(self) -> None: async def new_stream(self, peer_id: ID, protocol: TProtocol) -> INetStream: return await self.host.new_stream(peer_id, [protocol]) - @asynccontextmanager + @contextlib.asynccontextmanager async def new_handshake_interaction(self, stream: INetStream) -> AsyncIterator[Interaction]: try: async with Interaction(stream) as interaction: @@ -567,7 +567,7 @@ async def new_handshake_interaction(self, stream: INetStream) -> AsyncIterator[I ) raise HandshakeFailure from error - @asynccontextmanager + @contextlib.asynccontextmanager async def post_handshake_handler_interaction( self, stream: INetStream @@ -585,7 +585,7 @@ async def post_handshake_handler_interaction( await stream.reset() return - @asynccontextmanager + @contextlib.asynccontextmanager async def my_request_interaction(self, stream: INetStream) -> AsyncIterator[Interaction]: try: async with Interaction(stream) as interaction: diff --git a/trinity/server.py b/trinity/server.py index 7e3095c4f3..2c14b23a47 100644 --- a/trinity/server.py +++ b/trinity/server.py @@ -1,5 +1,6 @@ from abc import abstractmethod import asyncio +import contextlib from typing import ( AsyncIterator, Generic, @@ -8,7 +9,6 @@ TypeVar, ) -from async_generator import asynccontextmanager from async_service import Service @@ -103,7 +103,7 @@ def __init__(self, def _make_peer_pool(self) -> TPeerPool: ... - @asynccontextmanager + @contextlib.asynccontextmanager async def tcp_listener(self) -> AsyncIterator[None]: # TODO: Support IPv6 addresses as well. tcp_listener = await asyncio.start_server( diff --git a/trinity/sync/common/headers.py b/trinity/sync/common/headers.py index bd4ed3707b..ffdfbee172 100644 --- a/trinity/sync/common/headers.py +++ b/trinity/sync/common/headers.py @@ -1,6 +1,7 @@ from abc import ABC, abstractmethod import asyncio from concurrent.futures import CancelledError +import contextlib from operator import attrgetter, itemgetter from random import randrange from typing import ( @@ -16,9 +17,6 @@ Type, ) -from async_generator import ( - asynccontextmanager, -) from cancel_token import CancelToken, OperationCancelled from eth_typing import ( BlockIdentifier, @@ -924,7 +922,7 @@ async def _build_skeleton(self) -> None: async with self._get_skeleton_syncer(peer) as syncer: await self._full_skeleton_sync(syncer) - @asynccontextmanager + @contextlib.asynccontextmanager async def _get_skeleton_syncer( self, peer: TChainPeer) -> AsyncIterator[SkeletonSyncer[TChainPeer]]: if self._is_syncing_skeleton: diff --git a/trinity/tools/async_process_runner.py b/trinity/tools/async_process_runner.py index beddf80f03..c7eb83b9f8 100644 --- a/trinity/tools/async_process_runner.py +++ b/trinity/tools/async_process_runner.py @@ -1,4 +1,5 @@ import asyncio +import contextlib import logging import os import signal @@ -10,7 +11,6 @@ Tuple, ) -from async_generator import asynccontextmanager from async_timeout import timeout @@ -21,7 +21,7 @@ class AsyncProcessRunner(): proc: asyncio.subprocess.Process @classmethod - @asynccontextmanager + @contextlib.asynccontextmanager async def run(cls, cmds: Tuple[str, ...], timeout_sec: int = 10) -> AsyncIterator['AsyncProcessRunner']: diff --git a/trinity/tools/bcc_factories.py b/trinity/tools/bcc_factories.py index 83a601f743..0a3a9e9fc6 100644 --- a/trinity/tools/bcc_factories.py +++ b/trinity/tools/bcc_factories.py @@ -1,9 +1,8 @@ from eth2.beacon.tools.factories import BeaconChainFactory import asyncio +import contextlib from typing import Any, AsyncIterator, Dict, Iterable, Collection, Tuple, Type, Sequence -from async_generator import asynccontextmanager - from cancel_token import CancelToken from lahja import EndpointAPI @@ -90,7 +89,7 @@ class Meta: head_slot = 0 -@asynccontextmanager +@contextlib.asynccontextmanager async def ConnectionPairFactory( alice_chaindb: AsyncBeaconChainDB = None, alice_branch: Collection[BaseSignedBeaconBlock] = None, diff --git a/trinity/tools/event_bus.py b/trinity/tools/event_bus.py index 6e81b9becd..c3bb839636 100644 --- a/trinity/tools/event_bus.py +++ b/trinity/tools/event_bus.py @@ -1,11 +1,11 @@ import asyncio +import contextlib from typing import AsyncIterator, Type -from async_generator import asynccontextmanager from lahja import BaseEvent, EndpointAPI -@asynccontextmanager +@contextlib.asynccontextmanager async def mock_request_response(request_type: Type[BaseEvent], response: BaseEvent, event_bus: EndpointAPI) -> AsyncIterator[None]: