Skip to content

Commit faa14ed

Browse files
authored
Merge pull request #128 from opentensor/feat/thewhaleking/add-archive-node
Add archive node to retry substrate
2 parents e53f27f + ed0c6d6 commit faa14ed

File tree

5 files changed

+104
-19
lines changed

5 files changed

+104
-19
lines changed

async_substrate_interface/async_substrate.py

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
BlockNotFound,
4242
MaxRetriesExceeded,
4343
MetadataAtVersionNotFound,
44+
StateDiscardedError,
4445
)
4546
from async_substrate_interface.protocols import Keypair
4647
from async_substrate_interface.types import (
@@ -2148,6 +2149,7 @@ async def _make_rpc_request(
21482149
storage_item,
21492150
result_handler,
21502151
)
2152+
21512153
request_manager.add_response(
21522154
item_id, decoded_response, complete
21532155
)
@@ -2234,9 +2236,8 @@ async def rpc_request(
22342236
]
22352237
result = await self._make_rpc_request(payloads, result_handler=result_handler)
22362238
if "error" in result[payload_id][0]:
2237-
if (
2238-
"Failed to get runtime version"
2239-
in result[payload_id][0]["error"]["message"]
2239+
if "Failed to get runtime version" in (
2240+
err_msg := result[payload_id][0]["error"]["message"]
22402241
):
22412242
logger.warning(
22422243
"Failed to get runtime. Re-fetching from chain, and retrying."
@@ -2245,7 +2246,14 @@ async def rpc_request(
22452246
return await self.rpc_request(
22462247
method, params, result_handler, block_hash, reuse_block_hash
22472248
)
2248-
raise SubstrateRequestException(result[payload_id][0]["error"]["message"])
2249+
elif (
2250+
"Client error: Api called for an unknown Block: State already discarded"
2251+
in err_msg
2252+
):
2253+
bh = err_msg.split("State already discarded for ")[1].strip()
2254+
raise StateDiscardedError(bh)
2255+
else:
2256+
raise SubstrateRequestException(err_msg)
22492257
if "result" in result[payload_id][0]:
22502258
return result[payload_id][0]
22512259
else:

async_substrate_interface/errors.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,16 @@ def __init__(self):
2222
super().__init__(message)
2323

2424

25+
class StateDiscardedError(SubstrateRequestException):
    """Raised when a request targets a block whose state the node reports as discarded.

    The message advises repeating the request against an archive node.

    Attributes:
        block_hash: The block hash supplied by the caller when raising the error.
    """

    def __init__(self, block_hash: str):
        # Kept on the instance so handlers can report which block triggered the failure.
        self.block_hash = block_hash
        super().__init__(
            f"State discarded for {block_hash}. This indicates the block is too old, and you should instead "
            "make this request using an archive node."
        )
33+
34+
2535
class StorageFunctionNotFound(ValueError):
2636
pass
2737

async_substrate_interface/substrate_addons.py

Lines changed: 40 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
from websockets.exceptions import ConnectionClosed
1414

1515
from async_substrate_interface.async_substrate import AsyncSubstrateInterface, Websocket
16-
from async_substrate_interface.errors import MaxRetriesExceeded
16+
from async_substrate_interface.errors import MaxRetriesExceeded, StateDiscardedError
1717
from async_substrate_interface.sync_substrate import SubstrateInterface
1818

1919
logger = logging.getLogger("async_substrate_interface")
@@ -117,13 +117,17 @@ def __init__(
117117
max_retries: int = 5,
118118
retry_timeout: float = 60.0,
119119
_mock: bool = False,
120+
archive_nodes: Optional[list[str]] = None,
120121
):
121122
fallback_chains = fallback_chains or []
122123
self.fallback_chains = (
123124
iter(fallback_chains)
124125
if not retry_forever
125126
else cycle(fallback_chains + [url])
126127
)
128+
self.archive_nodes = (
129+
iter(archive_nodes) if not retry_forever else cycle(archive_nodes)
130+
)
127131
self.use_remote_preset = use_remote_preset
128132
self.chain_name = chain_name
129133
self._mock = _mock
@@ -174,20 +178,32 @@ def _retry(self, method, *args, **kwargs):
174178
EOFError,
175179
ConnectionClosed,
176180
TimeoutError,
181+
socket.gaierror,
182+
StateDiscardedError,
177183
) as e:
184+
use_archive = isinstance(e, StateDiscardedError)
178185
try:
179-
self._reinstantiate_substrate(e)
186+
self._reinstantiate_substrate(e, use_archive=use_archive)
180187
return method_(*args, **kwargs)
181188
except StopIteration:
182189
logger.error(
183190
f"Max retries exceeded with {self.url}. No more fallback chains."
184191
)
185192
raise MaxRetriesExceeded
186193

187-
def _reinstantiate_substrate(self, e: Optional[Exception] = None) -> None:
188-
next_network = next(self.fallback_chains)
194+
def _reinstantiate_substrate(
195+
self, e: Optional[Exception] = None, use_archive: bool = False
196+
) -> None:
197+
if use_archive:
198+
bh = getattr(e, "block_hash", "Unknown Block Hash")
199+
logger.info(
200+
f"Attempt made to {bh} failed for state discarded. Attempting to switch to archive node."
201+
)
202+
next_network = next(self.archive_nodes)
203+
else:
204+
next_network = next(self.fallback_chains)
189205
self.ws.close()
190-
if e.__class__ == MaxRetriesExceeded:
206+
if isinstance(e, MaxRetriesExceeded):
191207
logger.error(
192208
f"Max retries exceeded with {self.url}. Retrying with {next_network}."
193209
)
@@ -243,13 +259,17 @@ def __init__(
243259
max_retries: int = 5,
244260
retry_timeout: float = 60.0,
245261
_mock: bool = False,
262+
archive_nodes: Optional[list[str]] = None,
246263
):
247264
fallback_chains = fallback_chains or []
248265
self.fallback_chains = (
249266
iter(fallback_chains)
250267
if not retry_forever
251268
else cycle(fallback_chains + [url])
252269
)
270+
self.archive_nodes = (
271+
iter(archive_nodes) if not retry_forever else cycle(archive_nodes)
272+
)
253273
self.use_remote_preset = use_remote_preset
254274
self.chain_name = chain_name
255275
self._mock = _mock
@@ -272,9 +292,18 @@ def __init__(
272292
for method in RETRY_METHODS:
273293
setattr(self, method, partial(self._retry, method))
274294

275-
async def _reinstantiate_substrate(self, e: Optional[Exception] = None) -> None:
276-
next_network = next(self.fallback_chains)
277-
if e.__class__ == MaxRetriesExceeded:
295+
async def _reinstantiate_substrate(
296+
self, e: Optional[Exception] = None, use_archive: bool = False
297+
) -> None:
298+
if use_archive:
299+
bh = getattr(e, "block_hash", "Unknown Block Hash")
300+
logger.info(
301+
f"Attempt made to {bh} failed for state discarded. Attempting to switch to archive node."
302+
)
303+
next_network = next(self.archive_nodes)
304+
else:
305+
next_network = next(self.fallback_chains)
306+
if isinstance(e, MaxRetriesExceeded):
278307
logger.error(
279308
f"Max retries exceeded with {self.url}. Retrying with {next_network}."
280309
)
@@ -314,9 +343,11 @@ async def _retry(self, method, *args, **kwargs):
314343
ConnectionClosed,
315344
EOFError,
316345
socket.gaierror,
346+
StateDiscardedError,
317347
) as e:
348+
use_archive = isinstance(e, StateDiscardedError)
318349
try:
319-
await self._reinstantiate_substrate(e)
350+
await self._reinstantiate_substrate(e, use_archive=use_archive)
320351
return await method_(*args, **kwargs)
321352
except StopAsyncIteration:
322353
logger.error(

async_substrate_interface/sync_substrate.py

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
BlockNotFound,
2525
MaxRetriesExceeded,
2626
MetadataAtVersionNotFound,
27+
StateDiscardedError,
2728
)
2829
from async_substrate_interface.protocols import Keypair
2930
from async_substrate_interface.types import (
@@ -1944,9 +1945,8 @@ def rpc_request(
19441945
]
19451946
result = self._make_rpc_request(payloads, result_handler=result_handler)
19461947
if "error" in result[payload_id][0]:
1947-
if (
1948-
"Failed to get runtime version"
1949-
in result[payload_id][0]["error"]["message"]
1948+
if "Failed to get runtime version" in (
1949+
err_msg := result[payload_id][0]["error"]["message"]
19501950
):
19511951
logger.warning(
19521952
"Failed to get runtime. Re-fetching from chain, and retrying."
@@ -1955,7 +1955,14 @@ def rpc_request(
19551955
return self.rpc_request(
19561956
method, params, result_handler, block_hash, reuse_block_hash
19571957
)
1958-
raise SubstrateRequestException(result[payload_id][0]["error"]["message"])
1958+
elif (
1959+
"Client error: Api called for an unknown Block: State already discarded"
1960+
in err_msg
1961+
):
1962+
bh = err_msg.split("State already discarded for ")[1].strip()
1963+
raise StateDiscardedError(bh)
1964+
else:
1965+
raise SubstrateRequestException(err_msg)
19591966
if "result" in result[payload_id][0]:
19601967
return result[payload_id][0]
19611968
else:

tests/test_substrate_addons.py

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,12 @@
44
import pytest
55
import time
66

7-
from async_substrate_interface.substrate_addons import RetrySyncSubstrate
8-
from async_substrate_interface.errors import MaxRetriesExceeded
7+
from async_substrate_interface import AsyncSubstrateInterface, SubstrateInterface
8+
from async_substrate_interface.substrate_addons import (
9+
RetrySyncSubstrate,
10+
RetryAsyncSubstrate,
11+
)
12+
from async_substrate_interface.errors import MaxRetriesExceeded, StateDiscardedError
913
from tests.conftest import start_docker_container
1014

1115
LATENT_LITE_ENTRYPOINT = "wss://lite.sub.latent.to:443"
@@ -70,3 +74,28 @@ def test_retry_sync_substrate_offline():
7074
RetrySyncSubstrate(
7175
"ws://127.0.0.1:9945", fallback_chains=["ws://127.0.0.1:9946"]
7276
)
77+
78+
79+
@pytest.mark.asyncio
async def test_retry_async_subtensor_archive_node():
    """Check that RetryAsyncSubstrate can serve a block the lite node rejects.

    Connects to live endpoints: the plain interface is expected to raise
    StateDiscardedError for a block 1000 behind the head, while the retrying
    interface configured with an archive node is expected to return a dict.
    """
    lite_endpoint = "wss://lite.sub.latent.to:443"
    archive_endpoints = ["ws://178.156.172.75:9944"]

    # Step 1: without any archive fallback, the old block must fail.
    async with AsyncSubstrateInterface(lite_endpoint) as lite_substrate:
        head_number = await lite_substrate.get_block_number()
        stale_number = head_number - 1000
        with pytest.raises(StateDiscardedError):
            await lite_substrate.get_block(block_number=stale_number)

    # Step 2: the same block number must succeed once archive nodes are supplied.
    async with RetryAsyncSubstrate(
        lite_endpoint, archive_nodes=archive_endpoints
    ) as retry_substrate:
        fetched = await retry_substrate.get_block(block_number=stale_number)
        assert isinstance(fetched, dict)
90+
91+
92+
def test_retry_sync_subtensor_archive_node():
    """Check that RetrySyncSubstrate can serve a block the lite node rejects.

    Connects to live endpoints: the plain interface is expected to raise
    StateDiscardedError for a block 1000 behind the head, while the retrying
    interface configured with an archive node is expected to return a dict.
    """
    lite_endpoint = "wss://lite.sub.latent.to:443"
    archive_endpoints = ["ws://178.156.172.75:9944"]

    # Step 1: without any archive fallback, the old block must fail.
    with SubstrateInterface(lite_endpoint) as lite_substrate:
        head_number = lite_substrate.get_block_number()
        stale_number = head_number - 1000
        with pytest.raises(StateDiscardedError):
            lite_substrate.get_block(block_number=stale_number)

    # Step 2: the same block number must succeed once archive nodes are supplied.
    with RetrySyncSubstrate(
        lite_endpoint, archive_nodes=archive_endpoints
    ) as retry_substrate:
        fetched = retry_substrate.get_block(block_number=stale_number)
        assert isinstance(fetched, dict)

0 commit comments

Comments
 (0)