Skip to content

Commit c009d95

Browse files
authored
Merge pull request #100 from opentensor/feat/thewhaleking/fallback-chains
Fallback chains
2 parents fab6fc9 + c9a13ff commit c009d95

File tree

5 files changed

+443
-13
lines changed

5 files changed

+443
-13
lines changed

async_substrate_interface/async_substrate.py

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -735,15 +735,14 @@ async def initialize(self):
735735
"""
736736
Initialize the connection to the chain.
737737
"""
738-
async with self._lock:
739-
self._initializing = True
740-
if not self.initialized:
741-
if not self._chain:
742-
chain = await self.rpc_request("system_chain", [])
743-
self._chain = chain.get("result")
744-
await self.init_runtime()
745-
self.initialized = True
746-
self._initializing = False
738+
self._initializing = True
739+
if not self.initialized:
740+
if not self._chain:
741+
chain = await self.rpc_request("system_chain", [])
742+
self._chain = chain.get("result")
743+
await self.init_runtime()
744+
self.initialized = True
745+
self._initializing = False
747746

748747
async def __aexit__(self, exc_type, exc_val, exc_tb):
    # Leaving the ``async with`` block is a no-op: nothing is closed or cleaned
    # up here, and the implicit ``None`` return lets any exception raised inside
    # the block propagate to the caller.
    pass
Lines changed: 325 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,325 @@
1+
"""
2+
A number of "plugins" for SubstrateInterface (and AsyncSubstrateInterface). At initial creation, it contains only
3+
Retry (sync and async versions).
4+
"""
5+
6+
import asyncio
7+
import logging
8+
import socket
9+
from functools import partial
10+
from itertools import cycle
11+
from typing import Optional
12+
13+
from websockets.exceptions import ConnectionClosed
14+
15+
from async_substrate_interface.async_substrate import AsyncSubstrateInterface, Websocket
16+
from async_substrate_interface.errors import MaxRetriesExceeded
17+
from async_substrate_interface.sync_substrate import SubstrateInterface
18+
19+
# Logger named after the package so every module logging under this name
# shares one logger object.
logger = logging.getLogger("async_substrate_interface")


# Names of the interface methods that the Retry* subclasses in this module
# replace, per instance, with a fail-over wrapper (``partial(self._retry, name)``).
# Every name listed here is looked up with ``getattr`` on the instance, so each
# one must exist on the class being wrapped.
RETRY_METHODS = [
    "_get_block_handler",
    "close",
    "compose_call",
    "create_scale_object",
    "create_signed_extrinsic",
    "create_storage_key",
    "decode_scale",
    "encode_scale",
    "generate_signature_payload",
    "get_account_next_index",
    "get_account_nonce",
    "get_block",
    "get_block_hash",
    "get_block_header",
    "get_block_metadata",
    "get_block_number",
    "get_block_runtime_info",
    "get_block_runtime_version_for",
    "get_chain_finalised_head",
    "get_chain_head",
    "get_constant",
    "get_events",
    "get_extrinsics",
    "get_metadata_call_function",
    "get_metadata_constant",
    "get_metadata_error",
    "get_metadata_errors",
    "get_metadata_module",
    "get_metadata_modules",
    "get_metadata_runtime_call_function",
    "get_metadata_runtime_call_functions",
    "get_metadata_storage_function",
    "get_metadata_storage_functions",
    "get_parent_block_hash",
    "get_payment_info",
    "get_storage_item",
    "get_type_definition",
    "get_type_registry",
    "init_runtime",
    "initialize",
    "query",
    "query_map",
    "query_multi",
    "query_multiple",
    "retrieve_extrinsic_by_identifier",
    "rpc_request",
    "runtime_call",
    "submit_extrinsic",
    "subscribe_block_headers",
    "supports_rpc_method",
]

