diff --git a/tests/core/json-rpc/test_ipc.py b/tests/core/json-rpc/test_ipc.py index de4a161dfd..121d975ccb 100644 --- a/tests/core/json-rpc/test_ipc.py +++ b/tests/core/json-rpc/test_ipc.py @@ -678,7 +678,7 @@ async def test_admin_peers( def to_remote_address(session): return f"{session.remote.address.ip}:{session.remote.address.tcp_port}" - assert json_bob['caps'] == ['eth/63', 'eth/65'] + assert json_bob['caps'] == ['eth/63', 'eth/66'] assert json_bob['enode'] == alice.connection.session.remote.uri() assert json_bob['id'] == str(alice.connection.session.id) assert json_bob['name'] == 'bob' @@ -687,7 +687,7 @@ def to_remote_address(session): assert bob_network['localAddress'] == '0.0.0.0:30303' assert bob_network['remoteAddress'] == to_remote_address(alice.connection.session) - assert json_alice['caps'] == ['eth/63', 'eth/65'] + assert json_alice['caps'] == ['eth/63', 'eth/66'] assert json_alice['enode'] == bob.connection.session.remote.uri() assert json_alice['id'] == str(bob.connection.session.id) assert json_alice['name'] == 'alice' diff --git a/tests/core/p2p-proto/test_eth_api.py b/tests/core/p2p-proto/test_eth_api.py index 15e9cef430..0a218071be 100644 --- a/tests/core/p2p-proto/test_eth_api.py +++ b/tests/core/p2p-proto/test_eth_api.py @@ -15,9 +15,10 @@ from trinity._utils.assertions import assert_type_equality from trinity.db.eth1.header import AsyncHeaderDB from trinity.exceptions import WrongForkIDFailure -from trinity.protocol.eth.api import ETHV65API, ETHV63API, ETHV64API +from trinity.protocol.eth.api import ETHV65API, ETHV63API, ETHV64API, ETHV66API from trinity.protocol.eth.commands import ( GetBlockHeadersV65, + GetBlockHeadersV66, GetNodeDataV65, NewBlock, Status, @@ -28,6 +29,7 @@ ETHProtocolV63, ETHProtocolV64, ETHProtocolV65, + ETHProtocolV66, ) from trinity.tools.factories.common import ( @@ -45,6 +47,13 @@ ) +def get_highest_eth_protocol_version(client): + return max( + protocol.as_capability()[1] for protocol in client.connection.get_protocols() + if protocol.as_capability()[0] == 'eth' + ) + + @pytest.fixture def bob_chain(): chain = build( @@ -118,6 +127,8 @@ def protocol_specific_classes(alice): return ETHV64API, ETHHandshakeReceipt, Status, StatusPayloadFactory elif alice.connection.has_protocol(ETHProtocolV65): return ETHV65API, ETHHandshakeReceipt, Status, StatusPayloadFactory + elif alice.connection.has_protocol(ETHProtocolV66): + return ETHV66API, ETHHandshakeReceipt, Status, StatusPayloadFactory else: raise Exception("No ETH protocol found") @@ -225,6 +236,9 @@ async def _handle_cmd(connection, cmd): @pytest.mark.asyncio async def test_eth_api_send_get_node_data(alice, bob): + if get_highest_eth_protocol_version(alice) > ETHProtocolV65.version: + pytest.skip("Test not applicable above eth/65") + payload = tuple(BlockHashFactory.create_batch(5)) command_fut = asyncio.Future() @@ -242,6 +256,9 @@ async def _handle_cmd(connection, cmd): @pytest.mark.asyncio async def test_eth_api_send_get_block_headers(alice, bob): + if get_highest_eth_protocol_version(alice) > ETHProtocolV65.version: + pytest.skip("Test not applicable above eth/65") + payload = BlockHeadersQueryFactory() command_fut = asyncio.Future() @@ -262,6 +279,32 @@ async def _handle_cmd(connection, cmd): assert_type_equality(payload, result.payload) +@pytest.mark.asyncio +async def test_eth_api_send_get_block_headers_with_request_id(alice, bob): + if get_highest_eth_protocol_version(alice) < ETHProtocolV66.version: + pytest.skip("Test not applicable below eth/66") + + payload = BlockHeadersQueryFactory() + + command_fut = asyncio.Future() + + async def _handle_cmd(connection, cmd): + command_fut.set_result(cmd) + + bob.connection.add_command_handler(GetBlockHeadersV66, _handle_cmd) + request_id = alice.eth_api.send_get_block_headers( + block_number_or_hash=payload.block_number_or_hash, + max_headers=payload.block_number_or_hash, + skip=payload.skip, + reverse=payload.reverse, + ) + + result = await asyncio.wait_for(command_fut, timeout=1) + assert isinstance(result, GetBlockHeadersV66) + assert_type_equality(payload, result.payload.query) + assert result.payload.request_id == request_id + + @pytest.mark.asyncio async def test_handshake_with_incompatible_fork_id(alice_chain, bob_chain): diff --git a/tests/core/p2p-proto/test_peer.py b/tests/core/p2p-proto/test_peer.py index 20f132edf9..a9384aaff1 100644 --- a/tests/core/p2p-proto/test_peer.py +++ b/tests/core/p2p-proto/test_peer.py @@ -7,7 +7,7 @@ from trinity.protocol.eth.peer import ETHPeer from trinity.protocol.eth.proto import ( ETHProtocolV63, - ETHProtocolV65, + ETHProtocolV66, ) from trinity.protocol.les.peer import LESPeer from trinity.protocol.les.proto import ( @@ -63,8 +63,8 @@ async def test_ETH_peers(): assert isinstance(alice, ETHPeer) assert isinstance(bob, ETHPeer) - assert isinstance(alice.sub_proto, ETHProtocolV65) - assert isinstance(bob.sub_proto, ETHProtocolV65) + assert isinstance(alice.sub_proto, ETHProtocolV66) + assert isinstance(bob.sub_proto, ETHProtocolV66) @pytest.mark.asyncio diff --git a/tests/core/p2p-proto/test_peer_block_body_validator_api.py b/tests/core/p2p-proto/test_peer_block_body_validator_api.py index 7cdaa8fb0c..f529634fed 100644 --- a/tests/core/p2p-proto/test_peer_block_body_validator_api.py +++ b/tests/core/p2p-proto/test_peer_block_body_validator_api.py @@ -19,7 +19,7 @@ from trinity.rlp.block_body import BlockBody -from trinity.tools.factories import LatestETHPeerPairFactory +from trinity.tools.factories import LatestETHPeerPairFactory, ETHV65PeerPairFactory def mk_uncle(block_number): @@ -73,12 +73,24 @@ def mk_headers(*counts): yield mk_header_and_body(idx, num_transactions, num_uncles) -@pytest.fixture -async def eth_peer_and_remote(): - async with LatestETHPeerPairFactory() as (peer, remote): +@pytest.fixture( + params=( + ETHV65PeerPairFactory, + LatestETHPeerPairFactory, + ) +) +async def eth_peer_and_remote(request): + async with request.param() as (peer, remote): yield peer, remote +def get_request_id_or_none(request): + try: + return request.payload.request_id + except AttributeError: + return None + + @pytest.mark.asyncio async def test_eth_peer_get_block_bodies_round_trip_with_empty_response(eth_peer_and_remote): peer, remote = eth_peer_and_remote @@ -86,12 +98,11 @@ async def test_eth_peer_get_block_bodies_round_trip_with_empty_response(eth_peer headers_bundle = mk_headers((2, 3), (8, 4), (0, 1), (0, 0)) headers, bodies, transactions_roots, trie_data_dicts, uncles_hashes = zip(*headers_bundle) - async def send_block_bodies(): - remote.eth_api.send_block_bodies([]) - await asyncio.sleep(0) + async def send_block_bodies(_, cmd): + remote.eth_api.send_block_bodies([], get_request_id_or_none(cmd)) + remote.connection.add_msg_handler(send_block_bodies) get_bodies_task = asyncio.ensure_future(peer.eth_api.get_block_bodies(headers)) - asyncio.ensure_future(send_block_bodies()) response = await get_bodies_task @@ -105,15 +116,15 @@ async def test_eth_peer_get_block_bodies_round_trip_with_full_response(eth_peer_ headers_bundle = mk_headers((2, 3), (8, 4), (0, 1), (0, 0)) headers, bodies, transactions_roots, trie_data_dicts, uncles_hashes = zip(*headers_bundle) - async def send_block_bodies(): - remote.eth_api.send_block_bodies(bodies) - await asyncio.sleep(0) + async def send_block_bodies(_, cmd): + remote.eth_api.send_block_bodies(bodies, get_request_id_or_none(cmd)) + + remote.connection.add_msg_handler(send_block_bodies) transactions_bundles = tuple(zip(transactions_roots, trie_data_dicts)) bodies_bundle = tuple(zip(bodies, transactions_bundles, uncles_hashes)) get_bodies_task = asyncio.ensure_future(peer.eth_api.get_block_bodies(headers)) - asyncio.ensure_future(send_block_bodies()) response = await get_bodies_task @@ -128,15 +139,15 @@ async def test_eth_peer_get_block_bodies_round_trip_with_partial_response(eth_pe headers_bundle = mk_headers((2, 3), (8, 4), (0, 1), (0, 0)) headers, bodies, transactions_roots, trie_data_dicts, uncles_hashes = zip(*headers_bundle) - async def send_block_bodies(): - remote.eth_api.send_block_bodies(bodies[1:]) + async def send_block_bodies(_, cmd): + remote.eth_api.send_block_bodies(bodies[1:], get_request_id_or_none(cmd)) await asyncio.sleep(0) + remote.connection.add_msg_handler(send_block_bodies) transactions_bundles = tuple(zip(transactions_roots, trie_data_dicts)) bodies_bundle = tuple(zip(bodies, transactions_bundles, uncles_hashes)) get_bodies_task = asyncio.ensure_future(peer.eth_api.get_block_bodies(headers)) - asyncio.ensure_future(send_block_bodies()) response = await get_bodies_task @@ -151,19 +162,19 @@ async def test_eth_peer_get_block_bodies_round_trip_with_noise(eth_peer_and_remo headers_bundle = mk_headers((2, 3), (8, 4), (0, 1), (0, 0)) headers, bodies, transactions_roots, trie_data_dicts, uncles_hashes = zip(*headers_bundle) - async def send_block_bodies(): - remote.eth_api.send_node_data((b'', b'arst')) + async def send_block_bodies(_, cmd): + remote.eth_api.send_node_data((b'', b'arst'), get_request_id_or_none(cmd)) await asyncio.sleep(0) - remote.eth_api.send_block_bodies(bodies) + remote.eth_api.send_block_bodies(bodies, get_request_id_or_none(cmd)) await asyncio.sleep(0) - remote.eth_api.send_node_data((b'', b'arst')) + remote.eth_api.send_node_data((b'', b'arst'), get_request_id_or_none(cmd)) await asyncio.sleep(0) + remote.connection.add_msg_handler(send_block_bodies) transactions_bundles = tuple(zip(transactions_roots, trie_data_dicts)) bodies_bundle = tuple(zip(bodies, transactions_bundles, uncles_hashes)) get_bodies_task = asyncio.ensure_future(peer.eth_api.get_block_bodies(headers)) - asyncio.ensure_future(send_block_bodies()) response = await get_bodies_task @@ -181,17 +192,17 @@ async def test_eth_peer_get_block_bodies_round_trip_no_match_invalid_response(et wrong_headers_bundle = mk_headers((4, 1), (3, 5), (2, 0), (7, 3)) _, wrong_bodies, _, _, _ = zip(*wrong_headers_bundle) - async def send_block_bodies(): - remote.eth_api.send_block_bodies(wrong_bodies) + async def send_block_bodies(_, cmd): + remote.eth_api.send_block_bodies(wrong_bodies, get_request_id_or_none(cmd)) await asyncio.sleep(0) - remote.eth_api.send_block_bodies(bodies) + remote.eth_api.send_block_bodies(bodies, get_request_id_or_none(cmd)) await asyncio.sleep(0) + remote.connection.add_msg_handler(send_block_bodies) transactions_bundles = tuple(zip(transactions_roots, trie_data_dicts)) bodies_bundle = tuple(zip(bodies, transactions_bundles, uncles_hashes)) get_bodies_task = asyncio.ensure_future(peer.eth_api.get_block_bodies(headers)) - asyncio.ensure_future(send_block_bodies()) response = await get_bodies_task diff --git a/tests/core/p2p-proto/test_peer_block_header_validator_api.py b/tests/core/p2p-proto/test_peer_block_header_validator_api.py index cbcf65155f..91f4d1d083 100644 --- a/tests/core/p2p-proto/test_peer_block_header_validator_api.py +++ b/tests/core/p2p-proto/test_peer_block_header_validator_api.py @@ -25,6 +25,13 @@ async def next_request_id(self): return msg.command.payload.request_id +def get_request_id_or_none(request): + try: + return request.payload.request_id + except AttributeError: + return None + + @to_tuple def mk_header_chain(length): assert length >= 1 @@ -72,11 +79,11 @@ async def test_eth_peer_get_headers_round_trip(eth_peer_and_remote, headers): peer, remote = eth_peer_and_remote - async def send_headers(): - remote.eth_api.send_block_headers(headers) + async def send_headers(_, cmd): + remote.eth_api.send_block_headers(headers, get_request_id_or_none(cmd)) + remote.connection.add_msg_handler(send_headers) get_headers_task = asyncio.ensure_future(peer.chain_api.get_block_headers(*params)) - asyncio.ensure_future(send_headers()) response = await get_headers_task @@ -90,13 +97,11 @@ async def test_eth_peer_get_headers_round_trip_concurrent_requests(eth_peer_and_ peer, remote = eth_peer_and_remote headers = mk_header_chain(1) - async def send_headers(): - await asyncio.sleep(0.01) - remote.eth_api.send_block_headers(headers) + async def send_headers(_, cmd): await asyncio.sleep(0.01) - remote.eth_api.send_block_headers(headers) - await asyncio.sleep(0.01) - remote.eth_api.send_block_headers(headers) + remote.eth_api.send_block_headers(headers, get_request_id_or_none(cmd)) + + remote.connection.add_msg_handler(send_headers) params = (0, 1, 0, False) @@ -105,7 +110,7 @@ async def send_headers(): asyncio.ensure_future(peer.chain_api.get_block_headers(*params)), asyncio.ensure_future(peer.chain_api.get_block_headers(*params)), ] - asyncio.ensure_future(send_headers()) + results = await asyncio.gather(*tasks) for response in results: @@ -148,14 +153,14 @@ async def test_eth_peer_get_headers_round_trip_with_noise(eth_peer_and_remote): headers = mk_header_chain(10) - async def send_responses(): - remote.eth_api.send_node_data([b'arst', b'tsra']) + async def send_responses(_, cmd): + remote.eth_api.send_node_data([b'arst', b'tsra'], get_request_id_or_none(cmd)) await asyncio.sleep(0) - remote.eth_api.send_block_headers(headers) + remote.eth_api.send_block_headers(headers, get_request_id_or_none(cmd)) await asyncio.sleep(0) + remote.connection.add_msg_handler(send_responses) get_headers_task = asyncio.ensure_future(peer.chain_api.get_block_headers(0, 10, 0, False)) - asyncio.ensure_future(send_responses()) response = await get_headers_task @@ -172,16 +177,16 @@ async def test_eth_peer_get_headers_round_trip_does_not_match_invalid_response(e wrong_headers = mk_header_chain(10)[3:8] - async def send_responses(): - remote.eth_api.send_node_data([b'arst', b'tsra']) + async def send_responses(_, cmd): + remote.eth_api.send_node_data([b'arst', b'tsra'], get_request_id_or_none(cmd)) await asyncio.sleep(0) - remote.eth_api.send_block_headers(wrong_headers) + remote.eth_api.send_block_headers(wrong_headers, get_request_id_or_none(cmd)) await asyncio.sleep(0) - remote.eth_api.send_block_headers(headers) + remote.eth_api.send_block_headers(headers, get_request_id_or_none(cmd)) await asyncio.sleep(0) + remote.connection.add_msg_handler(send_responses) get_headers_task = asyncio.ensure_future(peer.chain_api.get_block_headers(0, 5, 0, False)) - asyncio.ensure_future(send_responses()) response = await get_headers_task diff --git a/tests/core/p2p-proto/test_peer_node_data_validator_api.py b/tests/core/p2p-proto/test_peer_node_data_validator_api.py index c3cd10a783..cebbd0bdf7 100644 --- a/tests/core/p2p-proto/test_peer_node_data_validator_api.py +++ b/tests/core/p2p-proto/test_peer_node_data_validator_api.py @@ -8,15 +8,27 @@ keccak, ) -from trinity.tools.factories import LatestETHPeerPairFactory +from trinity.tools.factories import LatestETHPeerPairFactory, ETHV65PeerPairFactory -@pytest.fixture -async def eth_peer_and_remote(): - async with LatestETHPeerPairFactory() as (peer, remote): +@pytest.fixture( + params=( + ETHV65PeerPairFactory, + LatestETHPeerPairFactory, + ) +) +async def eth_peer_and_remote(request): + async with request.param() as (peer, remote): yield peer, remote +def get_request_id_or_none(request): + try: + return request.payload.request_id + except AttributeError: + return None + + def mk_node(): node_length = random.randint(0, 2048) node = os.urandom(node_length) @@ -50,13 +62,13 @@ async def test_eth_peer_get_node_data_round_trip(eth_peer_and_remote, node_keys, peer, remote = eth_peer_and_remote node_data = tuple(zip(node_keys, nodes)) - async def send_node_data(): - remote.eth_api.send_node_data(nodes) + async def send_node_data(_, cmd): + remote.eth_api.send_node_data(nodes, get_request_id_or_none(cmd)) + + remote.connection.add_msg_handler(send_node_data) request = asyncio.ensure_future(peer.eth_api.get_node_data(node_keys)) - asyncio.ensure_future(send_node_data()) response = await request - assert len(response) == len(node_keys) assert response == node_data @@ -68,13 +80,13 @@ async def test_eth_peer_get_headers_round_trip_partial_response(eth_peer_and_rem node_keys, nodes = mk_node_data(32) node_data = tuple(zip(node_keys, nodes)) - async def send_responses(): + async def send_responses(_, cmd): remote.eth_api.send_transactions([]) await asyncio.sleep(0) - remote.eth_api.send_node_data(nodes[:10]) + remote.eth_api.send_node_data(nodes[:10], get_request_id_or_none(cmd)) await asyncio.sleep(0) - asyncio.ensure_future(send_responses()) + remote.connection.add_msg_handler(send_responses) response = await peer.eth_api.get_node_data(node_keys) assert len(response) == 10 @@ -88,13 +100,13 @@ async def test_eth_peer_get_headers_round_trip_with_noise(eth_peer_and_remote): node_keys, nodes = mk_node_data(32) node_data = tuple(zip(node_keys, nodes)) - async def send_responses(): + async def send_responses(_, cmd): remote.eth_api.send_transactions([]) await asyncio.sleep(0) - remote.eth_api.send_node_data(nodes) + remote.eth_api.send_node_data(nodes, get_request_id_or_none(cmd)) await asyncio.sleep(0) - asyncio.ensure_future(send_responses()) + remote.connection.add_msg_handler(send_responses) response = await peer.eth_api.get_node_data(node_keys) assert len(response) == len(nodes) @@ -110,15 +122,15 @@ async def test_eth_peer_get_headers_round_trip_does_not_match_invalid_response(e wrong_nodes = tuple(set(mk_node() for _ in range(32)).difference(nodes)) - async def send_responses(): - remote.eth_api.send_node_data(wrong_nodes) + async def send_responses(_, cmd): + remote.eth_api.send_node_data(wrong_nodes, get_request_id_or_none(cmd)) await asyncio.sleep(0) remote.eth_api.send_transactions([]) await asyncio.sleep(0) - remote.eth_api.send_node_data(nodes) + remote.eth_api.send_node_data(nodes, get_request_id_or_none(cmd)) await asyncio.sleep(0) - asyncio.ensure_future(send_responses()) + remote.connection.add_msg_handler(send_responses) response = await peer.eth_api.get_node_data(node_keys) assert len(response) == len(nodes) diff --git a/tests/core/p2p-proto/test_peer_receipts_validator_api.py b/tests/core/p2p-proto/test_peer_receipts_validator_api.py index e49e1ef3be..c899fc54b7 100644 --- a/tests/core/p2p-proto/test_peer_receipts_validator_api.py +++ b/tests/core/p2p-proto/test_peer_receipts_validator_api.py @@ -10,15 +10,27 @@ from eth.rlp.headers import BlockHeader from eth.rlp.receipts import Receipt -from trinity.tools.factories import LatestETHPeerPairFactory +from trinity.tools.factories import LatestETHPeerPairFactory, ETHV65PeerPairFactory -@pytest.fixture -async def eth_peer_and_remote(): - async with LatestETHPeerPairFactory() as (peer, remote): +@pytest.fixture( + params=( + ETHV65PeerPairFactory, + LatestETHPeerPairFactory, + ) +) +async def eth_peer_and_remote(request): + async with request.param() as (peer, remote): yield peer, remote +def get_request_id_or_none(request): + try: + return request.payload.request_id + except AttributeError: + return None + + @to_tuple def mk_receipts(num_receipts): for _ in range(num_receipts): @@ -57,12 +69,11 @@ async def test_eth_peer_get_receipts_round_trip_with_full_response(eth_peer_and_ headers, receipts, trie_roots_and_data = zip(*headers_bundle) receipts_bundle = tuple(zip(receipts, trie_roots_and_data)) - async def send_receipts(): - remote.eth_api.send_receipts(receipts) - await asyncio.sleep(0) + async def send_receipts(_, cmd): + remote.eth_api.send_receipts(receipts, get_request_id_or_none(cmd)) + remote.connection.add_msg_handler(send_receipts) get_receipts_task = asyncio.ensure_future(peer.eth_api.get_receipts(headers)) - asyncio.ensure_future(send_receipts()) response = await get_receipts_task @@ -78,12 +89,13 @@ async def test_eth_peer_get_receipts_round_trip_with_partial_response(eth_peer_a headers, receipts, trie_roots_and_data = zip(*headers_bundle) receipts_bundle = tuple(zip(receipts, trie_roots_and_data)) - async def send_receipts(): - remote.eth_api.send_receipts((receipts[2], receipts[1], receipts[4])) - await asyncio.sleep(0) + async def send_receipts(_, cmd): + remote.eth_api.send_receipts( + (receipts[2], receipts[1], receipts[4]), get_request_id_or_none(cmd) + ) + remote.connection.add_msg_handler(send_receipts) get_receipts_task = asyncio.ensure_future(peer.eth_api.get_receipts(headers)) - asyncio.ensure_future(send_receipts()) response = await get_receipts_task @@ -99,16 +111,16 @@ async def test_eth_peer_get_receipts_round_trip_with_noise(eth_peer_and_remote): headers, receipts, trie_roots_and_data = zip(*headers_bundle) receipts_bundle = tuple(zip(receipts, trie_roots_and_data)) - async def send_receipts(): + async def send_receipts(_, cmd): remote.eth_api.send_transactions([]) await asyncio.sleep(0) - remote.eth_api.send_receipts(receipts) + remote.eth_api.send_receipts(receipts, get_request_id_or_none(cmd)) await asyncio.sleep(0) remote.eth_api.send_transactions([]) await asyncio.sleep(0) + remote.connection.add_msg_handler(send_receipts) get_receipts_task = asyncio.ensure_future(peer.eth_api.get_receipts(headers)) - asyncio.ensure_future(send_receipts()) response = await get_receipts_task @@ -127,14 +139,14 @@ async def test_eth_peer_get_receipts_round_trip_no_match_invalid_response(eth_pe wrong_headers = mk_headers(4, 3, 8) _, wrong_receipts, _ = zip(*wrong_headers) - async def send_receipts(): - remote.eth_api.send_receipts(wrong_receipts) + async def send_receipts(_, cmd): + remote.eth_api.send_receipts(wrong_receipts, get_request_id_or_none(cmd)) await asyncio.sleep(0) - remote.eth_api.send_receipts(receipts) + remote.eth_api.send_receipts(receipts, get_request_id_or_none(cmd)) await asyncio.sleep(0) + remote.connection.add_msg_handler(send_receipts) get_receipts_task = asyncio.ensure_future(peer.eth_api.get_receipts(headers)) - asyncio.ensure_future(send_receipts()) response = await get_receipts_task diff --git a/tests/core/p2p-proto/test_stats.py b/tests/core/p2p-proto/test_stats.py index 3a9679216a..487bf78710 100644 --- a/tests/core/p2p-proto/test_stats.py +++ b/tests/core/p2p-proto/test_stats.py @@ -52,14 +52,14 @@ async def test_eth_get_headers_empty_stats(): @pytest.mark.asyncio async def test_eth_get_headers_stats(): async with LatestETHPeerPairFactory() as (peer, remote): - async def send_headers(): - remote.eth_api.send_block_headers(mk_header_chain(1)) + async def send_headers(_, cmd): + remote.eth_api.send_block_headers(mk_header_chain(1), cmd.payload.request_id) + remote.connection.add_msg_handler(send_headers) for idx in range(1, 5): get_headers_task = asyncio.ensure_future( peer.eth_api.get_block_headers(0, 1, 0, False) ) - asyncio.ensure_future(send_headers()) await get_headers_task diff --git a/trinity/_utils/les.py b/trinity/_utils/requests.py similarity index 100% rename from trinity/_utils/les.py rename to trinity/_utils/requests.py diff --git a/trinity/components/builtin/tx_pool/pool.py b/trinity/components/builtin/tx_pool/pool.py index a5c73b9cd7..6bb2edc225 100644 --- a/trinity/components/builtin/tx_pool/pool.py +++ b/trinity/components/builtin/tx_pool/pool.py @@ -18,6 +18,7 @@ from trinity._utils.bloom import RollingBloom from trinity._utils.logging import get_logger +from trinity.protocol.eth.commands import GetPooledTransactionsV66 from trinity.protocol.eth.events import ( TransactionsEvent, GetPooledTransactionsEvent, @@ -113,7 +114,13 @@ async def _process_get_pooled_transactions_requests(self) -> None: async for event in self._event_bus.stream(GetPooledTransactionsEvent): asking_peer = await self._peer_pool.ensure_proxy_peer(event.session) - asking_peer.eth_api.send_pooled_transactions([]) + + if isinstance(event.command, GetPooledTransactionsV66): + request_id = event.command.payload.request_id + else: + request_id = None + + asking_peer.eth_api.send_pooled_transactions([], request_id) async def _handle_tx(self, sender: SessionAPI, txs: Sequence[SignedTransactionAPI]) -> None: diff --git a/trinity/protocol/common/api.py b/trinity/protocol/common/api.py index 407e9f9fe3..63b2bb3817 100644 --- a/trinity/protocol/common/api.py +++ b/trinity/protocol/common/api.py @@ -7,23 +7,32 @@ from p2p.logic import Application from p2p.qualifiers import HasProtocol -from trinity.protocol.eth.api import ETHV63API, ETHV65API, ETHV64API -from trinity.protocol.eth.proto import ETHProtocolV63, ETHProtocolV64, ETHProtocolV65 +from trinity.protocol.eth.api import ETHV63API, ETHV64API, ETHV65API, ETHV66API +from trinity.protocol.eth.proto import ( + ETHProtocolV63, + ETHProtocolV64, + ETHProtocolV65, + ETHProtocolV66, +) + from trinity.protocol.les.api import LESV1API, LESV2API from trinity.protocol.les.proto import LESProtocolV1, LESProtocolV2 from .abc import ChainInfoAPI, HeadInfoAPI -AnyETHLES = HasProtocol(ETHProtocolV65) | HasProtocol(ETHProtocolV64) | HasProtocol( - ETHProtocolV63) | HasProtocol(LESProtocolV2) | HasProtocol(LESProtocolV1) +AnyETHLES = HasProtocol(ETHProtocolV66) | HasProtocol(ETHProtocolV65) | HasProtocol( + ETHProtocolV64) | HasProtocol(ETHProtocolV63) | HasProtocol( + LESProtocolV2) | HasProtocol(LESProtocolV1) -AnyETHLESAPI = Union[ETHV65API, ETHV64API, ETHV63API, LESV1API, LESV2API] +AnyETHLESAPI = Union[ETHV66API, ETHV65API, ETHV64API, ETHV63API, LESV1API, LESV2API] def choose_eth_or_les_api( connection: ConnectionAPI) -> AnyETHLESAPI: - if connection.has_protocol(ETHProtocolV65): + if connection.has_protocol(ETHProtocolV66): + return connection.get_logic(ETHV66API.name, ETHV66API) + elif connection.has_protocol(ETHProtocolV65): return connection.get_logic(ETHV65API.name, ETHV65API) elif connection.has_protocol(ETHProtocolV64): return connection.get_logic(ETHV64API.name, ETHV64API) diff --git a/trinity/protocol/common/payloads.py b/trinity/protocol/common/payloads.py index 115f11e6a7..01da00ed64 100644 --- a/trinity/protocol/common/payloads.py +++ b/trinity/protocol/common/payloads.py @@ -1,10 +1,112 @@ -from typing import NamedTuple, Union +from typing import NamedTuple, Union, Sequence, Generic, Iterator, overload, TypeVar, Tuple +from eth.abc import BlockHeaderAPI, BlockAPI, SignedTransactionAPI, ReceiptAPI from eth_typing import BlockNumber, Hash32 +from p2p.commands import BaseCommand + +TData = TypeVar('TData') + + +# mypy doesn't support generic named tuples yet: https://github.com/python/mypy/issues/685 +class Payload(Sequence[Union[int, TData]], Generic[TData]): + + def __init__(self, request_id: int, data: TData): + self.request_id = request_id + self.data = data + + def __iter__(self) -> Iterator[Union[int, TData]]: + yield self.request_id + yield self.data + + @overload + def __getitem__(self, idx: int) -> Union[int, TData]: + ... + + @overload # noqa: F811 + def __getitem__(self, s: slice) -> Sequence[Union[int, TData]]: + ... + + def __getitem__( # noqa: F811 + self, index: Union[int, slice] + ) -> Union[Union[int, TData], Sequence[Union[int, TData]]]: + if isinstance(index, slice): + raise Exception("Subclass disallows slicing") + + if index == 0: + return self.request_id + elif index == 1: + return self.data + else: + raise IndexError(f"Index {index} is out of bounds. Can only be 0 or 1") + + def __len__(self) -> int: + return 2 + + +TQuery = TypeVar('TQuery') + + +class QueryPayload(Payload[TQuery]): + def __init__(self, request_id: int, query: TQuery): + self.request_id = request_id + self.data = query + + @property + def query(self) -> TQuery: + return self.data + + +TResult = TypeVar('TResult') + + +class ResultPayload(Payload[TResult]): + def __init__(self, request_id: int, result: TResult): + self.request_id = request_id + self.data = result + + @property + def result(self) -> TResult: + return self.data + + +TPayload = TypeVar('TPayload') + + +def get_cmd_payload( + cmd: Union[ + BaseCommand[TPayload], + BaseCommand[QueryPayload[TQuery]], + BaseCommand[ResultPayload[TResult]], + ] +) -> Union[TPayload, TQuery, TResult]: + """ + Return the actual payload from a given command, meaning, for a ``BaseCommand[TPayload]`` return + ``TPayload``, for a ``BaseCommand[QueryPayload[TQuery]]`` return ``TQuery`` and for a + ``BaseCommand[ResultPayload[TResult]]`` return ``TResult``. This helper preserves type + information. + """ + if isinstance(cmd.payload, QueryPayload): + return cmd.payload.query + if isinstance(cmd.payload, ResultPayload): + return cmd.payload.result + else: + return cmd.payload + class BlockHeadersQuery(NamedTuple): block_number_or_hash: Union[BlockNumber, Hash32] max_headers: int skip: int reverse: bool + + +BlockHeadersQueryPayload = QueryPayload[BlockHeadersQuery] +BytesTupleQueryPayload = QueryPayload[Tuple[bytes, ...]] +Hash32TupleQueryPayload = QueryPayload[Tuple[Hash32, ...]] + +BytesTupleResultPayload = ResultPayload[Tuple[bytes, ...]] +BlockHeadersResultPayload = ResultPayload[Tuple[BlockHeaderAPI, ...]] +BlocksResultPayload = ResultPayload[Tuple[BlockAPI, ...]] +TransactionsResultPayload = ResultPayload[Tuple[SignedTransactionAPI, ...]] +ReceiptBundleResultPayload = ResultPayload[Tuple[Tuple[ReceiptAPI, ...], ...]] diff --git a/trinity/protocol/common/validators.py b/trinity/protocol/common/validators.py index 7be593999c..660e09b01d 100644 --- a/trinity/protocol/common/validators.py +++ b/trinity/protocol/common/validators.py @@ -1,6 +1,7 @@ from abc import abstractmethod import collections from typing import ( + Any, Sequence, Tuple, cast, @@ -19,6 +20,7 @@ ) from p2p.exchange import ValidatorAPI +from p2p.exchange.typing import TResponseCommand from trinity._utils.headers import sequence_builder from trinity._utils.humanize import humanize_integer_sequence @@ -155,3 +157,8 @@ def _validate_sequence(self, block_numbers: Sequence[BlockNumber]) -> None: 'Duplicate headers returned.\n' f'- duplicates: {humanize_integer_sequence(sorted(duplicates))}\n' ) + + +def match_payload_request_id(request_payload: Any, response: TResponseCommand) -> None: + if request_payload.request_id != response.payload.request_id: + raise ValidationError("Request `id` does not match") diff --git a/trinity/protocol/eth/api.py b/trinity/protocol/eth/api.py index 299e58870f..ebe7d54404 100644 --- a/trinity/protocol/eth/api.py +++ b/trinity/protocol/eth/api.py @@ -1,5 +1,5 @@ from abc import abstractmethod -from typing import Any, Sequence, Tuple, Union, Generic, Type, TypeVar +from typing import Any, Sequence, Tuple, Union, Generic, Type, TypeVar, Optional from cached_property import cached_property @@ -17,8 +17,17 @@ from p2p.logic import Application, CommandHandler from p2p.qualifiers import HasProtocol +from trinity._utils.requests import gen_request_id from trinity.protocol.common.abc import HeadInfoAPI -from trinity.protocol.common.payloads import BlockHeadersQuery +from trinity.protocol.common.payloads import ( + BlockHeadersQuery, + BlockHeadersQueryPayload, + BlockHeadersResultPayload, + BlocksResultPayload, + BytesTupleResultPayload, + Hash32TupleQueryPayload, + ReceiptBundleResultPayload, +) from trinity.protocol.eth.commands import ( BlockBodiesV65, BlockHeadersV65, @@ -26,6 +35,7 @@ GetBlockHeadersV65, GetNodeDataV65, GetReceiptsV65, + GetBlockHeadersV66, NewBlock, NewBlockHashes, NodeDataV65, @@ -34,15 +44,21 @@ Transactions, Status, GetPooledTransactionsV65, -) + GetBlockBodiesV66, GetNodeDataV66, GetReceiptsV66, GetPooledTransactionsV66, NodeDataV66, + BlockHeadersV66, BlockBodiesV66, ReceiptsV66) from trinity.rlp.block_body import BlockBody from .exchanges import ( GetBlockBodiesV65Exchange, + GetBlockBodiesV66Exchange, GetBlockHeadersV65Exchange, + GetBlockHeadersV66Exchange, GetNodeDataV65Exchange, + GetNodeDataV66Exchange, GetReceiptsV65Exchange, + GetReceiptsV66Exchange, GetPooledTransactionsV65Exchange, + GetPooledTransactionsV66Exchange, ) from .handshaker import ETHV63HandshakeReceipt, ETHHandshakeReceipt, BaseETHHandshakeReceipt from .payloads import ( @@ -52,7 +68,7 @@ StatusV63Payload, StatusPayload, ) -from .proto import ETHProtocolV63, ETHProtocolV65, ETHProtocolV64 +from .proto import ETHProtocolV63, ETHProtocolV64, ETHProtocolV65, ETHProtocolV66 THandshakeReceipt = TypeVar("THandshakeReceipt", bound=BaseETHHandshakeReceipt[Any]) @@ -116,11 +132,13 @@ class BaseETHAPI(Application): name = 'eth' head_info_tracker_cls = BaseHeadInfoTracker[THandshakeReceipt] - get_block_bodies: GetBlockBodiesV65Exchange - get_block_headers: GetBlockHeadersV65Exchange - get_node_data: GetNodeDataV65Exchange - get_receipts: GetReceiptsV65Exchange - get_pooled_transactions: GetPooledTransactionsV65Exchange + get_block_bodies: Union[GetBlockBodiesV65Exchange, GetBlockBodiesV66Exchange] + get_block_headers: Union[GetBlockHeadersV65Exchange, GetBlockHeadersV66Exchange] + get_node_data: Union[GetNodeDataV65Exchange, GetNodeDataV66Exchange] + get_receipts: Union[GetReceiptsV65Exchange, GetReceiptsV66Exchange] + get_pooled_transactions: Union[ + GetPooledTransactionsV65Exchange, GetPooledTransactionsV66Exchange + ] def __init__(self) -> None: self.head_info = self.head_info_tracker_cls() @@ -170,10 +188,13 @@ def network_id(self) -> int: def genesis_hash(self) -> Hash32: return self.receipt.genesis_hash - def send_get_node_data(self, node_hashes: Sequence[Hash32]) -> None: - self.protocol.send(GetNodeDataV65(tuple(node_hashes))) + def send_get_node_data(self, node_hashes: Sequence[Hash32]) -> Optional[int]: + return self.protocol.send(GetNodeDataV65(tuple(node_hashes))) + + def send_node_data(self, nodes: Sequence[bytes], request_id: int = None) -> None: + if request_id is not None: + raise ValueError("request_id not expected for eth/63 API") - def send_node_data(self, nodes: Sequence[bytes]) -> None: self.protocol.send(NodeDataV65(tuple(nodes))) def send_get_block_headers( @@ -181,32 +202,40 @@ def send_get_block_headers( block_number_or_hash: Union[BlockNumber, Hash32], max_headers: int, skip: int, - reverse: bool) -> None: + reverse: bool) -> Optional[int]: payload = BlockHeadersQuery( block_number_or_hash=block_number_or_hash, max_headers=max_headers, skip=skip, reverse=reverse ) - self.protocol.send(GetBlockHeadersV65(payload)) + return self.protocol.send(GetBlockHeadersV65(payload)) def send_block_headers(self, headers: Sequence[BlockHeaderAPI]) -> None: self.protocol.send(BlockHeadersV65(tuple(headers))) - def send_get_block_bodies(self, block_hashes: Sequence[Hash32]) -> None: - self.protocol.send(GetBlockBodiesV65(tuple(block_hashes))) + def send_get_block_bodies(self, block_hashes: Sequence[Hash32]) -> Optional[int]: + return self.protocol.send(GetBlockBodiesV65(tuple(block_hashes))) + + def send_block_bodies(self, blocks: Sequence[BlockAPI], request_id: int = None) -> None: + if request_id is not None: + raise ValueError("request_id not expected for eth/63 API") - def send_block_bodies(self, blocks: Sequence[BlockAPI]) -> None: block_bodies = tuple( BlockBody(block.transactions, block.uncles) for block in blocks ) self.protocol.send(BlockBodiesV65(block_bodies)) - def send_get_receipts(self, block_hashes: Sequence[Hash32]) -> None: - self.protocol.send(GetReceiptsV65(tuple(block_hashes))) + def send_get_receipts(self, block_hashes: Sequence[Hash32]) -> Optional[int]: + return self.protocol.send(GetReceiptsV65(tuple(block_hashes))) + + def send_receipts(self, + receipts: Sequence[Sequence[ReceiptAPI]], + request_id: Optional[int] = None) -> None: + if request_id is not None: + raise ValueError("request_id not expected for eth/63 API") - def send_receipts(self, receipts: Sequence[Sequence[ReceiptAPI]]) -> None: self.protocol.send(ReceiptsV65(tuple(map(tuple, receipts)))) def send_transactions(self, transactions: Sequence[SignedTransactionAPI]) -> None: @@ -269,8 +298,113 @@ def __init__(self) -> None: self.get_pooled_transactions = GetPooledTransactionsV65Exchange() self.add_child_behavior(ExchangeLogic(self.get_pooled_transactions).as_behavior()) - def send_get_pooled_transactions(self, transaction_hashes: Sequence[Hash32]) -> None: - self.protocol.send(GetPooledTransactionsV65(tuple(transaction_hashes))) + def send_get_pooled_transactions(self, transaction_hashes: Sequence[Hash32]) -> Optional[int]: + return self.protocol.send(GetPooledTransactionsV65(tuple(transaction_hashes))) + + +class ETHV66API(ETHV65API): + qualifier = HasProtocol(ETHProtocolV66) + + @cached_property + def protocol(self) -> ProtocolAPI: + return self.connection.get_protocol_by_type(ETHProtocolV66) + + def __init__(self) -> None: + super().__init__() + self.get_block_headers = GetBlockHeadersV66Exchange() + self.get_block_bodies = GetBlockBodiesV66Exchange() + self.get_node_data = GetNodeDataV66Exchange() + self.get_receipts = GetReceiptsV66Exchange() + self.get_pooled_transactions = GetPooledTransactionsV66Exchange() + self.add_child_behavior(ExchangeLogic(self.get_block_headers).as_behavior()) + self.add_child_behavior(ExchangeLogic(self.get_block_bodies).as_behavior()) + self.add_child_behavior(ExchangeLogic(self.get_node_data).as_behavior()) + self.add_child_behavior(ExchangeLogic(self.get_receipts).as_behavior()) + self.add_child_behavior(ExchangeLogic(self.get_pooled_transactions).as_behavior()) + + def send_get_block_headers(self, + block_number_or_hash: Union[BlockNumber, Hash32], + max_headers: int, + skip: int, + reverse: bool) -> int: + query = BlockHeadersQuery( + block_number_or_hash=block_number_or_hash, + max_headers=max_headers, + skip=skip, + reverse=reverse, + ) + payload = BlockHeadersQueryPayload( + request_id=gen_request_id(), + query=query, + ) + self.protocol.send(GetBlockHeadersV66(payload)) + return payload.request_id + + def send_get_block_bodies(self, block_hashes: Sequence[Hash32]) -> Optional[int]: + payload = Hash32TupleQueryPayload( + request_id=gen_request_id(), + query=tuple(block_hashes) + ) + self.protocol.send(GetBlockBodiesV66(payload)) + return payload.request_id + + def send_get_node_data(self, node_hashes: Sequence[Hash32]) -> Optional[int]: + payload = Hash32TupleQueryPayload( + request_id=gen_request_id(), + query=tuple(node_hashes) + ) + self.protocol.send(GetNodeDataV66(payload)) + return payload.request_id + + def send_get_receipts(self, block_hashes: Sequence[Hash32]) -> Optional[int]: + payload = Hash32TupleQueryPayload( + request_id=gen_request_id(), + query=tuple(block_hashes) + ) + self.protocol.send(GetReceiptsV66(payload)) + return payload.request_id + + def send_get_pooled_transactions(self, transaction_hashes: Sequence[Hash32]) -> Optional[int]: + payload = Hash32TupleQueryPayload( + request_id=gen_request_id(), + query=tuple(transaction_hashes) + ) + self.protocol.send(GetPooledTransactionsV66(payload)) + return payload.request_id + + def send_node_data(self, nodes: Sequence[bytes], request_id: int = None) -> None: + payload = BytesTupleResultPayload( + request_id=request_id, + result=tuple(nodes) + ) + self.protocol.send(NodeDataV66(payload)) + + def send_block_headers(self, headers: Sequence[BlockHeaderAPI], request_id: int = None) -> None: + payload = BlockHeadersResultPayload( + request_id=request_id, + result=tuple(headers) + ) + self.protocol.send(BlockHeadersV66(payload)) + + def send_block_bodies(self, blocks: Sequence[BlockAPI], request_id: int = None) -> None: + block_bodies = tuple( + BlockBody(block.transactions, block.uncles) + for block in blocks + ) + payload = BlocksResultPayload( + request_id=request_id, + result=block_bodies + ) + self.protocol.send(BlockBodiesV66(payload)) + + def send_receipts(self, + receipts: Sequence[Sequence[ReceiptAPI]], + request_id: int = None) -> None: + payload = ReceiptBundleResultPayload( + request_id=request_id, + result=tuple(map(tuple, receipts)) + ) + self.protocol.send(ReceiptsV66(payload)) -AnyETHAPI = Union[ETHV63API, ETHV64API, ETHV65API] +AnyETHAPI = Union[ETHV63API, ETHV64API, ETHV65API, ETHV66API] diff --git a/trinity/protocol/eth/commands.py b/trinity/protocol/eth/commands.py index 21efecc5b6..65a8690969 100644 --- a/trinity/protocol/eth/commands.py +++ b/trinity/protocol/eth/commands.py @@ -1,4 +1,4 @@ -from typing import Tuple +from typing import Tuple, Union, Any from eth_typing import Hash32 from eth_utils.curried import ( @@ -15,20 +15,36 @@ from p2p.commands import BaseCommand, RLPCodec -from trinity.protocol.common.payloads import BlockHeadersQuery +from trinity.protocol.common.payloads import ( + BlockHeadersQuery, + BlockHeadersQueryPayload, + BlockHeadersResultPayload, + BlocksResultPayload, + BytesTupleQueryPayload, + BytesTupleResultPayload, + Hash32TupleQueryPayload, + ReceiptBundleResultPayload, + TransactionsResultPayload, +) from trinity.rlp.block_body import BlockBody from trinity.rlp.sedes import HashOrNumber, hash_sedes from .forkid import ForkID from .payloads import ( - StatusV63Payload, - NewBlockHash, BlockFields, + NewBlockHash, NewBlockPayload, StatusPayload, + StatusV63Payload, ) +# Using a NewType such as Hash32 will throw of pickle. That's why we use `bytes` and ignore +# the mypy warning where this helper is used. That way, we don't leak `bytes` into other APIs. +def process_tuple_bytes_query(args: Any) -> BytesTupleQueryPayload: + return BytesTupleQueryPayload(*args) + + class StatusV63(BaseCommand[StatusV63Payload]): protocol_command_id = 0 serialization_codec: RLPCodec[StatusV63Payload] = RLPCodec( @@ -90,6 +106,23 @@ class GetBlockHeadersV65(BaseCommand[BlockHeadersQuery]): ) +class GetBlockHeadersV66(BaseCommand[BlockHeadersQueryPayload]): + protocol_command_id = 3 + serialization_codec = RLPCodec( + sedes=sedes.List(( + sedes.big_endian_int, + sedes.List((HashOrNumber(), sedes.big_endian_int, sedes.big_endian_int, sedes.boolean)), + )), + process_inbound_payload_fn=compose( + lambda args: BlockHeadersQueryPayload(*args), + apply_formatter_at_index(lambda args: BlockHeadersQuery(*args), 1) + ), + ) + + +AnyGetBlockHeaders = Union[GetBlockHeadersV65, GetBlockHeadersV66] + + class BlockHeadersV65(BaseCommand[Tuple[BlockHeaderAPI, ...]]): protocol_command_id = 4 serialization_codec: RLPCodec[Tuple[BlockHeaderAPI, ...]] = RLPCodec( @@ -97,6 +130,20 @@ class BlockHeadersV65(BaseCommand[Tuple[BlockHeaderAPI, ...]]): ) +class BlockHeadersV66(BaseCommand[BlockHeadersResultPayload]): + protocol_command_id = 4 + serialization_codec = RLPCodec( + sedes=sedes.List(( + sedes.big_endian_int, + sedes.CountableList(BlockHeader), + )), + process_inbound_payload_fn=lambda args: BlockHeadersResultPayload(*args), + ) + + +AnyBlockHeaders = Union[BlockHeadersV65, BlockHeadersV66] + + class GetBlockBodiesV65(BaseCommand[Tuple[Hash32, ...]]): protocol_command_id = 5 serialization_codec: RLPCodec[Tuple[Hash32, ...]] = RLPCodec( @@ -104,6 +151,21 @@ class GetBlockBodiesV65(BaseCommand[Tuple[Hash32, ...]]): ) +class GetBlockBodiesV66(BaseCommand[Hash32TupleQueryPayload]): + protocol_command_id = 5 + serialization_codec = RLPCodec( + sedes=sedes.List(( + sedes.big_endian_int, + sedes.CountableList(hash_sedes), + )), + # processing as `bytes` because NewType`s such as Hash32 throw off pickle + process_inbound_payload_fn=process_tuple_bytes_query # type: ignore + ) + + +AnyGetBlockBodies = Union[GetBlockBodiesV65, GetBlockBodiesV66] + + class BlockBodiesV65(BaseCommand[Tuple[BlockBody, ...]]): protocol_command_id = 6 serialization_codec: RLPCodec[Tuple[BlockBody, ...]] = RLPCodec( @@ -111,6 +173,20 @@ class BlockBodiesV65(BaseCommand[Tuple[BlockBody, ...]]): ) +class BlockBodiesV66(BaseCommand[BlocksResultPayload]): + protocol_command_id = 6 + serialization_codec = RLPCodec( + sedes=sedes.List(( + sedes.big_endian_int, + sedes.CountableList(BlockBody), + )), + process_inbound_payload_fn=lambda args: BlocksResultPayload(*args), + ) + + +AnyBlockBodies = Union[BlockBodiesV65, BlockBodiesV66] + + class NewBlock(BaseCommand[NewBlockPayload]): protocol_command_id = 7 serialization_codec: RLPCodec[NewBlockPayload] = RLPCodec( @@ -146,6 +222,21 @@ class GetPooledTransactionsV65(BaseCommand[Tuple[Hash32, ...]]): ) +class GetPooledTransactionsV66(BaseCommand[Hash32TupleQueryPayload]): + protocol_command_id = 9 + serialization_codec = RLPCodec( + sedes=sedes.List(( + sedes.big_endian_int, + sedes.CountableList(hash_sedes), + )), + # processing as `bytes` because NewType`s such as Hash32 throw off pickle + process_inbound_payload_fn=process_tuple_bytes_query, # type: ignore + ) + + +AnyGetPooledTransactions = Union[GetPooledTransactionsV65, GetPooledTransactionsV66] + + class PooledTransactionsV65(BaseCommand[Tuple[SignedTransactionAPI, ...]]): protocol_command_id = 10 serialization_codec: RLPCodec[Tuple[SignedTransactionAPI, ...]] = RLPCodec( @@ -153,6 +244,20 @@ class PooledTransactionsV65(BaseCommand[Tuple[SignedTransactionAPI, ...]]): ) +class PooledTransactionsV66(BaseCommand[TransactionsResultPayload]): + protocol_command_id = 10 + serialization_codec = RLPCodec( + sedes=sedes.List(( + sedes.big_endian_int, + sedes.CountableList(SignedTransactionAPI), + )), + process_inbound_payload_fn=lambda args: TransactionsResultPayload(*args), # noqa: E501 + ) + + +AnyPooledTransactions = Union[PooledTransactionsV65, PooledTransactionsV66] + + class GetNodeDataV65(BaseCommand[Tuple[Hash32, ...]]): protocol_command_id = 13 serialization_codec: RLPCodec[Tuple[Hash32, ...]] = RLPCodec( @@ -160,6 +265,21 @@ class GetNodeDataV65(BaseCommand[Tuple[Hash32, ...]]): ) +class GetNodeDataV66(BaseCommand[Hash32TupleQueryPayload]): + protocol_command_id = 13 + serialization_codec = RLPCodec( + sedes=sedes.List(( + sedes.big_endian_int, + sedes.CountableList(hash_sedes), + )), + # processing as `bytes` because NewType`s such as Hash32 throw off pickle + process_inbound_payload_fn=process_tuple_bytes_query # type: ignore + ) + + +AnyGetNodeData = Union[GetNodeDataV65, GetNodeDataV66] + + class NodeDataV65(BaseCommand[Tuple[bytes, ...]]): protocol_command_id = 14 serialization_codec: RLPCodec[Tuple[bytes, ...]] = RLPCodec( @@ -167,6 +287,20 @@ class NodeDataV65(BaseCommand[Tuple[bytes, ...]]): ) +class NodeDataV66(BaseCommand[BytesTupleResultPayload]): + protocol_command_id = 14 + serialization_codec = RLPCodec( + sedes=sedes.List(( + sedes.big_endian_int, + sedes.CountableList(sedes.binary), + )), + process_inbound_payload_fn=lambda args: BytesTupleResultPayload(*args), + ) + + +AnyNodeData = Union[NodeDataV65, NodeDataV66] + + class GetReceiptsV65(BaseCommand[Tuple[Hash32, ...]]): protocol_command_id = 15 serialization_codec: RLPCodec[Tuple[Hash32, ...]] = RLPCodec( @@ -174,8 +308,37 @@ class GetReceiptsV65(BaseCommand[Tuple[Hash32, ...]]): ) +class GetReceiptsV66(BaseCommand[Hash32TupleQueryPayload]): + protocol_command_id = 15 + serialization_codec = RLPCodec( + sedes=sedes.List(( + sedes.big_endian_int, + sedes.CountableList(hash_sedes), + )), + # processing as `bytes` because NewType`s such as Hash32 throw off pickle + process_inbound_payload_fn=process_tuple_bytes_query, # type: ignore + ) + + +AnyGetReceipts = Union[GetReceiptsV65, GetReceiptsV66] + + class ReceiptsV65(BaseCommand[Tuple[Tuple[ReceiptAPI, ...], ...]]): protocol_command_id = 16 serialization_codec: RLPCodec[Tuple[Tuple[ReceiptAPI, ...], ...]] = RLPCodec( sedes=sedes.CountableList(sedes.CountableList(Receipt)), ) + + +class ReceiptsV66(BaseCommand[ReceiptBundleResultPayload]): + protocol_command_id = 16 + serialization_codec = RLPCodec( + sedes=sedes.List(( + sedes.big_endian_int, + sedes.CountableList(sedes.CountableList(Receipt)), + )), + process_inbound_payload_fn=lambda args: ReceiptBundleResultPayload(*args), + ) + + +AnyReceipts = Union[ReceiptsV65, ReceiptsV66] diff --git a/trinity/protocol/eth/events.py b/trinity/protocol/eth/events.py index ef5f8ee6f4..2d24a4a645 100644 --- a/trinity/protocol/eth/events.py +++ b/trinity/protocol/eth/events.py @@ -33,20 +33,20 @@ ) from .commands import ( - BlockBodiesV65, - BlockHeadersV65, - GetBlockBodiesV65, - GetBlockHeadersV65, - GetNodeDataV65, - GetReceiptsV65, + AnyGetBlockBodies, + AnyGetBlockHeaders, + AnyGetNodeData, + AnyGetPooledTransactions, + AnyGetReceipts, + AnyBlockHeaders, + AnyBlockBodies, + AnyPooledTransactions, + AnyReceipts, + AnyNodeData, NewBlock, NewBlockHashes, - NodeDataV65, - ReceiptsV65, Transactions, NewPooledTransactionHashes, - GetPooledTransactionsV65, - PooledTransactionsV65, ) @@ -57,7 +57,7 @@ class GetBlockHeadersEvent(PeerPoolMessageEvent): Event to carry a ``GetBlockHeaders`` command from the peer pool to any process that subscribes the event through the event bus. """ - command: GetBlockHeadersV65 + command: AnyGetBlockHeaders class GetBlockBodiesEvent(PeerPoolMessageEvent): @@ -65,7 +65,7 @@ class GetBlockBodiesEvent(PeerPoolMessageEvent): Event to carry a ``GetBlockBodies`` command from the peer pool to any process that subscribes the event through the event bus. """ - command: GetBlockBodiesV65 + command: AnyGetBlockBodies class GetReceiptsEvent(PeerPoolMessageEvent): @@ -73,7 +73,7 @@ class GetReceiptsEvent(PeerPoolMessageEvent): Event to carry a ``GetReceipts`` command from the peer pool to any process that subscribes the event through the event bus. """ - command: GetReceiptsV65 + command: AnyGetReceipts class GetNodeDataEvent(PeerPoolMessageEvent): @@ -81,7 +81,7 @@ class GetNodeDataEvent(PeerPoolMessageEvent): Event to carry a ``GetNodeData`` command from the peer pool to any process that subscribes the event through the event bus. """ - command: GetNodeDataV65 + command: AnyGetNodeData class TransactionsEvent(PeerPoolMessageEvent): @@ -121,7 +121,7 @@ class GetPooledTransactionsEvent(PeerPoolMessageEvent): Event to carry a ``GetPooledTransactions`` command from the peer pool to any process that subscribes the event through the event bus. """ - command: GetPooledTransactionsV65 + command: AnyGetPooledTransactions class PooledTransactionsEvent(PeerPoolMessageEvent): @@ -129,7 +129,7 @@ class PooledTransactionsEvent(PeerPoolMessageEvent): Event to carry a ``PooledTransactions`` command from the peer pool to any process that subscribes the event through the event bus. """ - command: PooledTransactionsV65 + command: AnyPooledTransactions # Events flowing from Proxy to PeerPool @@ -142,7 +142,7 @@ class SendBlockHeadersEvent(PeerPoolMessageEvent): peer that sits in the peer pool. """ session: SessionAPI - command: BlockHeadersV65 + command: AnyBlockHeaders @dataclass @@ -152,7 +152,7 @@ class SendBlockBodiesEvent(PeerPoolMessageEvent): peer that sits in the peer pool. """ session: SessionAPI - command: BlockBodiesV65 + command: AnyBlockBodies @dataclass @@ -162,7 +162,7 @@ class SendNodeDataEvent(PeerPoolMessageEvent): peer that sits in the peer pool. """ session: SessionAPI - command: NodeDataV65 + command: AnyNodeData @dataclass @@ -172,7 +172,7 @@ class SendReceiptsEvent(PeerPoolMessageEvent): peer that sits in the peer pool. """ session: SessionAPI - command: ReceiptsV65 + command: AnyReceipts @dataclass @@ -192,7 +192,7 @@ class SendPooledTransactionsEvent(PeerPoolMessageEvent): the actual peer that sits in the peer pool. """ session: SessionAPI - command: PooledTransactionsV65 + command: AnyPooledTransactions # EXCHANGE HANDLER REQUEST / RESPONSE PAIRS diff --git a/trinity/protocol/eth/exchanges.py b/trinity/protocol/eth/exchanges.py index d7d748e172..dbc97613ad 100644 --- a/trinity/protocol/eth/exchanges.py +++ b/trinity/protocol/eth/exchanges.py @@ -14,9 +14,11 @@ noop_payload_validator, ) from p2p.exchange.normalizers import DefaultNormalizer +from trinity._utils.requests import gen_request_id from trinity.protocol.common.payloads import ( - BlockHeadersQuery, + BlockHeadersQuery, Hash32TupleQueryPayload, BlockHeadersQueryPayload, ) +from trinity.protocol.common.validators import match_payload_request_id from trinity.protocol.common.typing import ( BlockBodyBundles, NodeDataBundles, @@ -26,20 +28,24 @@ from .commands import ( BlockBodiesV65, BlockHeadersV65, + BlockHeadersV66, GetBlockBodiesV65, GetBlockHeadersV65, + GetBlockHeadersV66, GetNodeDataV65, GetReceiptsV65, NodeDataV65, ReceiptsV65, GetPooledTransactionsV65, PooledTransactionsV65, -) + GetBlockBodiesV66, BlockBodiesV66, GetNodeDataV66, NodeDataV66, GetReceiptsV66, ReceiptsV66, + PooledTransactionsV66, GetPooledTransactionsV66) from .normalizers import ( GetBlockBodiesNormalizer, GetNodeDataNormalizer, ReceiptsNormalizer, -) + GetBlockBodiesV66Normalizer, GetNodeDataV66Normalizer, GetReceiptsV66Normalizer) + from .trackers import ( GetBlockHeadersTracker, GetBlockBodiesTracker, @@ -90,6 +96,59 @@ async def __call__( # type: ignore ) +BaseGetBlockHeadersV66Exchange = BaseExchange[ + GetBlockHeadersV66, + BlockHeadersV66, + Tuple[BlockHeaderAPI, ...], +] + + +class GetBlockHeadersV66Exchange(BaseGetBlockHeadersV66Exchange): + _normalizer = DefaultNormalizer( + BlockHeadersV66, + Tuple[BlockHeaderAPI, ...], + normalize_fn=lambda res: res.payload.result + ) + tracker_class = GetBlockHeadersTracker + + _request_command_type = GetBlockHeadersV66 + _response_command_type = BlockHeadersV66 + + async def __call__( # type: ignore + self, + block_number_or_hash: BlockIdentifier, + max_headers: int = None, + skip: int = 0, + reverse: bool = True, + timeout: float = None) -> Tuple[BlockHeaderAPI, ...]: + + original_request_args = (block_number_or_hash, max_headers, skip, reverse) + validator = GetBlockHeadersValidator(*original_request_args) + + query = BlockHeadersQuery( + block_number_or_hash=block_number_or_hash, + max_headers=max_headers, + skip=skip, + reverse=reverse, + ) + payload = BlockHeadersQueryPayload( + request_id=gen_request_id(), + query=query, + ) + request = GetBlockHeadersV66(payload) + + return tuple(await self.get_result( + request, + self._normalizer, + validator, + match_payload_request_id, + timeout, + )) + + +BaseNodeDataExchange = BaseExchange[GetNodeDataV65, NodeDataV65, NodeDataBundles] + + class GetNodeDataV65Exchange(BaseExchange[GetNodeDataV65, NodeDataV65, NodeDataBundles]): _normalizer = GetNodeDataNormalizer() tracker_class = GetNodeDataTracker @@ -111,6 +170,32 @@ async def __call__(self, # type: ignore ) +class GetNodeDataV66Exchange(BaseExchange[GetNodeDataV66, NodeDataV66, NodeDataBundles]): + _normalizer = GetNodeDataV66Normalizer() + tracker_class = GetNodeDataTracker + + _request_command_type = GetNodeDataV66 + _response_command_type = NodeDataV66 + + async def __call__(self, # type: ignore + node_hashes: Sequence[Hash32], + timeout: float = None) -> NodeDataBundles: + validator = GetNodeDataValidator(node_hashes) + + request = GetNodeDataV66(Hash32TupleQueryPayload( + request_id=gen_request_id(), + query=tuple(node_hashes) + )) + + return await self.get_result( + request, + self._normalizer, + validator, + match_payload_request_id, + timeout, + ) + + class GetReceiptsV65Exchange(BaseExchange[GetReceiptsV65, ReceiptsV65, ReceiptsBundles]): _normalizer = ReceiptsNormalizer() tracker_class = GetReceiptsTracker @@ -135,6 +220,34 @@ async def __call__(self, # type: ignore ) +class GetReceiptsV66Exchange(BaseExchange[GetReceiptsV66, ReceiptsV66, ReceiptsBundles]): + _normalizer = GetReceiptsV66Normalizer() + tracker_class = GetReceiptsTracker + + _request_command_type = GetReceiptsV66 + _response_command_type = ReceiptsV66 + + async def __call__(self, # type: ignore + headers: Sequence[BlockHeaderAPI], + timeout: float = None) -> ReceiptsBundles: + validator = ReceiptsValidator(headers) + + block_hashes = tuple(header.hash for header in headers) + + request = GetReceiptsV66(Hash32TupleQueryPayload( + request_id=gen_request_id(), + query=block_hashes + )) + + return await self.get_result( + request, + self._normalizer, + validator, + match_payload_request_id, + timeout, + ) + + BaseGetBlockBodiesV65Exchange = BaseExchange[ GetBlockBodiesV65, BlockBodiesV65, @@ -166,6 +279,41 @@ async def __call__(self, # type: ignore ) +BaseGetBlockBodiesV66Exchange = BaseExchange[ + GetBlockBodiesV66, + BlockBodiesV66, + BlockBodyBundles, +] + + +class GetBlockBodiesV66Exchange(BaseGetBlockBodiesV66Exchange): + _normalizer = GetBlockBodiesV66Normalizer() + tracker_class = GetBlockBodiesTracker + + _request_command_type = GetBlockBodiesV66 + _response_command_type = BlockBodiesV66 + + async def __call__(self, # type: ignore + headers: Sequence[BlockHeaderAPI], + timeout: float = None) -> BlockBodyBundles: + + validator = GetBlockBodiesValidator(headers) + block_hashes = tuple(header.hash for header in headers) + + request = GetBlockBodiesV66(Hash32TupleQueryPayload( + request_id=gen_request_id(), + query=block_hashes + )) + + return tuple(await self.get_result( + request, + self._normalizer, + validator, + match_payload_request_id, + timeout, + )) + + BasePooledTransactionsV65Exchange = BaseExchange[ GetPooledTransactionsV65, PooledTransactionsV65, @@ -192,3 +340,40 @@ async def __call__(self, # type: ignore noop_payload_validator, timeout, ) + + +BasePooledTransactionsV66Exchange = BaseExchange[ + GetPooledTransactionsV66, + PooledTransactionsV66, + Tuple[SignedTransactionAPI, ...] +] + + +class GetPooledTransactionsV66Exchange(BasePooledTransactionsV66Exchange): + _normalizer = DefaultNormalizer( + PooledTransactionsV66, + Tuple[SignedTransactionAPI, ...], + normalize_fn=lambda res: res.payload.result + ) + tracker_class = GetPooledTransactionsTracker + + _request_command_type = GetPooledTransactionsV66 + _response_command_type = PooledTransactionsV66 + + async def __call__(self, # type: ignore + transaction_hashes: Sequence[Hash32], + timeout: float = None) -> Tuple[SignedTransactionAPI, ...]: + validator = GetPooledTransactionsValidator(transaction_hashes) + + request = GetPooledTransactionsV66(Hash32TupleQueryPayload( + request_id=gen_request_id(), + query=tuple(transaction_hashes) + )) + + return await self.get_result( + request, + self._normalizer, + validator, + match_payload_request_id, + timeout, + ) diff --git a/trinity/protocol/eth/normalizers.py b/trinity/protocol/eth/normalizers.py index 7614c2a56d..deb96b058e 100644 --- a/trinity/protocol/eth/normalizers.py +++ b/trinity/protocol/eth/normalizers.py @@ -1,7 +1,6 @@ from typing import ( Iterable, ) - from eth_utils import ( to_tuple, ) @@ -23,10 +22,11 @@ BlockBodiesV65, NodeDataV65, ReceiptsV65, -) + BlockBodiesV66, NodeDataV66, ReceiptsV66) class GetNodeDataNormalizer(BaseNormalizer[NodeDataV65, NodeDataBundles]): + is_normalization_slow = True def normalize_result(self, cmd: NodeDataV65) -> NodeDataBundles: @@ -35,6 +35,15 @@ def normalize_result(self, cmd: NodeDataV65) -> NodeDataBundles: return result +class GetNodeDataV66Normalizer(BaseNormalizer[NodeDataV66, NodeDataBundles]): + is_normalization_slow = True + + def normalize_result(self, cmd: NodeDataV66) -> NodeDataBundles: + node_keys = map(keccak, cmd.payload.result) + result = tuple(zip(node_keys, cmd.payload.result)) + return result + + class ReceiptsNormalizer(BaseNormalizer[ReceiptsV65, ReceiptsBundles]): is_normalization_slow = True @@ -43,6 +52,14 @@ def normalize_result(self, cmd: ReceiptsV65) -> ReceiptsBundles: return tuple(zip(cmd.payload, trie_roots_and_data)) +class GetReceiptsV66Normalizer(BaseNormalizer[ReceiptsV66, ReceiptsBundles]): + is_normalization_slow = True + + def normalize_result(self, cmd: ReceiptsV66) -> ReceiptsBundles: + trie_roots_and_data = map(make_trie_root_and_nodes, cmd.payload.result) + return tuple(zip(cmd.payload.result, trie_roots_and_data)) + + class GetBlockBodiesNormalizer(BaseNormalizer[BlockBodiesV65, BlockBodyBundles]): is_normalization_slow = True @@ -52,3 +69,14 @@ def normalize_result(self, cmd: BlockBodiesV65) -> Iterable[BlockBodyBundle]: uncle_hashes = keccak(rlp.encode(body.uncles)) transaction_root_and_nodes = make_trie_root_and_nodes(body.transactions) yield body, transaction_root_and_nodes, uncle_hashes + + +class GetBlockBodiesV66Normalizer(BaseNormalizer[BlockBodiesV66, BlockBodyBundles]): + is_normalization_slow = True + + @to_tuple + def normalize_result(self, cmd: BlockBodiesV66) -> Iterable[BlockBodyBundle]: + for body in cmd.payload.result: + uncle_hashes = keccak(rlp.encode(body.uncles)) + transaction_root_and_nodes = make_trie_root_and_nodes(body.transactions) + yield body, transaction_root_and_nodes, uncle_hashes diff --git a/trinity/protocol/eth/payloads.py b/trinity/protocol/eth/payloads.py index 9e3cb001aa..bbbdf3082b 100644 --- a/trinity/protocol/eth/payloads.py +++ b/trinity/protocol/eth/payloads.py @@ -1,4 +1,8 @@ -from typing import NamedTuple, Tuple + +from typing import ( + NamedTuple, + Tuple, +) from eth_typing import BlockNumber, Hash32 diff --git a/trinity/protocol/eth/peer.py b/trinity/protocol/eth/peer.py index 06e0cbb166..08fc61dc06 100644 --- a/trinity/protocol/eth/peer.py +++ b/trinity/protocol/eth/peer.py @@ -37,9 +37,10 @@ ) from . import forkid -from .api import ETHV63API, ETHV65API, AnyETHAPI, ETHV64API +from .api import ETHV63API, AnyETHAPI, ETHV64API, ETHV65API, ETHV66API from .commands import ( GetBlockHeadersV65, + GetBlockHeadersV66, GetBlockBodiesV65, GetReceiptsV65, GetNodeDataV65, @@ -48,7 +49,7 @@ Transactions, NewPooledTransactionHashes, GetPooledTransactionsV65, -) + GetBlockBodiesV66, GetNodeDataV66, GetReceiptsV66, GetPooledTransactionsV66) from .constants import MAX_HEADERS_FETCH from .events import ( SendBlockHeadersEvent, @@ -73,7 +74,7 @@ SendTransactionsEvent, ) from .payloads import StatusV63Payload, StatusPayload -from .proto import ETHProtocolV63, ETHProtocolV64, ETHProtocolV65, BaseETHProtocol +from .proto import ETHProtocolV63, ETHProtocolV64, ETHProtocolV65, ETHProtocolV66, BaseETHProtocol from .proxy import ProxyETHAPI from .handshaker import ETHV63Handshaker, ETHHandshaker @@ -84,7 +85,8 @@ class ETHPeer(BaseChainPeer): supported_sub_protocols: Tuple[Type[BaseETHProtocol], ...] = ( ETHProtocolV63, ETHProtocolV64, - ETHProtocolV65 + ETHProtocolV65, + ETHProtocolV66, ) sub_proto: BaseETHProtocol = None @@ -92,17 +94,20 @@ def get_behaviors(self) -> Tuple[BehaviorAPI, ...]: return super().get_behaviors() + ( ETHV63API().as_behavior(), ETHV64API().as_behavior(), - ETHV65API().as_behavior() + ETHV65API().as_behavior(), + ETHV66API().as_behavior() ) @cached_property def eth_api(self) -> AnyETHAPI: if self.connection.has_protocol(ETHProtocolV63): return self.connection.get_logic(ETHV63API.name, ETHV63API) - if self.connection.has_protocol(ETHProtocolV64): + elif self.connection.has_protocol(ETHProtocolV64): return self.connection.get_logic(ETHV64API.name, ETHV64API) elif self.connection.has_protocol(ETHProtocolV65): return self.connection.get_logic(ETHV65API.name, ETHV65API) + elif self.connection.has_protocol(ETHProtocolV66): + return self.connection.get_logic(ETHV66API.name, ETHV66API) else: raise Exception("Unreachable code") @@ -168,7 +173,7 @@ async def get_handshakers(self) -> Tuple[HandshakerAPI[Any], ...]: total_difficulty=total_difficulty, genesis_hash=genesis_hash, network_id=self.context.network_id, - version=ETHProtocolV65.version, + version=ETHProtocolV66.version, fork_id=our_forkid ) @@ -189,14 +194,19 @@ class ETHPeerPoolEventServer(PeerPoolEventServer[ETHPeer]): subscription_msg_types = frozenset({ GetBlockHeadersV65, + GetBlockHeadersV66, GetBlockBodiesV65, + GetBlockBodiesV66, GetReceiptsV65, + GetReceiptsV66, GetNodeDataV65, + GetNodeDataV66, Transactions, NewBlockHashes, NewBlock, NewPooledTransactionHashes, GetPooledTransactionsV65, + GetPooledTransactionsV66, }) # SendX events that need to be forwarded to peer.sub_proto.send(event.command) @@ -229,6 +239,7 @@ async def handle_get_block_headers_request( self, event: GetBlockHeadersRequest) -> Tuple[BlockHeaderAPI, ...]: peer = self.get_peer(event.session) + return await peer.eth_api.get_block_headers( event.block_number_or_hash, event.max_headers, @@ -278,22 +289,22 @@ async def handle_native_peer_message(self, # These are broadcasted without a specific target. We shouldn't worry if they are consumed # or not (e.g. transaction pool is enabled or disabled etc) - if isinstance(cmd, GetBlockHeadersV65): + if isinstance(cmd, GetBlockHeadersV65) or isinstance(cmd, GetBlockHeadersV66): await self.event_bus.broadcast( GetBlockHeadersEvent(session, cmd), FIRE_AND_FORGET_BROADCASTING, ) - elif isinstance(cmd, GetBlockBodiesV65): + elif isinstance(cmd, GetBlockBodiesV65) or isinstance(cmd, GetBlockBodiesV66): await self.event_bus.broadcast( GetBlockBodiesEvent(session, cmd), FIRE_AND_FORGET_BROADCASTING, ) - elif isinstance(cmd, GetReceiptsV65): + elif isinstance(cmd, GetReceiptsV65) or isinstance(cmd, GetReceiptsV66): await self.event_bus.broadcast( GetReceiptsEvent(session, cmd), FIRE_AND_FORGET_BROADCASTING, ) - elif isinstance(cmd, GetNodeDataV65): + elif isinstance(cmd, GetNodeDataV65) or isinstance(cmd, GetNodeDataV66): await self.event_bus.broadcast( GetNodeDataEvent(session, cmd), FIRE_AND_FORGET_BROADCASTING, @@ -318,7 +329,7 @@ async def handle_native_peer_message(self, NewPooledTransactionHashesEvent(session, cmd), FIRE_AND_FORGET_BROADCASTING ) - elif isinstance(cmd, GetPooledTransactionsV65): + elif isinstance(cmd, GetPooledTransactionsV65) or isinstance(cmd, GetPooledTransactionsV66): await self.event_bus.broadcast( GetPooledTransactionsEvent(session, cmd), FIRE_AND_FORGET_BROADCASTING diff --git a/trinity/protocol/eth/proto.py b/trinity/protocol/eth/proto.py index be674a2eed..9771034359 100644 --- a/trinity/protocol/eth/proto.py +++ b/trinity/protocol/eth/proto.py @@ -11,8 +11,10 @@ from .commands import ( BlockBodiesV65, BlockHeadersV65, + BlockHeadersV66, GetBlockBodiesV65, GetBlockHeadersV65, + GetBlockHeadersV66, GetNodeDataV65, GetReceiptsV65, NewBlock, @@ -25,7 +27,8 @@ NewPooledTransactionHashes, GetPooledTransactionsV65, PooledTransactionsV65, -) + BlockBodiesV66, GetBlockBodiesV66, NodeDataV66, GetNodeDataV66, ReceiptsV66, GetReceiptsV66, + PooledTransactionsV66, GetPooledTransactionsV66) if TYPE_CHECKING: from .peer import ETHPeer # noqa: F401 @@ -91,3 +94,24 @@ class ETHProtocolV65(BaseETHProtocol): logger = get_extended_debug_logger('trinity.protocol.eth.proto.ETHProtocolV65') status_command_type = Status + + +class ETHProtocolV66(BaseETHProtocol): + version = 66 + commands = ( + Status, + NewBlockHashes, + Transactions, + GetBlockHeadersV66, BlockHeadersV66, + GetBlockBodiesV66, BlockBodiesV66, + NewBlock, + NewPooledTransactionHashes, + GetPooledTransactionsV66, PooledTransactionsV66, + GetNodeDataV66, NodeDataV66, + GetReceiptsV66, ReceiptsV66, + ) + command_length = 20 + + logger = get_extended_debug_logger('trinity.protocol.eth.proto.ETHProtocolV66') + + status_command_type = Status diff --git a/trinity/protocol/eth/proxy.py b/trinity/protocol/eth/proxy.py index cc56ca6b3e..96b8a2089d 100644 --- a/trinity/protocol/eth/proxy.py +++ b/trinity/protocol/eth/proxy.py @@ -22,6 +22,13 @@ from trinity._utils.errors import SupportsError from trinity._utils.logging import get_logger +from trinity.protocol.common.payloads import ( + BlockHeadersResultPayload, + BlocksResultPayload, + BytesTupleResultPayload, + ReceiptBundleResultPayload, + TransactionsResultPayload, +) from trinity.protocol.common.typing import ( BlockBodyBundles, NodeDataBundles, @@ -30,12 +37,17 @@ from trinity.rlp.block_body import BlockBody from .commands import ( + AnyBlockHeaders, + AnyBlockBodies, BlockBodiesV65, + BlockBodiesV66, BlockHeadersV65, + BlockHeadersV66, NodeDataV65, ReceiptsV65, Transactions, - PooledTransactionsV65, + PooledTransactionsV65, AnyNodeData, NodeDataV66, AnyReceipts, ReceiptsV66, + AnyPooledTransactions, PooledTransactionsV66, ) from .events import ( GetBlockBodiesRequest, @@ -204,40 +216,79 @@ def send_transactions(self, ) def send_pooled_transactions(self, - txns: Sequence[SignedTransactionAPI]) -> None: - command = PooledTransactionsV65(tuple(txns)) + transactions: Sequence[SignedTransactionAPI], + request_id: int = None) -> None: + if request_id is None: + command: AnyPooledTransactions = PooledTransactionsV65(tuple(transactions)) + else: + command = PooledTransactionsV66(TransactionsResultPayload( + request_id=request_id, + result=tuple(transactions) + )) self._event_bus.broadcast_nowait( SendPooledTransactionsEvent(self.session, command), self._broadcast_config, ) - def send_block_headers(self, headers: Sequence[BlockHeaderAPI]) -> None: - command = BlockHeadersV65(tuple(headers)) + def send_block_headers(self, + headers: Sequence[BlockHeaderAPI], + request_id: int = None) -> None: + if request_id is None: + command: AnyBlockHeaders = BlockHeadersV65(tuple(headers)) + else: + command = BlockHeadersV66(BlockHeadersResultPayload( + result=tuple(headers), + request_id=request_id + )) self._event_bus.broadcast_nowait( SendBlockHeadersEvent(self.session, command), self._broadcast_config, ) - def send_block_bodies(self, blocks: Sequence[BlockAPI]) -> None: + def send_block_bodies(self, blocks: Sequence[BlockAPI], request_id: int = None) -> None: + block_bodies = tuple( BlockBody(block.transactions, block.uncles) for block in blocks ) - command = BlockBodiesV65(block_bodies) + + if request_id is None: + command: AnyBlockBodies = BlockBodiesV65(block_bodies) + else: + command = BlockBodiesV66(BlocksResultPayload( + request_id=request_id, + result=block_bodies + )) + self._event_bus.broadcast_nowait( SendBlockBodiesEvent(self.session, command), self._broadcast_config, ) - def send_receipts(self, receipts: Sequence[Sequence[ReceiptAPI]]) -> None: - command = ReceiptsV65(tuple(map(tuple, receipts))) + def send_receipts(self, + receipts: Sequence[Sequence[ReceiptAPI]], + request_id: int = None) -> None: + payload: Tuple[Tuple[ReceiptAPI, ...], ...] = tuple(map(tuple, receipts)) + if request_id is None: + command: AnyReceipts = ReceiptsV65(payload) + else: + command = ReceiptsV66(ReceiptBundleResultPayload( + request_id=request_id, + result=payload + )) self._event_bus.broadcast_nowait( SendReceiptsEvent(self.session, command), self._broadcast_config, ) - def send_node_data(self, nodes: Sequence[bytes]) -> None: - command = NodeDataV65(tuple(nodes)) + def send_node_data(self, nodes: Sequence[bytes], request_id: int = None) -> None: + if request_id is None: + command: AnyNodeData = NodeDataV65(tuple(nodes)) + else: + command = NodeDataV66(BytesTupleResultPayload( + request_id=request_id, + result=tuple(nodes) + )) self._event_bus.broadcast_nowait( SendNodeDataEvent(self.session, command), self._broadcast_config, diff --git a/trinity/protocol/eth/servers.py b/trinity/protocol/eth/servers.py index 1b7438901d..cf380299f6 100644 --- a/trinity/protocol/eth/servers.py +++ b/trinity/protocol/eth/servers.py @@ -1,10 +1,11 @@ from typing import ( Any, -) + Sequence) from eth.exceptions import ( HeaderNotFound, ) +from eth_typing import Hash32 from eth_utils import ( to_hex, ) @@ -19,6 +20,7 @@ from p2p.abc import CommandAPI, SessionAPI from trinity.db.eth1.chain import BaseAsyncChainDB +from trinity.protocol.common.payloads import BlockHeadersQuery from trinity.protocol.common.servers import ( BaseIsolatedRequestServer, BasePeerRequestHandler, @@ -44,13 +46,6 @@ ) from trinity.rlp.block_body import BlockBody -from .commands import ( - GetBlockHeadersV65, - GetNodeDataV65, - GetBlockBodiesV65, - GetReceiptsV65, -) - class ETHPeerRequestHandler(BasePeerRequestHandler): def __init__(self, db: BaseAsyncChainDB) -> None: @@ -59,17 +54,18 @@ def __init__(self, db: BaseAsyncChainDB) -> None: async def handle_get_block_headers( self, peer: ETHProxyPeer, - command: GetBlockHeadersV65) -> None: - self.logger.debug("%s requested headers: %s", peer, command.payload) - - headers = await self.lookup_headers(command.payload) + query: BlockHeadersQuery, + request_id: int = None) -> None: + self.logger.debug("%s requested headers: %s", peer, query) + headers = await self.lookup_headers(query) self.logger.debug2("Replying to %s with %d headers", peer, len(headers)) - peer.eth_api.send_block_headers(headers) + peer.eth_api.send_block_headers(headers, request_id=request_id) async def handle_get_block_bodies(self, peer: ETHProxyPeer, - command: GetBlockBodiesV65) -> None: - block_hashes = command.payload + query: Sequence[Hash32], + request_id: int = None) -> None: + block_hashes = query self.logger.debug2("%s requested bodies for %d blocks", peer, len(block_hashes)) bodies = [] @@ -105,10 +101,13 @@ async def handle_get_block_bodies(self, continue bodies.append(BlockBody(transactions, uncles)) self.logger.debug2("Replying to %s with %d block bodies", peer, len(bodies)) - peer.eth_api.send_block_bodies(bodies) + peer.eth_api.send_block_bodies(bodies, request_id) - async def handle_get_receipts(self, peer: ETHProxyPeer, command: GetReceiptsV65) -> None: - block_hashes = command.payload + async def handle_get_receipts(self, + peer: ETHProxyPeer, + query: Sequence[Hash32], + request_id: int = None) -> None: + block_hashes = query self.logger.debug2("%s requested receipts for %d blocks", peer, len(block_hashes)) receipts = [] @@ -134,10 +133,12 @@ async def handle_get_receipts(self, peer: ETHProxyPeer, command: GetReceiptsV65) continue receipts.append(block_receipts) self.logger.debug2("Replying to %s with receipts for %d blocks", peer, len(receipts)) - peer.eth_api.send_receipts(receipts) + peer.eth_api.send_receipts(receipts, request_id) - async def handle_get_node_data(self, peer: ETHProxyPeer, command: GetNodeDataV65) -> None: - node_hashes = command.payload + async def handle_get_node_data(self, + peer: ETHProxyPeer, + query: Sequence[Hash32], request_id: int = None) -> None: + node_hashes = query self.logger.debug2("%s requested %d trie nodes", peer, len(node_hashes)) nodes = [] @@ -158,7 +159,7 @@ async def handle_get_node_data(self, peer: ETHProxyPeer, command: GetNodeDataV65 len(missing_node_hashes), len(node_hashes), ) - peer.eth_api.send_node_data(tuple(nodes)) + peer.eth_api.send_node_data(tuple(nodes), request_id) class ETHRequestServer(BaseIsolatedRequestServer): @@ -187,12 +188,32 @@ async def _handle_msg(self, peer = ETHProxyPeer.from_session(session, self.event_bus, self.broadcast_config) if isinstance(cmd, commands.GetBlockHeadersV65): - await self._handler.handle_get_block_headers(peer, cmd) + await self._handler.handle_get_block_headers(peer, cmd.payload) + elif isinstance(cmd, commands.GetBlockHeadersV66): + await self._handler.handle_get_block_headers( + peer, + cmd.payload.query, + cmd.payload.request_id + ) elif isinstance(cmd, commands.GetBlockBodiesV65): - await self._handler.handle_get_block_bodies(peer, cmd) + await self._handler.handle_get_block_bodies(peer, cmd.payload) + elif isinstance(cmd, commands.GetBlockBodiesV66): + await self._handler.handle_get_block_bodies( + peer, + cmd.payload.query, + cmd.payload.request_id + ) elif isinstance(cmd, commands.GetReceiptsV65): - await self._handler.handle_get_receipts(peer, cmd) + await self._handler.handle_get_receipts(peer, cmd.payload) + elif isinstance(cmd, commands.GetReceiptsV66): + await self._handler.handle_get_receipts(peer, cmd.payload.query, cmd.payload.request_id) elif isinstance(cmd, commands.GetNodeDataV65): - await self._handler.handle_get_node_data(peer, cmd) + await self._handler.handle_get_node_data(peer, cmd.payload) + elif isinstance(cmd, commands.GetNodeDataV66): + await self._handler.handle_get_node_data( + peer, + cmd.payload.query, + cmd.payload.request_id + ) else: self.logger.debug("%s msg not handled yet, needs to be implemented", cmd) diff --git a/trinity/protocol/eth/trackers.py b/trinity/protocol/eth/trackers.py index 2cc4b3de66..6f4c937a4a 100644 --- a/trinity/protocol/eth/trackers.py +++ b/trinity/protocol/eth/trackers.py @@ -4,34 +4,42 @@ Sequence) from eth.abc import BlockHeaderAPI, SignedTransactionAPI +from eth_typing import Hash32 from p2p.exchange import BasePerformanceTracker +from trinity.protocol.common.payloads import ( + BlockHeadersQuery, + get_cmd_payload, +) from trinity.protocol.common.typing import ( BlockBodyBundles, NodeDataBundles, ReceiptsBundles, ) + from trinity._utils.headers import sequence_builder from .commands import ( - GetBlockBodiesV65, - GetBlockHeadersV65, - GetNodeDataV65, - GetReceiptsV65, - GetPooledTransactionsV65, + AnyGetBlockHeaders, + AnyGetBlockBodies, + AnyGetNodeData, + AnyGetReceipts, + AnyGetPooledTransactions, ) BaseGetBlockHeadersTracker = BasePerformanceTracker[ - GetBlockHeadersV65, + AnyGetBlockHeaders, Tuple[BlockHeaderAPI, ...], ] class GetBlockHeadersTracker(BaseGetBlockHeadersTracker): - def _get_request_size(self, request: GetBlockHeadersV65) -> int: - payload = request.payload + + def _get_request_size(self, request: AnyGetBlockHeaders) -> int: + payload: BlockHeadersQuery = get_cmd_payload(request) + if isinstance(payload.block_number_or_hash, int): return len(sequence_builder( start_number=payload.block_number_or_hash, @@ -49,9 +57,10 @@ def _get_result_item_count(self, result: Tuple[BlockHeaderAPI, ...]) -> int: return len(result) -class GetBlockBodiesTracker(BasePerformanceTracker[GetBlockBodiesV65, BlockBodyBundles]): - def _get_request_size(self, request: GetBlockBodiesV65) -> Optional[int]: - return len(request.payload) +class GetBlockBodiesTracker(BasePerformanceTracker[AnyGetBlockBodies, BlockBodyBundles]): + def _get_request_size(self, request: AnyGetBlockBodies) -> Optional[int]: + payload: Tuple[Hash32, ...] = get_cmd_payload(request) + return len(payload) def _get_result_size(self, result: BlockBodyBundles) -> int: return len(result) @@ -64,9 +73,10 @@ def _get_result_item_count(self, result: BlockBodyBundles) -> int: ) -class GetReceiptsTracker(BasePerformanceTracker[GetReceiptsV65, ReceiptsBundles]): - def _get_request_size(self, request: GetReceiptsV65) -> Optional[int]: - return len(request.payload) +class GetReceiptsTracker(BasePerformanceTracker[AnyGetReceipts, ReceiptsBundles]): + def _get_request_size(self, request: AnyGetReceipts) -> Optional[int]: + payload: Tuple[Hash32, ...] = get_cmd_payload(request) + return len(payload) def _get_result_size(self, result: ReceiptsBundles) -> int: return len(result) @@ -79,9 +89,10 @@ def _get_result_item_count(self, result: ReceiptsBundles) -> int: ) -class GetNodeDataTracker(BasePerformanceTracker[GetNodeDataV65, NodeDataBundles]): - def _get_request_size(self, request: GetNodeDataV65) -> Optional[int]: - return len(request.payload) +class GetNodeDataTracker(BasePerformanceTracker[AnyGetNodeData, NodeDataBundles]): + def _get_request_size(self, request: AnyGetNodeData) -> Optional[int]: + payload: Tuple[Hash32, ...] = get_cmd_payload(request) + return len(payload) def _get_result_size(self, result: NodeDataBundles) -> int: return len(result) @@ -91,15 +102,16 @@ def _get_result_item_count(self, result: NodeDataBundles) -> int: BaseGetPooledTransactionsTracker = BasePerformanceTracker[ - GetPooledTransactionsV65, + AnyGetPooledTransactions, Tuple[SignedTransactionAPI, ...] ] class GetPooledTransactionsTracker(BaseGetPooledTransactionsTracker): - def _get_request_size(self, request: GetPooledTransactionsV65) -> Optional[int]: - return len(request.payload) + def _get_request_size(self, request: AnyGetPooledTransactions) -> Optional[int]: + payload: Tuple[Hash32, ...] = get_cmd_payload(request) + return len(payload) def _get_result_size(self, result: Sequence[SignedTransactionAPI]) -> int: return len(result) diff --git a/trinity/protocol/les/api.py b/trinity/protocol/les/api.py index 37f1ac38f9..bab278a25e 100644 --- a/trinity/protocol/les/api.py +++ b/trinity/protocol/les/api.py @@ -20,8 +20,8 @@ from p2p.logic import Application, CommandHandler from p2p.qualifiers import HasProtocol -from trinity._utils.les import gen_request_id from trinity.protocol.common.abc import HeadInfoAPI +from trinity._utils.requests import gen_request_id from .constants import ( MAX_BODIES_FETCH, diff --git a/trinity/protocol/les/exchanges.py b/trinity/protocol/les/exchanges.py index 2d95f20dae..a7bdfb6a56 100644 --- a/trinity/protocol/les/exchanges.py +++ b/trinity/protocol/les/exchanges.py @@ -10,9 +10,7 @@ BaseExchange, ) from p2p.exchange.normalizers import DefaultNormalizer -from trinity._utils.les import ( - gen_request_id, -) +from trinity._utils.requests import gen_request_id from .commands import ( BlockHeaders, @@ -28,8 +26,8 @@ ) from .validators import ( GetBlockHeadersValidator, - match_payload_request_id, ) +from ..common.validators import match_payload_request_id TResult = TypeVar('TResult') diff --git a/trinity/protocol/les/proxy.py b/trinity/protocol/les/proxy.py index cb9194feea..7d5f5bae26 100644 --- a/trinity/protocol/les/proxy.py +++ b/trinity/protocol/les/proxy.py @@ -10,13 +10,11 @@ from p2p.abc import SessionAPI -from trinity._utils.les import ( - gen_request_id, -) from trinity._utils.errors import ( SupportsError, ) from trinity._utils.logging import get_logger +from trinity._utils.requests import gen_request_id from .commands import ( BlockHeaders, diff --git a/trinity/protocol/les/validators.py b/trinity/protocol/les/validators.py index 29d0f2eccc..5c1526888e 100644 --- a/trinity/protocol/les/validators.py +++ b/trinity/protocol/les/validators.py @@ -1,9 +1,3 @@ -from typing import Any - -from eth_utils import ( - ValidationError, -) - from trinity.protocol.common.validators import ( BaseBlockHeadersValidator, ) @@ -12,8 +6,3 @@ class GetBlockHeadersValidator(BaseBlockHeadersValidator): protocol_max_request_size = constants.MAX_HEADERS_FETCH - - -def match_payload_request_id(request: Any, response: Any) -> None: - if request.request_id != response.payload.request_id: - raise ValidationError("Request `id` does not match") diff --git a/trinity/sync/beam/queen.py b/trinity/sync/beam/queen.py index 13317be9c1..e6ef333cc6 100644 --- a/trinity/sync/beam/queen.py +++ b/trinity/sync/beam/queen.py @@ -12,7 +12,7 @@ from p2p.exchange import PerformanceAPI from p2p.peer import BasePeer, PeerSubscriber from p2p.service import BaseService -from trinity.protocol.eth.commands import NodeDataV65 +from trinity.protocol.eth.commands import NodeDataV65, NodeDataV66 from trinity.protocol.eth.peer import ETHPeer, ETHPeerPool from trinity.sync.beam.constants import NON_IDEAL_RESPONSE_PENALTY from trinity.sync.common.peers import WaitingPeers @@ -56,7 +56,7 @@ class QueeningQueue(BaseService, PeerSubscriber, QueenTrackerAPI): def __init__(self, peer_pool: ETHPeerPool, token: CancelToken = None) -> None: super().__init__(token=token) self._peer_pool = peer_pool - self._waiting_peers = WaitingPeers(NodeDataV65) + self._waiting_peers = WaitingPeers((NodeDataV65, NodeDataV66,)) async def _run(self) -> None: with self.subscribe(self._peer_pool): diff --git a/trinity/sync/common/headers.py b/trinity/sync/common/headers.py index 853a0e8003..8c6153b84a 100644 --- a/trinity/sync/common/headers.py +++ b/trinity/sync/common/headers.py @@ -51,7 +51,10 @@ from trinity.chains.base import AsyncChainAPI from trinity.db.eth1.header import BaseAsyncHeaderDB -from trinity.protocol.eth.commands import BlockHeadersV65 as ETHBlockHeaders +from trinity.protocol.eth.commands import ( + BlockHeadersV65 as ETHV65BlockHeaders, + BlockHeadersV66 as ETHV66BlockHeaders +) from trinity.protocol.les.commands import BlockHeaders as LESBlockHEaders from trinity.protocol.common.monitors import BaseChainTipMonitor from trinity.protocol.common.peer import BaseChainPeer, BaseChainPeerPool @@ -579,7 +582,7 @@ def __init__( # queue up idle peers, ordered by speed that they return block bodies self._waiting_peers: WaitingPeers[TChainPeer] = WaitingPeers( - (ETHBlockHeaders, LESBlockHEaders), + (ETHV66BlockHeaders, ETHV65BlockHeaders, LESBlockHEaders), ) self._peer_pool = peer_pool self.sync_progress: SyncProgress = None diff --git a/trinity/sync/full/chain.py b/trinity/sync/full/chain.py index a0d062ca7e..a90eda081c 100644 --- a/trinity/sync/full/chain.py +++ b/trinity/sync/full/chain.py @@ -141,7 +141,9 @@ def __init__(self, self._header_syncer = header_syncer # queue up any idle peers, in order of how fast they return block bodies - self._body_peers: WaitingPeers[ETHPeer] = WaitingPeers(commands.BlockBodiesV65) + self._body_peers: WaitingPeers[ETHPeer] = WaitingPeers( + (commands.BlockBodiesV65, commands.BlockBodiesV66,) + ) # Track incomplete block body download tasks # - arbitrarily allow several requests-worth of headers queued up @@ -592,7 +594,9 @@ def __init__(self, super().__init__(chain, db, peer_pool, header_syncer, token) # queue up any idle peers, in order of how fast they return receipts - self._receipt_peers: WaitingPeers[ETHPeer] = WaitingPeers(commands.ReceiptsV65) + self._receipt_peers: WaitingPeers[ETHPeer] = WaitingPeers( + (commands.ReceiptsV65, commands.ReceiptsV66,) + ) # Track receipt download tasks # - arbitrarily allow several requests-worth of headers queued up diff --git a/trinity/tools/factories/eth/proto.py b/trinity/tools/factories/eth/proto.py index 580c05d1f7..8b492a98f5 100644 --- a/trinity/tools/factories/eth/proto.py +++ b/trinity/tools/factories/eth/proto.py @@ -201,4 +201,5 @@ def ETHV65PeerPairFactory(*, ETHV63PeerPairFactory, ETHV64PeerPairFactory, ETHV65PeerPairFactory, + LatestETHPeerPairFactory, )