Skip to content

Commit 05ddb4e

Browse files
committed
Scrapped the multiprocessing idea. Now using decode_list from bt-decode
1 parent c13dc44 commit 05ddb4e

File tree

4 files changed

+110
-163
lines changed

4 files changed

+110
-163
lines changed

async_substrate_interface/async_substrate.py

Lines changed: 11 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -56,14 +56,12 @@
5656
)
5757
from async_substrate_interface.utils.storage import StorageKey
5858
from async_substrate_interface.type_registry import _TYPE_REGISTRY
59-
from async_substrate_interface.utils.decoding_attempt import (
59+
from async_substrate_interface.utils.decoding import (
6060
decode_query_map,
61-
_decode_scale_with_runtime,
6261
)
6362

6463
if TYPE_CHECKING:
6564
from websockets.asyncio.client import ClientConnection
66-
from concurrent.futures import ProcessPoolExecutor
6765

6866
ResultHandler = Callable[[dict, Any], Awaitable[tuple[dict, bool]]]
6967

@@ -418,7 +416,6 @@ def __init__(
418416
last_key: Optional[str] = None,
419417
max_results: Optional[int] = None,
420418
ignore_decoding_errors: bool = False,
421-
executor: Optional["ProcessPoolExecutor"] = None,
422419
):
423420
self.records = records
424421
self.page_size = page_size
@@ -431,7 +428,6 @@ def __init__(
431428
self.params = params
432429
self.ignore_decoding_errors = ignore_decoding_errors
433430
self.loading_complete = False
434-
self.executor = executor
435431
self._buffer = iter(self.records) # Initialize the buffer with initial records
436432

437433
async def retrieve_next_page(self, start_key) -> list:
@@ -444,7 +440,6 @@ async def retrieve_next_page(self, start_key) -> list:
444440
start_key=start_key,
445441
max_results=self.max_results,
446442
ignore_decoding_errors=self.ignore_decoding_errors,
447-
executor=self.executor,
448443
)
449444
if len(result.records) < self.page_size:
450445
self.loading_complete = True
@@ -2867,7 +2862,6 @@ async def query_map(
28672862
page_size: int = 100,
28682863
ignore_decoding_errors: bool = False,
28692864
reuse_block_hash: bool = False,
2870-
executor: Optional["ProcessPoolExecutor"] = None,
28712865
) -> AsyncQueryMapResult:
28722866
"""
28732867
Iterates over all key-pairs located at the given module and storage_function. The storage
@@ -2972,54 +2966,16 @@ async def query_map(
29722966
if "error" in response:
29732967
raise SubstrateRequestException(response["error"]["message"])
29742968
for result_group in response["result"]:
2975-
if executor:
2976-
# print(
2977-
# ("prefix", type("prefix")),
2978-
# ("runtime_registry", type(runtime.registry)),
2979-
# ("param_types", type(param_types)),
2980-
# ("params", type(params)),
2981-
# ("value_type", type(value_type)),
2982-
# ("key_hasher", type(key_hashers)),
2983-
# ("ignore_decoding_errors", type(ignore_decoding_errors)),
2984-
# )
2985-
result = await asyncio.get_running_loop().run_in_executor(
2986-
executor,
2987-
decode_query_map,
2988-
result_group["changes"],
2989-
prefix,
2990-
runtime.registry.registry,
2991-
param_types,
2992-
params,
2993-
value_type,
2994-
key_hashers,
2995-
ignore_decoding_errors,
2996-
)
2997-
# max_workers = executor._max_workers
2998-
# result_group_changes_groups = [result_group["changes"][i:i + max_workers] for i in range(0, len(result_group["changes"]), max_workers)]
2999-
# all_results = executor.map(
3000-
# self._decode_query_map,
3001-
# result_group["changes"],
3002-
# repeat(prefix),
3003-
# repeat(runtime.registry),
3004-
# repeat(param_types),
3005-
# repeat(params),
3006-
# repeat(value_type),
3007-
# repeat(key_hashers),
3008-
# repeat(ignore_decoding_errors)
3009-
# )
3010-
# for r in all_results:
3011-
# result.extend(r)
3012-
else:
3013-
result = decode_query_map(
3014-
result_group["changes"],
3015-
prefix,
3016-
runtime.registry.registry,
3017-
param_types,
3018-
params,
3019-
value_type,
3020-
key_hashers,
3021-
ignore_decoding_errors,
3022-
)
2969+
result = decode_query_map(
2970+
result_group["changes"],
2971+
prefix,
2972+
runtime,
2973+
param_types,
2974+
params,
2975+
value_type,
2976+
key_hashers,
2977+
ignore_decoding_errors,
2978+
)
30232979
return AsyncQueryMapResult(
30242980
records=result,
30252981
page_size=page_size,
@@ -3031,7 +2987,6 @@ async def query_map(
30312987
last_key=last_key,
30322988
max_results=max_results,
30332989
ignore_decoding_errors=ignore_decoding_errors,
3034-
executor=executor,
30352990
)
30362991

30372992
async def submit_extrinsic(

async_substrate_interface/utils/decoding.py

Lines changed: 98 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,14 @@
1-
from typing import Union
1+
from typing import Union, TYPE_CHECKING
22

3-
from bt_decode import AxonInfo, PrometheusInfo
3+
from bt_decode import AxonInfo, PrometheusInfo, decode_list
4+
from scalecodec import ss58_encode
5+
from bittensor_wallet.utils import SS58_FORMAT
6+
7+
from async_substrate_interface.utils import hex_to_bytes
8+
from async_substrate_interface.types import ScaleObj
9+
10+
if TYPE_CHECKING:
11+
from async_substrate_interface.types import Runtime
412

513

614
def _determine_if_old_runtime_call(runtime_call_def, metadata_v15_value) -> bool:
@@ -44,3 +52,91 @@ def _bt_decode_to_dict_or_list(obj) -> Union[dict, list[dict]]:
4452
else:
4553
as_dict[key] = val
4654
return as_dict
55+
56+
57+
def _decode_scale_list_with_runtime(
58+
type_string: list[str],
59+
scale_bytes: list[bytes],
60+
runtime_registry,
61+
return_scale_obj: bool = False,
62+
):
63+
if scale_bytes == b"":
64+
return None
65+
if type_string == "scale_info::0": # Is an AccountId
66+
# Decode AccountId bytes to SS58 address
67+
return ss58_encode(scale_bytes, SS58_FORMAT)
68+
else:
69+
obj = decode_list(type_string, runtime_registry, scale_bytes)
70+
if return_scale_obj:
71+
return [ScaleObj(x) for x in obj]
72+
else:
73+
return obj
74+
75+
76+
def decode_query_map(
77+
result_group_changes,
78+
prefix,
79+
runtime: "Runtime",
80+
param_types,
81+
params,
82+
value_type,
83+
key_hashers,
84+
ignore_decoding_errors,
85+
):
86+
def concat_hash_len(key_hasher: str) -> int:
87+
"""
88+
Helper function to avoid if statements
89+
"""
90+
if key_hasher == "Blake2_128Concat":
91+
return 16
92+
elif key_hasher == "Twox64Concat":
93+
return 8
94+
elif key_hasher == "Identity":
95+
return 0
96+
else:
97+
raise ValueError("Unsupported hash type")
98+
99+
hex_to_bytes_ = hex_to_bytes
100+
101+
result = []
102+
# Determine type string
103+
key_type_string_ = []
104+
for n in range(len(params), len(param_types)):
105+
key_type_string_.append(f"[u8; {concat_hash_len(key_hashers[n])}]")
106+
key_type_string_.append(param_types[n])
107+
key_type_string = f"({', '.join(key_type_string_)})"
108+
109+
pre_decoded_keys = []
110+
pre_decoded_key_types = [key_type_string] * len(result_group_changes)
111+
pre_decoded_values = []
112+
pre_decoded_value_types = [value_type] * len(result_group_changes)
113+
114+
for item in result_group_changes:
115+
pre_decoded_keys.append(bytes.fromhex(item[0][len(prefix) :]))
116+
pre_decoded_values.append(hex_to_bytes_(item[1]))
117+
all_decoded = _decode_scale_list_with_runtime(
118+
pre_decoded_key_types + pre_decoded_value_types,
119+
pre_decoded_keys + pre_decoded_values,
120+
runtime.registry,
121+
)
122+
middl_index = len(all_decoded) // 2
123+
decoded_keys = all_decoded[:middl_index]
124+
decoded_values = [ScaleObj(x) for x in all_decoded[middl_index:]]
125+
for dk, dv in zip(decoded_keys, decoded_values):
126+
try:
127+
# strip key_hashers to use as item key
128+
if len(param_types) - len(params) == 1:
129+
item_key = dk[1]
130+
else:
131+
item_key = tuple(
132+
dk[key + 1] for key in range(len(params), len(param_types) + 1, 2)
133+
)
134+
135+
except Exception as _:
136+
if not ignore_decoding_errors:
137+
raise
138+
item_key = None
139+
140+
item_value = dv
141+
result.append([item_key, item_value])
142+
return result

async_substrate_interface/utils/decoding_attempt.py

Lines changed: 0 additions & 104 deletions
This file was deleted.

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ dependencies = [
1010
"wheel",
1111
"asyncstdlib~=3.13.0",
1212
"bittensor-wallet>=2.1.3",
13-
"bt-decode==v0.5.0",
13+
"bt-decode==v0.6.0",
1414
"scalecodec~=1.2.11",
1515
"websockets>=14.1",
1616
"xxhash"

0 commit comments

Comments
 (0)