Skip to content

Commit

Permalink
Fix buffer issues with async IPC; add tests:
Browse files Browse the repository at this point in the history
- Move to a readline() approach on the stream
  reader for IPC. The older implementation for
  the original IPC would read in chunks but this
  isn't ideal and, it turns out, opting for
  readline while allowing control of the buffer
  limit lends itself quite nicely to plucking
  well-formed responses from the socket.
  • Loading branch information
fselmo committed Sep 20, 2024
1 parent e8eb20e commit 517399c
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 56 deletions.
1 change: 1 addition & 0 deletions newsfragments/3492.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add a configuration option for the ``read_buffer_limit`` for ``AsyncIPCProvider`` in order to control the expected message size limit. Set this default value to 20MB.
1 change: 1 addition & 0 deletions newsfragments/3492.performance.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Improve logic for reading from the async IPC socket in order to properly handle and adjust the handling of large messages. This improves reading speeds in general.
125 changes: 95 additions & 30 deletions tests/core/providers/test_async_ipc_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@
},
}

TWENTY_MB = 20 * 1024 * 1024
SIZED_MSG_START = b'{"id": 0, "jsonrpc": "2.0", "result": "'
SIZED_MSG_END = b'"}\n' b""


@pytest.fixture
def jsonrpc_ipc_pipe_path():
Expand All @@ -61,53 +65,80 @@ def simple_ipc_server(jsonrpc_ipc_pipe_path):
serv.close()


@pytest.fixture
def serve_empty_result(simple_ipc_server):
def ipc_server_reply(simple_ipc_server, response_fn):
def reply():
connection, client_address = simple_ipc_server.accept()
try:
connection.recv(1024)
connection.sendall(b'{"id": 0, "result": {}')
time.sleep(0.1)
connection.sendall(b"}")
response_fn(connection)
except BrokenPipeError:
pass
finally:
# Clean up the connection
connection.close()
simple_ipc_server.close()

thd = Thread(target=reply, daemon=True)
thd.start()

try:
yield
finally:
thd.join()
return thd


@pytest.fixture
def serve_subscription_result(simple_ipc_server):
def reply():
connection, client_address = simple_ipc_server.accept()
def ipc_server_fixture(simple_ipc_server):
def server_fixture(response_fn):
thread = ipc_server_reply(simple_ipc_server, response_fn)
try:
connection.recv(1024)
connection.sendall(
b'{"jsonrpc": "2.0", "id": 0, "result": "0xf13f7073ddef66a8c1b0c9c9f0e543c3"}' # noqa: E501
)
connection.sendall(json.dumps(ETH_SUBSCRIBE_RESPONSE).encode("utf-8"))
yield
finally:
# Clean up the connection
connection.close()
simple_ipc_server.close()
thread.join()

thd = Thread(target=reply, daemon=True)
thd.start()
return server_fixture

try:
yield
finally:
thd.join()

@pytest.fixture
def serve_empty_result(ipc_server_fixture):
def response_fn(connection):
connection.sendall(b'{"id": 0, "result": {}')
time.sleep(0.1)
connection.sendall(b"}\n")

yield from ipc_server_fixture(response_fn)


@pytest.fixture
def serve_20mb_response(ipc_server_fixture):
def response_fn(connection):
connection.sendall(
SIZED_MSG_START
+ (b"a" * (TWENTY_MB - len(SIZED_MSG_START) - len(SIZED_MSG_END)))
+ SIZED_MSG_END
)

yield from ipc_server_fixture(response_fn)


@pytest.fixture
def serve_larger_than_20mb_response(ipc_server_fixture):
def response_fn(connection):
connection.sendall(
SIZED_MSG_START
+ (b"a" * (TWENTY_MB - len(SIZED_MSG_START) - len(SIZED_MSG_END) + 1024))
+ SIZED_MSG_END
)

yield from ipc_server_fixture(response_fn)


@pytest.fixture
def serve_subscription_result(ipc_server_fixture):
def response_fn(connection):
connection.sendall(
b"{"
b'"jsonrpc": "2.0", "id": 0, "result": "0xf13f7073ddef66a8c1b0c9c9f0e543c3"'
b"}\n"
)
connection.sendall(json.dumps(ETH_SUBSCRIBE_RESPONSE).encode("utf-8"))

yield from ipc_server_fixture(response_fn)