# NOTE(review): not referenced anywhere in the visible part of this module.
# Presumably the property names that should eventually get the same retry
# treatment as RETRY_METHODS -- confirm before relying on it.
RETRY_PROPS = ["properties", "version", "token_decimals", "token_symbol", "name"]
76+
77+
78+
class RetrySyncSubstrate(SubstrateInterface):
    """
    A subclass of SubstrateInterface that allows for handling chain failures by using backup chains. If a sustained
    network failure is encountered on a chain endpoint, the object will initialize a new connection on the next chain in
    the `fallback_chains` list. If the `retry_forever` flag is set, upon reaching the last chain in `fallback_chains`,
    the connection will attempt to iterate over the list (starting with `url`) again.

    E.g.
    ```
    substrate = RetrySyncSubstrate(
        "wss://entrypoint-finney.opentensor.ai:443",
        fallback_chains=["ws://127.0.0.1:9946"]
    )
    ```
    In this case, if there is a failure on entrypoint-finney, the connection will next attempt to hit localhost. If this
    also fails, a `MaxRetriesExceeded` exception will be raised.

    ```
    substrate = RetrySyncSubstrate(
        "wss://entrypoint-finney.opentensor.ai:443",
        fallback_chains=["ws://127.0.0.1:9946"],
        retry_forever=True
    )
    ```
    In this case, rather than a MaxRetriesExceeded exception being raised upon failure of the second chain (localhost),
    the object will again begin to initialize a new connection on entrypoint-finney, and then localhost, and so on and
    so forth.

    Implementation note: after construction, "connect" and every name in `RETRY_METHODS` is shadowed on the instance
    by `partial(self._retry, name)`; the original bound methods are kept in `self._original_methods`.
    """

    def __init__(
        self,
        url: str,
        use_remote_preset: bool = False,
        fallback_chains: Optional[list[str]] = None,
        retry_forever: bool = False,
        ss58_format: Optional[int] = None,
        type_registry: Optional[dict] = None,
        type_registry_preset: Optional[str] = None,
        chain_name: str = "",
        max_retries: int = 5,
        retry_timeout: float = 60.0,
        _mock: bool = False,
    ):
        """
        Connect to `url`, falling back to each entry of `fallback_chains` in order until one connects.

        Args:
            url: primary chain endpoint; tried first.
            fallback_chains: endpoints tried, in order, when the current one fails. `None` means no fallbacks.
            retry_forever: when True, fail-over cycles endlessly over `fallback_chains + [url]`; when False, each
                fallback is offered once and `MaxRetriesExceeded` is raised once they are used up.
            use_remote_preset, ss58_format, type_registry, type_registry_preset, chain_name, max_retries,
                retry_timeout, _mock: forwarded unchanged to `SubstrateInterface.__init__`.

        Raises:
            ConnectionError: if none of `[url] + fallback_chains` could be connected to.
        """
        fallback_chains = fallback_chains or []
        # Iterator consumed by `_reinstantiate_substrate`, one endpoint per fail-over.
        self.fallback_chains = (
            iter(fallback_chains)
            if not retry_forever
            else cycle(fallback_chains + [url])
        )
        self.use_remote_preset = use_remote_preset
        self.chain_name = chain_name
        self._mock = _mock
        self.retry_timeout = retry_timeout
        self.max_retries = max_retries
        self.chain_endpoint = url
        self.url = url
        initialized = False
        for chain_url in [url] + fallback_chains:
            try:
                self.chain_endpoint = chain_url
                self.url = chain_url
                super().__init__(
                    url=chain_url,
                    ss58_format=ss58_format,
                    type_registry=type_registry,
                    use_remote_preset=use_remote_preset,
                    type_registry_preset=type_registry_preset,
                    chain_name=chain_name,
                    _mock=_mock,
                    retry_timeout=retry_timeout,
                    max_retries=max_retries,
                )
                initialized = True
                logger.info(f"Connected to {chain_url}")
                break
            except ConnectionError:
                logger.warning(f"Unable to connect to {chain_url}")
        if not initialized:
            raise ConnectionError(
                f"Unable to connect at any chains specified: {[url] + fallback_chains}"
            )
        # "connect" is only used by SubstrateInterface, not AsyncSubstrateInterface
        retry_methods = ["connect"] + RETRY_METHODS
        # Capture the bound originals first, then shadow them on the instance so
        # every call (including internal `self.<method>` calls) goes via `_retry`.
        self._original_methods = {
            method: getattr(self, method) for method in retry_methods
        }
        for method in retry_methods:
            setattr(self, method, partial(self._retry, method))

    def _retry(self, method, *args, **kwargs):
        """
        Call the original `method`; on a connection-type failure, switch to the next chain and call it once more.

        Args:
            method: name of the wrapped method (a key of `self._original_methods`).
            *args, **kwargs: passed through to the wrapped method.

        Returns:
            Whatever the wrapped method returns.

        Raises:
            MaxRetriesExceeded: when a fail-over is needed but `self.fallback_chains` is exhausted.
        """
        method_ = self._original_methods[method]
        try:
            return method_(*args, **kwargs)
        except (
            MaxRetriesExceeded,
            ConnectionError,
            EOFError,
            ConnectionClosed,
            TimeoutError,
        ) as e:
            try:
                self._reinstantiate_substrate(e)
                return method_(*args, **kwargs)
            except StopIteration:
                logger.error(
                    f"Max retries exceeded with {self.url}. No more fallback chains."
                )
                # Chain to the triggering failure so the traceback shows the real
                # connection problem rather than the internal StopIteration.
                raise MaxRetriesExceeded from e

    def _reinstantiate_substrate(self, e: Optional[Exception] = None) -> None:
        """
        Close the current websocket and reconnect/re-initialize against the next fallback chain.

        Args:
            e: the exception that triggered the fail-over; only used to pick the log message.

        Raises:
            StopIteration: when `self.fallback_chains` has no more endpoints (handled by `_retry`).
        """
        # Pull the next endpoint first: if none is left we bail out before
        # touching the current connection.
        next_network = next(self.fallback_chains)
        self.ws.close()
        if isinstance(e, MaxRetriesExceeded):
            logger.error(
                f"Max retries exceeded with {self.url}. Retrying with {next_network}."
            )
        else:
            logger.error(f"Connection error. Trying again with {next_network}")
        self.url = next_network
        self.chain_endpoint = next_network
        # Force `initialize()` to run its set-up again for the new endpoint.
        self.initialized = False
        self.ws = self.connect(init=True)
        if not self._mock:
            self.initialize()
