diff --git a/newsfragments/3492.feature.rst b/newsfragments/3492.feature.rst new file mode 100644 index 0000000000..a1c4921b0c --- /dev/null +++ b/newsfragments/3492.feature.rst @@ -0,0 +1 @@ +Add a ``read_buffer_limit`` configuration option to ``AsyncIPCProvider`` in order to control the maximum expected message size. The default value is 20MB. diff --git a/newsfragments/3492.performance.rst b/newsfragments/3492.performance.rst new file mode 100644 index 0000000000..7b87cb5f8d --- /dev/null +++ b/newsfragments/3492.performance.rst @@ -0,0 +1 @@ +Improve the logic for reading from the async IPC socket in order to properly handle large messages and allow their handling to be adjusted. This improves reading speeds in general. diff --git a/tests/core/providers/test_async_ipc_provider.py b/tests/core/providers/test_async_ipc_provider.py index 405b923a8f..c9e1a7e716 100644 --- a/tests/core/providers/test_async_ipc_provider.py +++ b/tests/core/providers/test_async_ipc_provider.py @@ -38,6 +38,10 @@ }, } +TWENTY_MB = 20 * 1024 * 1024 +SIZED_MSG_START = b'{"id": 0, "jsonrpc": "2.0", "result": "' +SIZED_MSG_END = b'"}\n' b"" + @pytest.fixture def jsonrpc_ipc_pipe_path(): @@ -61,53 +65,80 @@ def simple_ipc_server(jsonrpc_ipc_pipe_path): serv.close() -@pytest.fixture -def serve_empty_result(simple_ipc_server): +def ipc_server_reply(simple_ipc_server, response_fn): def reply(): connection, client_address = simple_ipc_server.accept() try: connection.recv(1024) - connection.sendall(b'{"id": 0, "result": {}') - time.sleep(0.1) - connection.sendall(b"}") + response_fn(connection) except BrokenPipeError: pass finally: - # Clean up the connection connection.close() simple_ipc_server.close() thd = Thread(target=reply, daemon=True) thd.start() - - try: - yield - finally: - thd.join() + return thd @pytest.fixture -def serve_subscription_result(simple_ipc_server): - def reply(): - connection, client_address = simple_ipc_server.accept() +def ipc_server_fixture(simple_ipc_server): + def 
server_fixture(response_fn): + thread = ipc_server_reply(simple_ipc_server, response_fn) try: - connection.recv(1024) - connection.sendall( - b'{"jsonrpc": "2.0", "id": 0, "result": "0xf13f7073ddef66a8c1b0c9c9f0e543c3"}' # noqa: E501 - ) - connection.sendall(json.dumps(ETH_SUBSCRIBE_RESPONSE).encode("utf-8")) + yield finally: - # Clean up the connection - connection.close() - simple_ipc_server.close() + thread.join() - thd = Thread(target=reply, daemon=True) - thd.start() + return server_fixture - try: - yield - finally: - thd.join() + +@pytest.fixture +def serve_empty_result(ipc_server_fixture): + def response_fn(connection): + connection.sendall(b'{"id": 0, "result": {}') + time.sleep(0.1) + connection.sendall(b"}\n") + + yield from ipc_server_fixture(response_fn) + + +@pytest.fixture +def serve_20mb_response(ipc_server_fixture): + def response_fn(connection): + connection.sendall( + SIZED_MSG_START + + (b"a" * (TWENTY_MB - len(SIZED_MSG_START) - len(SIZED_MSG_END))) + + SIZED_MSG_END + ) + + yield from ipc_server_fixture(response_fn) + + +@pytest.fixture +def serve_larger_than_20mb_response(ipc_server_fixture): + def response_fn(connection): + connection.sendall( + SIZED_MSG_START + + (b"a" * (TWENTY_MB - len(SIZED_MSG_START) - len(SIZED_MSG_END) + 1024)) + + SIZED_MSG_END + ) + + yield from ipc_server_fixture(response_fn) + + +@pytest.fixture +def serve_subscription_result(ipc_server_fixture): + def response_fn(connection): + connection.sendall( + b"{" + b'"jsonrpc": "2.0", "id": 0, "result": "0xf13f7073ddef66a8c1b0c9c9f0e543c3"' + b"}\n" + ) + connection.sendall(json.dumps(ETH_SUBSCRIBE_RESPONSE).encode("utf-8")) + + yield from ipc_server_fixture(response_fn) def test_ipc_tilde_in_path(): @@ -226,7 +257,7 @@ async def test_async_iterator_pattern_exception_handling_for_requests( exception_caught = False async for w3 in AsyncWeb3(AsyncIPCProvider(pathlib.Path(jsonrpc_ipc_pipe_path))): # patch the listener to raise `ConnectionClosed` on read - 
w3.provider._reader.read = _raise_connection_closed + w3.provider._reader.readline = _raise_connection_closed try: await w3.eth.block_number except ConnectionClosed: @@ -249,7 +280,7 @@ async def test_async_iterator_pattern_exception_handling_for_subscriptions( exception_caught = False async for w3 in AsyncWeb3(AsyncIPCProvider(pathlib.Path(jsonrpc_ipc_pipe_path))): # patch the listener to raise `ConnectionClosed` on read - w3.provider._reader.read = _raise_connection_closed + w3.provider._reader.readline = _raise_connection_closed try: async for _ in w3.socket.process_subscriptions(): # raises exception @@ -264,3 +295,37 @@ async def test_async_iterator_pattern_exception_handling_for_subscriptions( pytest.fail("Expected `ConnectionClosed` exception.") assert exception_caught + + +@pytest.mark.asyncio +async def test_async_ipc_reader_can_read_20mb_message( + jsonrpc_ipc_pipe_path, serve_20mb_response +): + async with AsyncWeb3(AsyncIPCProvider(pathlib.Path(jsonrpc_ipc_pipe_path))) as w3: + response = await w3.provider.make_request("method", []) + assert len(response["result"]) == TWENTY_MB - len(SIZED_MSG_START) - len( + SIZED_MSG_END + ) + + +@pytest.mark.asyncio +async def test_async_ipc_reader_raises_on_msg_over_20mb( + jsonrpc_ipc_pipe_path, serve_larger_than_20mb_response +): + with pytest.raises(ValueError): + async with AsyncWeb3( + AsyncIPCProvider(pathlib.Path(jsonrpc_ipc_pipe_path)) + ) as w3: + await w3.provider.make_request("method", []) + + +@pytest.mark.asyncio +async def test_async_ipc_read_buffer_limit_is_configurable( + jsonrpc_ipc_pipe_path, serve_larger_than_20mb_response +): + async with AsyncWeb3( + AsyncIPCProvider( + pathlib.Path(jsonrpc_ipc_pipe_path), read_buffer_limit=TWENTY_MB + 1024 + ) + ) as w3: + await w3.provider.make_request("method", []) diff --git a/web3/providers/persistent/async_ipc.py b/web3/providers/persistent/async_ipc.py index d6dca67566..ca37182390 100644 --- a/web3/providers/persistent/async_ipc.py +++ 
b/web3/providers/persistent/async_ipc.py @@ -1,9 +1,6 @@ import asyncio import errno import json -from json import ( - JSONDecodeError, -) import logging from pathlib import ( Path, @@ -16,10 +13,6 @@ Union, ) -from eth_utils import ( - to_text, -) - from web3.types import ( RPCResponse, ) @@ -28,6 +21,7 @@ PersistentConnectionProvider, ) from ...exceptions import ( + PersistentConnectionClosedOK, ProviderConnectionError, Web3TypeError, ) @@ -37,7 +31,7 @@ async def async_get_ipc_socket( - ipc_path: str, + ipc_path: str, read_buffer_limit: int ) -> Tuple[asyncio.StreamReader, asyncio.StreamWriter]: if sys.platform == "win32": # On Windows named pipe is used. Simulate socket with it. @@ -47,7 +41,7 @@ async def async_get_ipc_socket( return NamedPipe(ipc_path) else: - return await asyncio.open_unix_connection(ipc_path) + return await asyncio.open_unix_connection(ipc_path, limit=read_buffer_limit) class AsyncIPCProvider(PersistentConnectionProvider): @@ -56,11 +50,11 @@ class AsyncIPCProvider(PersistentConnectionProvider): _reader: Optional[asyncio.StreamReader] = None _writer: Optional[asyncio.StreamWriter] = None _decoder: json.JSONDecoder = json.JSONDecoder() - _raw_message: str = "" def __init__( self, ipc_path: Optional[Union[str, Path]] = None, + read_buffer_limit: int = 20 * 1024 * 1024, # 20 MB # `PersistentConnectionProvider` kwargs can be passed through **kwargs: Any, ) -> None: @@ -71,6 +65,7 @@ def __init__( else: raise Web3TypeError("ipc_path must be of type string or pathlib.Path") + self.read_buffer_limit = read_buffer_limit super().__init__(**kwargs) def __str__(self) -> str: @@ -101,18 +96,10 @@ async def socket_send(self, request_data: bytes) -> None: ) async def socket_recv(self) -> RPCResponse: - while True: - # yield to the event loop to allow other tasks to run - await asyncio.sleep(0) - - try: - response, pos = self._decoder.raw_decode(self._raw_message) - self._raw_message = self._raw_message[pos:].lstrip() - return response - except 
JSONDecodeError: - # read more data from the socket if the current raw message is - # incomplete - self._raw_message += to_text(await self._reader.read(4096)).lstrip() + data = await self._reader.readline() + if not data: + raise PersistentConnectionClosedOK("Socket reader received end of stream.") + return self.decode_rpc_response(data) # -- private methods -- # @@ -131,10 +118,14 @@ async def _socket_send(self, request_data: bytes) -> None: async def _reset_socket(self) -> None: self._writer.close() await self._writer.wait_closed() - self._reader, self._writer = await async_get_ipc_socket(self.ipc_path) + self._reader, self._writer = await async_get_ipc_socket( + self.ipc_path, self.read_buffer_limit + ) async def _provider_specific_connect(self) -> None: - self._reader, self._writer = await async_get_ipc_socket(self.ipc_path) + self._reader, self._writer = await async_get_ipc_socket( + self.ipc_path, self.read_buffer_limit + ) async def _provider_specific_disconnect(self) -> None: if self._writer and not self._writer.is_closing(): @@ -149,5 +140,3 @@ async def _provider_specific_socket_reader(self) -> RPCResponse: def _error_log_listener_task_exception(self, e: Exception) -> None: super()._error_log_listener_task_exception(e) - # reset the raw message buffer on exception when error logging - self._raw_message = ""