def test_ipc_tilde_in_path():
Expand Down Expand Up @@ -226,7 +257,7 @@ async def test_async_iterator_pattern_exception_handling_for_requests(
exception_caught = False
async for w3 in AsyncWeb3(AsyncIPCProvider(pathlib.Path(jsonrpc_ipc_pipe_path))):
# patch the listener to raise `ConnectionClosed` on read
w3.provider._reader.read = _raise_connection_closed
w3.provider._reader.readline = _raise_connection_closed
try:
await w3.eth.block_number
except ConnectionClosed:
Expand All @@ -249,7 +280,7 @@ async def test_async_iterator_pattern_exception_handling_for_subscriptions(
exception_caught = False
async for w3 in AsyncWeb3(AsyncIPCProvider(pathlib.Path(jsonrpc_ipc_pipe_path))):
# patch the listener to raise `ConnectionClosed` on read
w3.provider._reader.read = _raise_connection_closed
w3.provider._reader.readline = _raise_connection_closed
try:
async for _ in w3.socket.process_subscriptions():
# raises exception
Expand All @@ -264,3 +295,37 @@ async def test_async_iterator_pattern_exception_handling_for_subscriptions(
pytest.fail("Expected `ConnectionClosed` exception.")

assert exception_caught


@pytest.mark.asyncio
async def test_async_ipc_reader_can_read_20mb_message(
jsonrpc_ipc_pipe_path, serve_20mb_response
):
async with AsyncWeb3(AsyncIPCProvider(pathlib.Path(jsonrpc_ipc_pipe_path))) as w3:
response = await w3.provider.make_request("method", [])
assert len(response["result"]) == TWENTY_MB - len(SIZED_MSG_START) - len(
SIZED_MSG_END
)


@pytest.mark.asyncio
async def test_async_ipc_reader_raises_on_msg_over_20mb(
jsonrpc_ipc_pipe_path, serve_larger_than_20mb_response
):
with pytest.raises(ValueError):
async with AsyncWeb3(
AsyncIPCProvider(pathlib.Path(jsonrpc_ipc_pipe_path))
) as w3:
await w3.provider.make_request("method", [])


@pytest.mark.asyncio
async def test_async_ipc_read_buffer_limit_is_configurable(
jsonrpc_ipc_pipe_path, serve_larger_than_20mb_response
):
async with AsyncWeb3(
AsyncIPCProvider(
pathlib.Path(jsonrpc_ipc_pipe_path), read_buffer_limit=TWENTY_MB + 1024
)
) as w3:
await w3.provider.make_request("method", [])
41 changes: 15 additions & 26 deletions web3/providers/persistent/async_ipc.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
import asyncio
import errno
import json
from json import (
JSONDecodeError,
)
import logging
from pathlib import (
Path,
Expand All @@ -16,10 +13,6 @@
Union,
)

from eth_utils import (
to_text,
)

from web3.types import (
RPCResponse,
)
Expand All @@ -28,6 +21,7 @@
PersistentConnectionProvider,
)
from ...exceptions import (
PersistentConnectionClosedOK,
ProviderConnectionError,
Web3TypeError,
)
Expand All @@ -37,7 +31,7 @@


async def async_get_ipc_socket(
ipc_path: str,
ipc_path: str, read_buffer_limit: int
) -> Tuple[asyncio.StreamReader, asyncio.StreamWriter]:
if sys.platform == "win32":
# On Windows named pipe is used. Simulate socket with it.
Expand All @@ -47,7 +41,7 @@ async def async_get_ipc_socket(

return NamedPipe(ipc_path)
else:
return await asyncio.open_unix_connection(ipc_path)
return await asyncio.open_unix_connection(ipc_path, limit=read_buffer_limit)


class AsyncIPCProvider(PersistentConnectionProvider):
Expand All @@ -56,11 +50,11 @@ class AsyncIPCProvider(PersistentConnectionProvider):
_reader: Optional[asyncio.StreamReader] = None
_writer: Optional[asyncio.StreamWriter] = None
_decoder: json.JSONDecoder = json.JSONDecoder()
_raw_message: str = ""

def __init__(
self,
ipc_path: Optional[Union[str, Path]] = None,
read_buffer_limit: int = 20 * 1024 * 1024, # 20 MB
# `PersistentConnectionProvider` kwargs can be passed through
**kwargs: Any,
) -> None:
Expand All @@ -71,6 +65,7 @@ def __init__(
else:
raise Web3TypeError("ipc_path must be of type string or pathlib.Path")

self.read_buffer_limit = read_buffer_limit
super().__init__(**kwargs)

def __str__(self) -> str:
Expand Down Expand Up @@ -101,18 +96,10 @@ async def socket_send(self, request_data: bytes) -> None:
)

async def socket_recv(self) -> RPCResponse:
while True:
# yield to the event loop to allow other tasks to run
await asyncio.sleep(0)

try:
response, pos = self._decoder.raw_decode(self._raw_message)
self._raw_message = self._raw_message[pos:].lstrip()
return response
except JSONDecodeError:
# read more data from the socket if the current raw message is
# incomplete
self._raw_message += to_text(await self._reader.read(4096)).lstrip()
data = await self._reader.readline()
if not data:
raise PersistentConnectionClosedOK("Socket reader received end of stream.")
return self.decode_rpc_response(data)

# -- private methods -- #

Expand All @@ -131,10 +118,14 @@ async def _socket_send(self, request_data: bytes) -> None:
async def _reset_socket(self) -> None:
self._writer.close()
await self._writer.wait_closed()
self._reader, self._writer = await async_get_ipc_socket(self.ipc_path)
self._reader, self._writer = await async_get_ipc_socket(
self.ipc_path, self.read_buffer_limit
)

async def _provider_specific_connect(self) -> None:
self._reader, self._writer = await async_get_ipc_socket(self.ipc_path)
self._reader, self._writer = await async_get_ipc_socket(
self.ipc_path, self.read_buffer_limit
)

async def _provider_specific_disconnect(self) -> None:
if self._writer and not self._writer.is_closing():
Expand All @@ -149,5 +140,3 @@ async def _provider_specific_socket_reader(self) -> RPCResponse:

def _error_log_listener_task_exception(self, e: Exception) -> None:
super()._error_log_listener_task_exception(e)
# reset the raw message buffer on exception when error logging
self._raw_message = ""

0 comments on commit 517399c

Please sign in to comment.