From fc63792033bd830ac3f4ec9e92a342f19243af94 Mon Sep 17 00:00:00 2001 From: voith Date: Wed, 17 Oct 2018 08:47:57 +0530 Subject: [PATCH 1/4] added timeout for WebsocketProvider wait_for_ws change localhost to 127.0.0.1 --- tests/conftest.py | 9 ++++ .../core/providers/test_websocket_provider.py | 47 +++++++++++++++++++ tests/utils.py | 25 ++++++++++ web3/providers/websocket.py | 16 ++++++- 4 files changed, 95 insertions(+), 2 deletions(-) create mode 100644 tests/utils.py diff --git a/tests/conftest.py b/tests/conftest.py index 7cf42a0d00..9157010672 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -8,7 +8,16 @@ identity, ) +from .utils import ( + get_open_port, +) + @pytest.fixture(scope="module", params=[lambda x: to_bytes(hexstr=x), identity]) def address_conversion_func(request): return request.param + + +@pytest.fixture() +def open_port(): + return get_open_port() diff --git a/tests/core/providers/test_websocket_provider.py b/tests/core/providers/test_websocket_provider.py index f338508e09..64d2f5da17 100644 --- a/tests/core/providers/test_websocket_provider.py +++ b/tests/core/providers/test_websocket_provider.py @@ -1,5 +1,18 @@ +import asyncio +from concurrent.futures import ( + TimeoutError, +) import pytest +from threading import ( + Thread, +) + +import websockets +from tests.utils import ( + wait_for_ws +) +from web3 import Web3 from web3.exceptions import ( ValidationError, ) @@ -8,6 +21,40 @@ ) +@pytest.yield_fixture +def start_websocket_server(open_port): + event_loop = asyncio.new_event_loop() + + def run_server(): + async def empty_server(websocket, path): + data = await websocket.recv() + await asyncio.sleep(0.1) + await websocket.send(data) + server = websockets.serve(empty_server, '127.0.0.1', open_port, loop=event_loop) + event_loop.run_until_complete(server) + event_loop.run_forever() + + thd = Thread(target=run_server) + thd.start() + yield + event_loop.call_soon_threadsafe(event_loop.stop) + + +@pytest.fixture() +def w3(open_port, start_websocket_server): + # need new event loop as the one used by server is already running + event_loop = asyncio.new_event_loop() + endpoint_uri = 'ws://127.0.0.1:{}'.format(open_port) + event_loop.run_until_complete(wait_for_ws(endpoint_uri, event_loop)) + provider = WebsocketProvider(endpoint_uri, websocket_timeout=0) + return Web3(provider) + + +def test_websocket_provider_timeout(w3): + with pytest.raises(TimeoutError): + w3.eth.accounts + + def test_restricted_websocket_kwargs(): invalid_kwargs = {'uri': 'ws://127.0.0.1:8546'} re_exc_message = r'.*found: {0}*'.format(set(invalid_kwargs.keys())) diff --git a/tests/utils.py b/tests/utils.py new file mode 100644 index 0000000000..05f2899621 --- /dev/null +++ b/tests/utils.py @@ -0,0 +1,25 @@ +import asyncio +import socket +import time + +import websockets + + +def get_open_port(): + sock = socket.socket() + sock.bind(('127.0.0.1', 0)) + port = sock.getsockname()[1] + sock.close() + return str(port) + + +async def wait_for_ws(endpoint_uri, event_loop, timeout=60): + start = time.time() + while time.time() < start + timeout: + try: + async with websockets.connect(uri=endpoint_uri, loop=event_loop): + pass + except (ConnectionRefusedError, OSError): + await asyncio.sleep(0.01) + else: + break diff --git a/web3/providers/websocket.py b/web3/providers/websocket.py index 91a2a9df97..dfd108f5e4 100644 --- a/web3/providers/websocket.py +++ b/web3/providers/websocket.py @@ -16,6 +16,7 @@ ) RESTRICTED_WEBSOCKET_KWARGS = {'uri', 'loop'} +DEFAULT_WEBSOCKET_TIMEOUT = 10 def _start_event_loop(loop): @@ -63,8 +64,14 @@ class WebsocketProvider(JSONBaseProvider): logger = logging.getLogger("web3.providers.WebsocketProvider") _loop = None - def __init__(self, endpoint_uri=None, websocket_kwargs=None): + def __init__( + self, + endpoint_uri=None, + websocket_kwargs=None, + websocket_timeout=DEFAULT_WEBSOCKET_TIMEOUT + ): self.endpoint_uri = endpoint_uri + self.websocket_timeout = websocket_timeout if self.endpoint_uri is None: self.endpoint_uri = get_default_endpoint() if WebsocketProvider._loop is None: @@ -91,7 +98,12 @@ def __str__(self): async def coro_make_request(self, request_data): async with self.conn as conn: await conn.send(request_data) - return json.loads(await conn.recv()) + return json.loads( + await asyncio.wait_for( + conn.recv(), + timeout=self.websocket_timeout + ) + ) def make_request(self, method, params): self.logger.debug("Making request WebSocket. URI: %s, " From e5fa7c92e89ae916cf7cbee4eea86dcec3f959ea Mon Sep 17 00:00:00 2001 From: voith Date: Wed, 17 Oct 2018 08:49:02 +0530 Subject: [PATCH 2/4] moved get_open_port to tests/utils.py --- tests/generate_go_ethereum_fixture.py | 11 +++-------- tests/integration/generate_fixtures/common.py | 8 -------- tests/integration/generate_fixtures/go_ethereum.py | 5 ++++- tests/integration/generate_fixtures/parity.py | 7 +++++-- tests/integration/go_ethereum/common.py | 9 --------- tests/integration/go_ethereum/test_goethereum_http.py | 4 +++- tests/integration/go_ethereum/test_goethereum_ipc.py | 4 +++- tests/integration/go_ethereum/test_goethereum_ws.py | 4 +++- tests/integration/go_ethereum/utils.py | 8 -------- tests/integration/parity/common.py | 9 --------- tests/integration/parity/test_parity_http.py | 4 +++- tests/integration/parity/test_parity_ws.py | 4 +++- tests/integration/parity/utils.py | 8 -------- 13 files changed, 27 insertions(+), 58 deletions(-) diff --git a/tests/generate_go_ethereum_fixture.py b/tests/generate_go_ethereum_fixture.py index dc853a1e19..958b0eb93f 100644 --- a/tests/generate_go_ethereum_fixture.py +++ b/tests/generate_go_ethereum_fixture.py @@ -22,6 +22,9 @@ to_wei, ) +from tests.utils import ( + get_open_port, +) from web3 import Web3 from web3.utils.module_testing.emitter_contract import ( EMITTER_ABI, @@ -100,14 +103,6 @@ def tempdir(): shutil.rmtree(dir_path) -def get_open_port(): - sock = socket.socket() - sock.bind(('127.0.0.1', 0)) - port = sock.getsockname()[1] - sock.close() - return str(port) - - def get_geth_binary(): from geth.install import ( get_executable_path, diff --git a/tests/integration/generate_fixtures/common.py b/tests/integration/generate_fixtures/common.py index 895eb451d5..7d305e87b2 100644 --- a/tests/integration/generate_fixtures/common.py +++ b/tests/integration/generate_fixtures/common.py @@ -76,14 +76,6 @@ def tempdir(): shutil.rmtree(dir_path) -def get_open_port(): - sock = socket.socket() - sock.bind(('127.0.0.1', 0)) - port = sock.getsockname()[1] - sock.close() - return str(port) - - def get_geth_binary(): from geth.install import ( get_executable_path, diff --git a/tests/integration/generate_fixtures/go_ethereum.py b/tests/integration/generate_fixtures/go_ethereum.py index ca89bc5610..75a6e19417 100644 --- a/tests/integration/generate_fixtures/go_ethereum.py +++ b/tests/integration/generate_fixtures/go_ethereum.py @@ -11,6 +11,9 @@ ) import common +from tests.utils import ( + get_open_port, +) from web3 import Web3 from web3.utils.module_testing.emitter_contract import ( EMITTER_ABI, @@ -42,7 +45,7 @@ def generate_go_ethereum_fixture(destination_dir): geth_ipc_path_dir = stack.enter_context(common.tempdir()) geth_ipc_path = os.path.join(geth_ipc_path_dir, 'geth.ipc') - geth_port = common.get_open_port() + geth_port = get_open_port() geth_binary = common.get_geth_binary() geth_proc = stack.enter_context(common.get_geth_process( # noqa: F841 diff --git a/tests/integration/generate_fixtures/parity.py b/tests/integration/generate_fixtures/parity.py index 456eec358c..8b58c38c34 100644 --- a/tests/integration/generate_fixtures/parity.py +++ b/tests/integration/generate_fixtures/parity.py @@ -12,6 +12,9 @@ import common import go_ethereum +from tests.utils import ( + get_open_port, +) from web3 import Web3 from web3.utils.toolz import ( merge, @@ -176,7 +179,7 @@ def generate_parity_fixture(destination_dir): geth_datadir = stack.enter_context(common.tempdir()) - geth_port = common.get_open_port() + geth_port = get_open_port() geth_ipc_path_dir = stack.enter_context(common.tempdir()) geth_ipc_path = os.path.join(geth_ipc_path_dir, 'geth.ipc') @@ -221,7 +224,7 @@ def generate_parity_fixture(destination_dir): parity_ipc_path_dir = stack.enter_context(common.tempdir()) parity_ipc_path = os.path.join(parity_ipc_path_dir, 'jsonrpc.ipc') - parity_port = common.get_open_port() + parity_port = get_open_port() parity_binary = get_parity_binary() parity_proc = stack.enter_context(get_parity_process( # noqa: F841 diff --git a/tests/integration/go_ethereum/common.py b/tests/integration/go_ethereum/common.py index 4f27dd1243..bd7b691abf 100644 --- a/tests/integration/go_ethereum/common.py +++ b/tests/integration/go_ethereum/common.py @@ -1,5 +1,4 @@ import pytest -import socket from web3.utils.module_testing import ( EthModuleTest, @@ -10,14 +9,6 @@ ) -def get_open_port(): - sock = socket.socket() - sock.bind(('127.0.0.1', 0)) - port = sock.getsockname()[1] - sock.close() - return str(port) - - class GoEthereumTest(Web3ModuleTest): def _check_web3_clientVersion(self, client_version): assert client_version.startswith('Geth/') diff --git a/tests/integration/go_ethereum/test_goethereum_http.py b/tests/integration/go_ethereum/test_goethereum_http.py index 0e196e8da1..251e558ead 100644 --- a/tests/integration/go_ethereum/test_goethereum_http.py +++ b/tests/integration/go_ethereum/test_goethereum_http.py @@ -1,5 +1,8 @@ import pytest +from tests.utils import ( + get_open_port, +) from web3 import Web3 from .common import ( @@ -8,7 +11,6 @@ GoEthereumPersonalModuleTest, GoEthereumTest, GoEthereumVersionModuleTest, - get_open_port, ) from .utils import ( wait_for_http, diff --git a/tests/integration/go_ethereum/test_goethereum_ipc.py b/tests/integration/go_ethereum/test_goethereum_ipc.py index 0e504a991c..8bc53ff6f9 100644 --- a/tests/integration/go_ethereum/test_goethereum_ipc.py +++ b/tests/integration/go_ethereum/test_goethereum_ipc.py @@ -2,6 +2,9 @@ import pytest import tempfile +from tests.utils import ( + get_open_port, +) from web3 import Web3 from .common import ( @@ -12,7 +15,6 @@ GoEthereumVersionModuleTest, ) from .utils import ( - get_open_port, wait_for_socket, ) diff --git a/tests/integration/go_ethereum/test_goethereum_ws.py b/tests/integration/go_ethereum/test_goethereum_ws.py index 4499dcaf64..941441b9e9 100644 --- a/tests/integration/go_ethereum/test_goethereum_ws.py +++ b/tests/integration/go_ethereum/test_goethereum_ws.py @@ -6,6 +6,9 @@ from tests.integration.utils import ( wait_for_ws, ) +from tests.utils import ( + get_open_port, +) from web3 import Web3 from .common import ( @@ -14,7 +17,6 @@ GoEthereumPersonalModuleTest, GoEthereumTest, GoEthereumVersionModuleTest, - get_open_port, ) diff --git a/tests/integration/go_ethereum/utils.py b/tests/integration/go_ethereum/utils.py index 0d867d3c28..b9c1176038 100644 --- a/tests/integration/go_ethereum/utils.py +++ b/tests/integration/go_ethereum/utils.py @@ -50,11 +50,3 @@ def kill_proc_gracefully(proc): if proc.poll() is None: proc.kill() wait_for_popen(proc, 2) - - -def get_open_port(): - sock = socket.socket() - sock.bind(('127.0.0.1', 0)) - port = sock.getsockname()[1] - sock.close() - return str(port) diff --git a/tests/integration/parity/common.py b/tests/integration/parity/common.py index d944d9b7c8..035e8c5f45 100644 --- a/tests/integration/parity/common.py +++ b/tests/integration/parity/common.py @@ -1,5 +1,4 @@ import pytest -import socket from flaky import ( flaky, @@ -16,14 +15,6 @@ MAX_FLAKY_RUNS = 3 -def get_open_port(): - sock = socket.socket() - sock.bind(('127.0.0.1', 0)) - port = sock.getsockname()[1] - sock.close() - return str(port) - - class ParityWeb3ModuleTest(Web3ModuleTest): def _check_web3_clientVersion(self, client_version): assert client_version.startswith('Parity/') diff --git a/tests/integration/parity/test_parity_http.py b/tests/integration/parity/test_parity_http.py index ceb5e1c5b4..a5381070e8 100644 --- a/tests/integration/parity/test_parity_http.py +++ b/tests/integration/parity/test_parity_http.py @@ -4,6 +4,9 @@ from tests.integration.parity.utils import ( wait_for_http, ) +from tests.utils import ( + get_open_port, +) from web3 import Web3 from web3.utils.module_testing import ( NetModuleTest, @@ -15,7 +18,6 @@ ParityPersonalModuleTest, ParityTraceModuleTest, ParityWeb3ModuleTest, - get_open_port, ) diff --git a/tests/integration/parity/test_parity_ws.py b/tests/integration/parity/test_parity_ws.py index 3e8bafa7c5..5f328cf1d3 100644 --- a/tests/integration/parity/test_parity_ws.py +++ b/tests/integration/parity/test_parity_ws.py @@ -7,6 +7,9 @@ from tests.integration.utils import ( wait_for_ws, ) +from tests.utils import ( + get_open_port, +) from web3 import Web3 from web3.utils.module_testing import ( NetModuleTest, @@ -18,7 +21,6 @@ ParityPersonalModuleTest, ParityTraceModuleTest, ParityWeb3ModuleTest, - get_open_port, ) diff --git a/tests/integration/parity/utils.py b/tests/integration/parity/utils.py index d0061040d4..0403533c58 100644 --- a/tests/integration/parity/utils.py +++ b/tests/integration/parity/utils.py @@ -80,11 +80,3 @@ def kill_proc_gracefully(proc): if proc.poll() is None: proc.kill() wait_for_popen(proc, 2) - - -def get_open_port(): - sock = socket.socket() - sock.bind(('127.0.0.1', 0)) - port = sock.getsockname()[1] - sock.close() - return str(port) From 1f5a4b82a862b3d75e000859e8ac19b077884c79 Mon Sep 17 00:00:00 2001 From: voith Date: Wed, 17 Oct 2018 10:19:47 +0530 Subject: [PATCH 3/4] moved wait_for_ws to tests/utils.py --- tests/core/providers/test_websocket_provider.py | 2 +- .../go_ethereum/test_goethereum_ws.py | 4 +--- tests/integration/parity/test_parity_ws.py | 4 +--- tests/integration/utils.py | 16 ---------------- 4 files changed, 3 insertions(+), 23 deletions(-) delete mode 100644 tests/integration/utils.py diff --git a/tests/core/providers/test_websocket_provider.py b/tests/core/providers/test_websocket_provider.py index 64d2f5da17..f3511c8a87 100644 --- a/tests/core/providers/test_websocket_provider.py +++ b/tests/core/providers/test_websocket_provider.py @@ -10,7 +10,7 @@ import websockets from tests.utils import ( - wait_for_ws + wait_for_ws, ) from web3 import Web3 from web3.exceptions import ( diff --git a/tests/integration/go_ethereum/test_goethereum_ws.py b/tests/integration/go_ethereum/test_goethereum_ws.py index 941441b9e9..5166bae242 100644 --- a/tests/integration/go_ethereum/test_goethereum_ws.py +++ b/tests/integration/go_ethereum/test_goethereum_ws.py @@ -3,11 +3,9 @@ from tests.integration.common import ( MiscWebsocketTest, ) -from tests.integration.utils import ( - wait_for_ws, -) from tests.utils import ( get_open_port, + wait_for_ws, ) from web3 import Web3 diff --git a/tests/integration/parity/test_parity_ws.py b/tests/integration/parity/test_parity_ws.py index 5f328cf1d3..db7acd0067 100644 --- a/tests/integration/parity/test_parity_ws.py +++ b/tests/integration/parity/test_parity_ws.py @@ -4,11 +4,9 @@ from tests.integration.common import ( MiscWebsocketTest, ) -from tests.integration.utils import ( - wait_for_ws, -) from tests.utils import ( get_open_port, + wait_for_ws, ) from web3 import Web3 from web3.utils.module_testing import ( diff --git a/tests/integration/utils.py b/tests/integration/utils.py deleted file mode 100644 index fdf1c930ae..0000000000 --- a/tests/integration/utils.py +++ /dev/null @@ -1,16 +0,0 @@ -import asyncio -import time - -import websockets - - -async def wait_for_ws(endpoint_uri, event_loop, timeout=60): - start = time.time() - while time.time() < start + timeout: - try: - async with websockets.connect(uri=endpoint_uri, loop=event_loop): - pass - except (ConnectionRefusedError, OSError): - await asyncio.sleep(0.01) - else: - break From bcde527798b590b599322cb8e812da270308f851 Mon Sep 17 00:00:00 2001 From: voith Date: Thu, 18 Oct 2018 00:45:49 +0530 Subject: [PATCH 4/4] reduced sleep duration and added timeout for send --- tests/core/providers/test_websocket_provider.py | 10 ++++++---- web3/providers/websocket.py | 5 ++++- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/tests/core/providers/test_websocket_provider.py b/tests/core/providers/test_websocket_provider.py index f3511c8a87..789fbe2225 100644 --- a/tests/core/providers/test_websocket_provider.py +++ b/tests/core/providers/test_websocket_provider.py @@ -28,7 +28,7 @@ def start_websocket_server(open_port): def run_server(): async def empty_server(websocket, path): data = await websocket.recv() - await asyncio.sleep(0.1) + await asyncio.sleep(0.02) await websocket.send(data) server = websockets.serve(empty_server, '127.0.0.1', open_port, loop=event_loop) event_loop.run_until_complete(server) @@ -36,8 +36,10 @@ async def empty_server(websocket, path): thd = Thread(target=run_server) thd.start() - yield - event_loop.call_soon_threadsafe(event_loop.stop) + try: + yield + finally: + event_loop.call_soon_threadsafe(event_loop.stop) @pytest.fixture() @@ -46,7 +48,7 @@ def w3(open_port, start_websocket_server): event_loop = asyncio.new_event_loop() endpoint_uri = 'ws://127.0.0.1:{}'.format(open_port) event_loop.run_until_complete(wait_for_ws(endpoint_uri, event_loop)) - provider = WebsocketProvider(endpoint_uri, websocket_timeout=0) + provider = WebsocketProvider(endpoint_uri, websocket_timeout=0.01) return Web3(provider) diff --git a/web3/providers/websocket.py b/web3/providers/websocket.py index dfd108f5e4..a5df377e38 100644 --- a/web3/providers/websocket.py +++ b/web3/providers/websocket.py @@ -97,7 +97,10 @@ def __str__(self): async def coro_make_request(self, request_data): async with self.conn as conn: - await conn.send(request_data) + await asyncio.wait_for( + conn.send(request_data), + timeout=self.websocket_timeout + ) return json.loads( await asyncio.wait_for( conn.recv(),