Skip to content

Release/1.2.2 #126

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
May 22, 2025
Prev Previous commit
Next Next commit
We occasionally get incorrect timeouts with AsyncSubstrateInterface, …
…and this is largely due to the calculation of time — `loop.time()` is not necessarily `time.time()`
  • Loading branch information
thewhaleking committed May 22, 2025
commit f9c6228d87ed95d35f61e364611e0714d30a13a3
18 changes: 15 additions & 3 deletions async_substrate_interface/async_substrate.py
Original file line number Diff line number Diff line change
Expand Up @@ -531,13 +531,21 @@ def __init__(
self._open_subscriptions = 0
self._options = options if options else {}
self.last_received = time.time()
self.last_sent = time.time()

async def __aenter__(self):
async with self._lock:
self._in_use += 1
await self.connect()
now = asyncio.get_running_loop().time()
self.last_received = now
self.last_sent = now
return self

@staticmethod
async def loop_time() -> float:
return asyncio.get_running_loop().time()

async def connect(self, force=False):
if self._exit_task:
self._exit_task.cancel()
Expand Down Expand Up @@ -594,7 +602,7 @@ async def _recv(self) -> None:
try:
# TODO consider wrapping this in asyncio.wait_for and use that for the timeout logic
response = json.loads(await self.ws.recv(decode=False))
self.last_received = time.time()
self.last_received = await self.loop_time()
async with self._lock:
# note that these 'subscriptions' are all waiting sent messages which have not received
# responses, and thus are not the same as RPC 'subscriptions', which are unique
Expand Down Expand Up @@ -630,12 +638,12 @@ async def send(self, payload: dict) -> int:
Returns:
id: the internal ID of the request (incremented int)
"""
# async with self._lock:
original_id = get_next_id()
# self._open_subscriptions += 1
await self.max_subscriptions.acquire()
try:
await self.ws.send(json.dumps({**payload, **{"id": original_id}}))
self.last_sent = await self.loop_time()
return original_id
except (ConnectionClosed, ssl.SSLError, EOFError):
async with self._lock:
Expand Down Expand Up @@ -2120,7 +2128,11 @@ async def _make_rpc_request(

if request_manager.is_complete:
break
if time.time() - self.ws.last_received >= self.retry_timeout:
if (
(current_time := await self.ws.loop_time()) - self.ws.last_received
>= self.retry_timeout
and current_time - self.ws.last_sent >= self.retry_timeout
):
if attempt >= self.max_retries:
logger.warning(
f"Timed out waiting for RPC requests {attempt} times. Exiting."
Expand Down
Loading