diff --git a/python/pools.py b/python/pools.py index 50c774b1..f7fdfee1 100644 --- a/python/pools.py +++ b/python/pools.py @@ -1,12 +1,18 @@ import csv import json - +import os +import time +from concurrent.futures import ThreadPoolExecutor, as_completed from tqdm import tqdm from enum import Enum from web3 import Web3 from typing import Dict, List, Optional +from random import randint +from time import sleep + +from constants import * -from constants import * +RATE_LIMIT = 5 # Limit to 10 requests per second class DexVariant(Enum): @@ -44,6 +50,14 @@ def cache_row(self): self.fee, ] +def fetch_events(params, factory_address, v2_factory): + sleep(randint(10, 50) / 100.0) # Adding some jitter to avoid rate-limiting + try: + events = v2_factory.events.PairCreated.get_logs(fromBlock=params[0], toBlock=params[1]) + return [(factory_address, event) for event in events] + except Exception as e: + print(f"Error fetching events: {e}") + return [] def load_cached_pools() -> Optional[Dict[str, Pool]]: if os.path.exists(CACHED_POOLS_FILE): @@ -160,11 +174,76 @@ def load_all_pools_from_v2(https_url: str, return pools +def load_all_pools_from_v2(https_url: str, + factory_addresses: List[str], + from_blocks: List[int], + chunk: int = 100000) -> Dict[str, Pool]: + + pools = load_cached_pools() or {} + v2_factory_abi = json.load(open(ABI_PATH / 'UniswapV2Factory.json', 'r')) + erc20_abi = json.load(open(ABI_PATH / 'ERC20.json', 'r')) + w3 = Web3(Web3.HTTPProvider(https_url)) + to_block = w3.eth.get_block_number() + decimals: Dict[str, int] = {} + + with ThreadPoolExecutor(max_workers=RATE_LIMIT) as executor: + rate_limit_futures = [] + + for i in range(len(factory_addresses)): + factory_address = factory_addresses[i] + from_block = from_blocks[i] + v2_factory = w3.eth.contract(address=factory_address, abi=v2_factory_abi) + + block_range = list(range(from_block, to_block, chunk)) + request_params = [(block_range[i], block_range[i + 1]) for i in range(len(block_range) - 1)] + + for params in request_params: + future = executor.submit(fetch_events, params, factory_address, v2_factory) + rate_limit_futures.append(future) + + with tqdm(total=len(rate_limit_futures), desc="Processing events", ascii=' =', leave=True) as pbar: + for future in as_completed(rate_limit_futures): + for factory_address, event in future.result(): + args = event.args + token0 = args.token0 + token1 = args.token1 + + try: + if token0 in decimals: + decimals0 = decimals[token0] + else: + token0_contract = w3.eth.contract(address=token0, abi=erc20_abi) + decimals0 = token0_contract.functions.decimals().call() + decimals[token0] = decimals0 + + if token1 in decimals: + decimals1 = decimals[token1] + else: + token1_contract = w3.eth.contract(address=token1, abi=erc20_abi) + decimals1 = token1_contract.functions.decimals().call() + decimals[token1] = decimals1 + except Exception as _: + continue + + pool = Pool(address=args.pair, + version=DexVariant.UniswapV2, + token0=args.token0, + token1=args.token1, + decimals0=decimals0, + decimals1=decimals1, + fee=300) + if args.pair not in pools: + pools[args.pair] = pool + cache_synced_pools(pool) + + pbar.update(1) + + return pools if __name__ == '__main__': factory_addresses = [ - '0xc35DADB65012eC5796536bD9864eD8773aBc74C4', # Sushiswap V2 (Polygon) - '0x5757371414417b8C6CAad45bAeF941aBc7d3Ab32', # Uniswap V2 (Polygon) + '0xc35DADB65012eC5796536bD9864eD8773aBc74C4', + '0x5757371414417b8C6CAad45bAeF941aBc7d3Ab32', ] factory_blocks = [ 11333218, @@ -172,4 +251,4 @@ def load_all_pools_from_v2(https_url: str, ] pools = load_all_pools_from_v2(HTTPS_URL, factory_addresses, factory_blocks, 100000) - print(pools) + print(pools) \ No newline at end of file