202+
203+
204+
class RetryAsyncSubstrate(AsyncSubstrateInterface):
    """
    A subclass of AsyncSubstrateInterface that allows for handling chain failures by using backup chains. If a
    sustained network failure is encountered on a chain endpoint, the object will initialize a new connection on
    the next chain in the `fallback_chains` list. If the `retry_forever` flag is set, upon reaching the last chain
    in `fallback_chains`, the connection will attempt to iterate over the list (starting with `url`) again.

    E.g.
    ```
    substrate = RetryAsyncSubstrate(
        "wss://entrypoint-finney.opentensor.ai:443",
        fallback_chains=["ws://127.0.0.1:9946"]
    )
    ```
    In this case, if there is a failure on entrypoint-finney, the connection will next attempt to hit localhost. If this
    also fails, a `MaxRetriesExceeded` exception will be raised.

    ```
    substrate = RetryAsyncSubstrate(
        "wss://entrypoint-finney.opentensor.ai:443",
        fallback_chains=["ws://127.0.0.1:9946"],
        retry_forever=True
    )
    ```
    In this case, rather than a MaxRetriesExceeded exception being raised upon failure of the second chain (localhost),
    the object will again begin to initialize a new connection on entrypoint-finney, and then localhost, and so on and
    so forth.

    Implementation note: after construction, every name in `RETRY_METHODS` is shadowed on the instance by
    `partial(self._retry, name)`; the original bound methods are kept in `self._original_methods`.
    """

    def __init__(
        self,
        url: str,
        use_remote_preset: bool = False,
        fallback_chains: Optional[list[str]] = None,
        retry_forever: bool = False,
        ss58_format: Optional[int] = None,
        type_registry: Optional[dict] = None,
        type_registry_preset: Optional[str] = None,
        chain_name: str = "",
        max_retries: int = 5,
        retry_timeout: float = 60.0,
        _mock: bool = False,
    ):
        """
        Build the interface for `url` and install the fail-over wrappers.

        Args:
            url: primary chain endpoint.
            fallback_chains: endpoints switched to, in order, when the current one fails. `None` means no fallbacks.
            retry_forever: when True, fail-over cycles endlessly over `fallback_chains + [url]`; when False, each
                fallback is offered once and `MaxRetriesExceeded` is raised once they are used up.
            use_remote_preset, ss58_format, type_registry, type_registry_preset, chain_name, max_retries,
                retry_timeout, _mock: forwarded unchanged to `AsyncSubstrateInterface.__init__`.
        """
        fallback_chains = fallback_chains or []
        # Iterator consumed by `_reinstantiate_substrate`, one endpoint per fail-over.
        self.fallback_chains = (
            iter(fallback_chains)
            if not retry_forever
            else cycle(fallback_chains + [url])
        )
        self.use_remote_preset = use_remote_preset
        self.chain_name = chain_name
        self._mock = _mock
        self.retry_timeout = retry_timeout
        self.max_retries = max_retries
        super().__init__(
            url=url,
            ss58_format=ss58_format,
            type_registry=type_registry,
            use_remote_preset=use_remote_preset,
            type_registry_preset=type_registry_preset,
            chain_name=chain_name,
            _mock=_mock,
            retry_timeout=retry_timeout,
            max_retries=max_retries,
        )
        # Capture the bound originals first, then shadow them on the instance so
        # every call (including internal `self.<method>` calls) goes via `_retry`.
        self._original_methods = {
            method: getattr(self, method) for method in RETRY_METHODS
        }
        for method in RETRY_METHODS:
            setattr(self, method, partial(self._retry, method))

    async def _reinstantiate_substrate(self, e: Optional[Exception] = None) -> None:
        """
        Shut down the current websocket and re-initialize against the next fallback chain.

        Args:
            e: the exception that triggered the fail-over; only used to pick the log message.

        Raises:
            StopAsyncIteration: when `self.fallback_chains` has no more endpoints (handled by `_retry`).
        """
        try:
            next_network = next(self.fallback_chains)
        except StopIteration:
            # A StopIteration that escapes a coroutine is turned into a
            # RuntimeError by the interpreter, so it could never reach the
            # `except StopAsyncIteration` in `_retry`. Translate it explicitly.
            raise StopAsyncIteration from None
        if isinstance(e, MaxRetriesExceeded):
            logger.error(
                f"Max retries exceeded with {self.url}. Retrying with {next_network}."
            )
        else:
            logger.error(f"Connection error. Trying again with {next_network}")
        try:
            await self.ws.shutdown()
        except AttributeError:
            # Best effort: the current websocket object may not expose a usable
            # `shutdown` at this point; there is then nothing to tear down.
            pass
        if self._forgettable_task is not None:
            # Cancel the pending background task and wait for the cancellation
            # to actually land before replacing the websocket.
            self._forgettable_task.cancel()
            try:
                await self._forgettable_task
            except asyncio.CancelledError:
                pass
        self.chain_endpoint = next_network
        self.url = next_network
        self.ws = Websocket(
            next_network,
            options={
                "max_size": self.ws_max_size,
                "write_limit": 2**16,
            },
        )
        # `initialize()` gates its work on `self.initialized`, so that is the
        # flag that must be cleared for the new endpoint to be set up.
        self.initialized = False
        self._initializing = False
        await self.initialize()

    async def _retry(self, method, *args, **kwargs):
        """
        Await the original `method`; on a connection-type failure, switch to the next chain and await it once more.

        Args:
            method: name of the wrapped method (a key of `self._original_methods`).
            *args, **kwargs: passed through to the wrapped method.

        Returns:
            Whatever the wrapped coroutine returns.

        Raises:
            MaxRetriesExceeded: when a fail-over is needed but `self.fallback_chains` is exhausted.
        """
        method_ = self._original_methods[method]
        try:
            return await method_(*args, **kwargs)
        except (
            MaxRetriesExceeded,
            ConnectionError,
            ConnectionClosed,
            EOFError,
            socket.gaierror,
        ) as e:
            try:
                await self._reinstantiate_substrate(e)
                return await method_(*args, **kwargs)
            except StopAsyncIteration:
                logger.error(
                    f"Max retries exceeded with {self.url}. No more fallback chains."
                )
                # Chain to the triggering failure so the traceback shows the real
                # connection problem rather than the internal iterator signal.
                raise MaxRetriesExceeded from e

