Skip to content

Commit

Permalink
Add async
Browse files Browse the repository at this point in the history
  • Loading branch information
JamesEBall committed Aug 31, 2023
1 parent ce2fc97 commit a35a8d3
Showing 1 changed file with 84 additions and 5 deletions.
89 changes: 84 additions & 5 deletions python/pools.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
import csv
import json

import os
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm
from enum import Enum
from web3 import Web3
from typing import Dict, List, Optional
from random import randint
from time import sleep

from constants import *

from constants import *
RATE_LIMIT = 5 # Limit to 10 requests per second


class DexVariant(Enum):
Expand Down Expand Up @@ -44,6 +50,14 @@ def cache_row(self):
self.fee,
]

def fetch_events(params, factory_address, v2_factory):
sleep(randint(10, 50) / 100.0) # Adding some jitter to avoid rate-limiting
try:
events = v2_factory.events.PairCreated.get_logs(fromBlock=params[0], toBlock=params[1])
return [(factory_address, event) for event in events]
except Exception as e:
print(f"Error fetching events: {e}")
return []

def load_cached_pools() -> Optional[Dict[str, Pool]]:
if os.path.exists(CACHED_POOLS_FILE):
Expand Down Expand Up @@ -160,16 +174,81 @@ def load_all_pools_from_v2(https_url: str,

return pools

def load_all_pools_from_v2(https_url: str,
factory_addresses: List[str],
from_blocks: List[int],
chunk: int = 100000) -> Dict[str, Pool]:

pools = load_cached_pools() or {}
v2_factory_abi = json.load(open(ABI_PATH / 'UniswapV2Factory.json', 'r'))
erc20_abi = json.load(open(ABI_PATH / 'ERC20.json', 'r'))
w3 = Web3(Web3.HTTPProvider(https_url))
to_block = w3.eth.get_block_number()
decimals: Dict[str, int] = {}

with ThreadPoolExecutor(max_workers=RATE_LIMIT) as executor:
rate_limit_futures = []

for i in range(len(factory_addresses)):
factory_address = factory_addresses[i]
from_block = from_blocks[i]
v2_factory = w3.eth.contract(address=factory_address, abi=v2_factory_abi)

block_range = list(range(from_block, to_block, chunk))
request_params = [(block_range[i], block_range[i + 1]) for i in range(len(block_range) - 1)]

for params in request_params:
future = executor.submit(fetch_events, params, factory_address, v2_factory)
rate_limit_futures.append(future)

with tqdm(total=len(rate_limit_futures), desc="Processing events", ascii=' =', leave=True) as pbar:
for future in as_completed(rate_limit_futures):
for factory_address, event in future.result():
args = event.args
token0 = args.token0
token1 = args.token1

try:
if token0 in decimals:
decimals0 = decimals[token0]
else:
token0_contract = w3.eth.contract(address=token0, abi=erc20_abi)
decimals0 = token0_contract.functions.decimals().call()
decimals[token0] = decimals0

if token1 in decimals:
decimals1 = decimals[token1]
else:
token1_contract = w3.eth.contract(address=token1, abi=erc20_abi)
decimals1 = token1_contract.functions.decimals().call()
decimals[token1] = decimals1
except Exception as _:
continue

pool = Pool(address=args.pair,
version=DexVariant.UniswapV2,
token0=args.token0,
token1=args.token1,
decimals0=decimals0,
decimals1=decimals1,
fee=300)
if args.pair not in pools:
pools[args.pair] = pool
cache_synced_pools(pool)

pbar.update(1)

return pools

if __name__ == '__main__':
factory_addresses = [
'0xc35DADB65012eC5796536bD9864eD8773aBc74C4', # Sushiswap V2 (Polygon)
'0x5757371414417b8C6CAad45bAeF941aBc7d3Ab32', # Uniswap V2 (Polygon)
'0xc35DADB65012eC5796536bD9864eD8773aBc74C4',
'0x5757371414417b8C6CAad45bAeF941aBc7d3Ab32',
]
factory_blocks = [
11333218,
11799757,
]

pools = load_all_pools_from_v2(HTTPS_URL, factory_addresses, factory_blocks, 100000)
print(pools)
print(pools)

0 comments on commit a35a8d3

Please sign in to comment.