Description
- Version: 6.8.0
- Python: 3.9.16
- OS: Linux 6.1.31-2-MANJARO
What was wrong?
I'm just trying to subscribe to a node over WebSocket with eth_subscribe to get notifications about new blocks.
I took the example code from the docs and modified it slightly, ending up with:
import asyncio
import time
from typing import cast

from eth_typing import HexStr
from web3 import AsyncWeb3
from web3.providers import WebsocketProviderV2

start = time.time()


async def ws_v2_subscription_context_manager_example():
    async with AsyncWeb3.persistent_websocket(
        WebsocketProviderV2(
            "wss://..."
        )
    ) as w3:
        # subscribe to new block headers
        # noinspection PyTypeChecker
        subscription_id: HexStr = cast(
            HexStr, (await w3.eth.subscribe("newHeads"))["result"]
        )
        print(f"subscribed: {subscription_id}")
        unsubscribed = False
        while not unsubscribed:
            async for response in w3.listen_to_websocket():
                try:
                    print(
                        "block_number",
                        int(response["params"]["result"]["number"], 16),
                    )
                except Exception as e:
                    print("ERROR", str(e))
                if time.time() - start > 30:
                    print("unsubscribing...")
                    unsubscribed = await w3.eth.unsubscribe(subscription_id)
                    print("unsubscribed:", unsubscribed)
                    break


async def main():
    await ws_v2_subscription_context_manager_example()


if __name__ == "__main__":
    asyncio.run(main())
It crashes after the first notification:
subscribed: 0xcab36d5d4b328aa605891a693fb01118
block_number 17914393
Traceback (most recent call last):
  File "/home/...../test.py", line 48, in <module>
    asyncio.run(main())
  File "/home/.../.pyenv/versions/3.9.16/lib/python3.9/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/home/.../.pyenv/versions/3.9.16/lib/python3.9/asyncio/base_events.py", line 647, in run_until_complete
    return future.result()
  File "/home/...../test.py", line 44, in main
    await ws_v2_subscription_context_manager_example()
  File "/home/...../test.py", line 27, in ws_v2_subscription_context_manager_example
    async for response in w3.listen_to_websocket():
  File "/home/..../venv/lib/python3.9/site-packages/web3/manager.py", line 315, in _ws_recv_stream
    if request_info.method == "eth_subscribe" and "result" in response.keys():
AttributeError: 'NoneType' object has no attribute 'method'
Process finished with exit code 1
How can it be fixed?
The problem seems to be in https://github.com/ethereum/web3.py/blob/main/web3/manager.py#L296
async def _ws_recv_stream(self) -> AsyncGenerator[RPCResponse, None]:
    ...
    request_info = self._provider._get_request_information_for_response(response)
    if request_info is None:
        self.logger.debug("No cache key found for response, returning raw response")
        yield response
    if request_info.method == "eth_subscribe" and "result" in response.keys():
        ...
    yield apply_result_formatters(result_formatters, partly_formatted_response)
It should look like this:
async def _ws_recv_stream(self) -> AsyncGenerator[RPCResponse, None]:
    ...
    request_info = self._provider._get_request_information_for_response(response)
    if request_info is None:
        self.logger.debug("No cache key found for response, returning raw response")
        yield response
    else:
        if request_info.method == "eth_subscribe" and "result" in response.keys():
            ...
        yield apply_result_formatters(result_formatters, partly_formatted_response)
To be honest, I don't have a lot of time to invest in this problem 😞.
But self._provider._get_request_information_for_response tries to look up cached request information by md5hash(subscription_id). I haven't dug into this caching mechanism or why it is needed, but it seems that if request_info is None, the response should be yielded right away, without applying result_formatters and without yielding a second time once control returns to the generator.
By the way, after this dirty fix the code works as expected, at least for me 😄
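The fall-through can be reproduced without web3.py. The sketch below is only an illustration under assumptions: `REQUEST_CACHE`, `lookup_method`, `recv_stream` and `collect` are hypothetical names, not web3.py APIs. It shows that a `yield` inside an `if` does not end the iteration: when the consumer asks for the next item, execution resumes right after the `yield`, so an `else` (or a `continue`) is needed to skip the code that assumes the cached request exists.

```python
import asyncio
from typing import Any, AsyncGenerator, Dict, List, Optional

# Hypothetical stand-in for the provider's request cache:
# maps a response "id" to the method of the request that produced it.
REQUEST_CACHE: Dict[int, str] = {1: "eth_subscribe"}


def lookup_method(response: Dict[str, Any]) -> Optional[str]:
    """Return the cached method name, or None when nothing is cached."""
    return REQUEST_CACHE.get(response.get("id"))


async def recv_stream(
    responses: List[Dict[str, Any]],
) -> AsyncGenerator[Dict[str, Any], None]:
    for response in responses:
        method = lookup_method(response)
        if method is None:
            # Nothing cached: hand back the raw response ...
            yield response
            # ... and skip the rest. Without this `continue` (or an
            # `else` branch) execution would resume below with method=None.
            continue
        # Only reached when cached request information exists.
        yield {"method": method, **response}


async def collect(responses: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    return [item async for item in recv_stream(responses)]


if __name__ == "__main__":
    sample = [
        {"id": 1, "result": "0xabc"},  # reply to a cached request
        {"method": "eth_subscription", "params": {"result": {"number": "0x10"}}},
    ]
    print(asyncio.run(collect(sample)))
```

Whether `else` or `continue` fits better in `_ws_recv_stream` depends on the surrounding loop, which is elided in the snippets above.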
There is also another minor issue: in https://github.com/ethereum/web3.py/blob/main/web3/eth/async_eth.py#L680, async def subscribe(...) -> HexStr returns the full response (a dict) rather than a HexStr, although the annotation suggests it returns the subscription_id.
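Until the return type is settled, caller code can tolerate both shapes. This is a minimal sketch, assuming the full response carries the ID under the "result" key as in the reproduction script above; `extract_subscription_id` is a hypothetical helper, not part of web3.py.

```python
from collections import abc
from typing import Any, Mapping, Union


def extract_subscription_id(value: Union[str, Mapping[str, Any]]) -> str:
    """Return the subscription ID from a bare string or a full JSON-RPC response."""
    if isinstance(value, str):
        # Already the ID, as the `-> HexStr` annotation promises.
        return value
    if isinstance(value, abc.Mapping) and isinstance(value.get("result"), str):
        # Full response dict, as observed in this report.
        return value["result"]
    raise TypeError(f"cannot extract a subscription ID from {value!r}")
```

With this helper, the `cast(...)["result"]` line in the script could become `subscription_id = extract_subscription_id(await w3.eth.subscribe("newHeads"))`, and it would keep working if `subscribe` later starts returning the ID directly.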