async_substrate_interface/sync_substrate.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import functools
22
import logging
3+
import socket
34
from hashlib import blake2b
45
from typing import Optional, Union, Callable, Any
56

@@ -512,7 +513,6 @@ def __init__(
512513
"strict_scale_decode": True,
513514
}
514515
self.initialized = False
515-
self._forgettable_task = None
516516
self.ss58_format = ss58_format
517517
self.type_registry = type_registry
518518
self.type_registry_preset = type_registry_preset
@@ -588,13 +588,19 @@ def name(self):
588588

589589
def connect(self, init=False):
    """
    Return a websocket connection to `self.chain_endpoint`.

    Args:
        init: when exactly `True`, always open a new connection and return it
            WITHOUT storing it on `self.ws` (the caller assigns it). Otherwise
            reuse `self.ws` while its `close_code` is falsy, and only open (and
            store) a new connection once a close code is set.

    Returns:
        The websocket connection object.

    Raises:
        ConnectionError: if opening the connection fails with a
            `ConnectionError` or `socket.gaierror`; the original error is
            attached as `__cause__`.
    """
    reuse_allowed = init is not True
    if reuse_allowed and not self.ws.close_code:
        return self.ws
    try:
        ws = connect(self.chain_endpoint, max_size=self.ws_max_size)
    except (ConnectionError, socket.gaierror) as e:
        # Normalise resolution failures to ConnectionError so callers only need
        # to handle one exception type; keep the original as the explicit cause.
        raise ConnectionError(e) from e
    if reuse_allowed:
        self.ws = ws
    return ws
598604

599605
def get_storage_item(
600606
self, module: str, storage_function: str, block_hash: str = None

0 commit comments

Comments
 (0)