feat: optimize db entity handling
BobTheBuidler committed Apr 8, 2024
1 parent 7d5bdb8 commit f2450a8
Showing 5 changed files with 205 additions and 141 deletions.
109 changes: 60 additions & 49 deletions scripts/exporters/transactions.py
@@ -5,24 +5,28 @@
from decimal import Decimal
from typing import Optional

+import dank_mids
 import pandas as pd
 import sentry_sdk
+from async_lru import alru_cache
 from brownie import ZERO_ADDRESS, chain, web3
 from brownie.exceptions import BrownieEnvironmentWarning
+from brownie.network.event import _EventItem
 from multicall.utils import await_awaitable
 from pony.orm import db_session
 from web3._utils.abi import filter_by_name
 from web3._utils.events import construct_event_topic_set
-from y.networks import Network
+from y import ERC20, Network, get_block_timestamp_async
 from y.utils.events import get_logs_asap

 from yearn.entities import UserTx
 from yearn.events import decode_logs
 from yearn.exceptions import BatchSizeError
-from yearn.outputs.postgres.utils import (cache_address, cache_chain,
-                                          cache_token, last_recorded_block)
+from yearn.outputs.postgres.utils import (address_dbid, chain_dbid, cache_token,
+                                          last_recorded_block, token_dbid)
 from yearn.prices.magic import _get_price
 from yearn.typing import Block
+from yearn.utils import threads
 from yearn.yearn import Yearn

sentry_sdk.set_tag('script','transactions_exporter')
@@ -39,7 +43,7 @@
Network.Gnosis: 2_000_000,
Network.Arbitrum: 1_500_000,
Network.Optimism: 4_000_000,
-    Network.Base: 500_000,
+    Network.Base: 2_000_000,
}[chain.id]

FIRST_END_BLOCK = {
@@ -56,13 +60,11 @@ def main():
while True:
cached_thru = last_recorded_block(UserTx)
_check_for_infinite_loop(_cached_thru_from_last_run, cached_thru)
-        process_and_cache_user_txs(cached_thru)
+        await_awaitable(process_and_cache_user_txs(cached_thru))
_cached_thru_from_last_run = cached_thru
time.sleep(1)


-@db_session
-def process_and_cache_user_txs(last_saved_block=None):
+async def process_and_cache_user_txs(last_saved_block=None):
# NOTE: We look 50 blocks back to avoid uncles and reorgs
max_block_to_cache = chain.height - 50
start_block = last_saved_block + 1 if last_saved_block else None
@@ -73,82 +75,91 @@ def process_and_cache_user_txs(last_saved_block=None):
)
if start_block and start_block > end_block:
end_block = start_block
-    vaults = await_awaitable(yearn.active_vaults_at(end_block))
-    df = pd.concat(await_awaitable(asyncio.gather(*[get_token_transfers(vault.vault, start_block, end_block) for vault in vaults]))) if vaults else pd.DataFrame()
+    vaults = await yearn.active_vaults_at(end_block)
+    df = pd.concat(await asyncio.gather(*[get_token_transfers(vault.vault, start_block, end_block) for vault in vaults])) if vaults else pd.DataFrame()
if len(df):
# NOTE: We want to insert txs in the order they took place, so wallet exporter
# won't have issues in the event that transactions exporter fails mid-run.
df = df.sort_values('block')
-        for index, row in df.iterrows():
-            # this addresses one tx with a crazy price due to yvpbtc v1 pricePerFullShare bug.
-            price = row.price if len(str(round(row.price))) <= 20 else 99999999999999999999
-            usd = row.value_usd if len(str(round(row.value_usd))) <= 20 else 99999999999999999999
-
-            UserTx(
-                chain=cache_chain(),
-                vault=cache_token(row.token),
-                timestamp=row.timestamp,
-                block=row.block,
-                hash=row.hash,
-                log_index=row.log_index,
-                type=row.type,
-                from_address=cache_address(row['from']),
-                to_address=cache_address(row['to']),
-                amount = row.amount,
-                price = Decimal(price),
-                value_usd = Decimal(usd),
-                gas_used = row.gas_used,
-                gas_price = row.gas_price
-            )
+        await asyncio.gather(*(insert_user_tx(row) for index, row in df.iterrows()))
if start_block == end_block:
logger.info(f'{len(df)} user txs exported to postgres [block {start_block}]')
else:
logger.info(f'{len(df)} user txs exported to postgres [blocks {start_block}-{end_block}]')

+async def insert_user_tx(row) -> None:
+    chain_pk = chain_dbid()
+    vault_dbid, from_address_dbid, to_address_dbid = await asyncio.gather(threads.run(token_dbid, row.token), threads.run(address_dbid, row['from']), threads.run(address_dbid, row['to']))
+    # this addresses one tx with a crazy price due to yvpbtc v1 pricePerFullShare bug.
+    price = row.price if len(str(round(row.price))) <= 20 else 99999999999999999999
+    usd = row.value_usd if len(str(round(row.value_usd))) <= 20 else 99999999999999999999
+
+    await threads.run(
+        db_session(UserTx),
+        chain=chain_pk,
+        vault=vault_dbid,
+        timestamp=row.timestamp,
+        block=row.block,
+        hash=row.hash,
+        log_index=row.log_index,
+        type=row.type,
+        from_address=from_address_dbid,
+        to_address=to_address_dbid,
+        amount = row.amount,
+        price = Decimal(price),
+        value_usd = Decimal(usd),
+        gas_used = row.gas_used,
+        gas_price = row.gas_price,
+    )

# Helper functions
async def get_token_transfers(token, start_block, end_block) -> pd.DataFrame:
topics = construct_event_topic_set(
filter_by_name('Transfer', token.abi)[0],
web3.codec,
)
-    events = decode_logs(
-        await get_logs_asap(token.address, topics, from_block=start_block, to_block=end_block, sync=False)
-    )
-    token_entity = cache_token(token.address)
-    transfers = await asyncio.gather(*[_process_transfer_event(event, token_entity) for event in events])
+    logs = await get_logs_asap(token.address, topics, from_block=start_block, to_block=end_block, sync=False)
+    transfers = await asyncio.gather(*[_process_transfer_event(event) for event in decode_logs(logs)])
return pd.DataFrame(transfers)


-async def _process_transfer_event(event, token_entity) -> dict:
+async def _process_transfer_event(event: _EventItem) -> dict:
sender, receiver, amount = event.values()
-    cache_address(sender)
-    cache_address(receiver)
-    price = await get_price(token_entity.address.address, event.block_number)
+    txhash = event.transaction_hash.hex()
+    tx, tx_receipt, timestamp, token_dbid, price, scale = await asyncio.gather(
+        dank_mids.eth.get_transaction(txhash),
+        dank_mids.eth.get_transaction_receipt(txhash),
+        get_block_timestamp_async(event.block_number),
+        get_token_dbid(event.address),  # NOTE: we don't use this output but we call this to ensure the token fk is in the db before we insert the transfer
+        get_price(event.address, event.block_number),
+        ERC20(event.address, asynchronous=True).scale,
+    )
if (
# NOTE magic.get_price() returns erroneous price due to erroneous ppfs
-        token_entity.address.address == '0x7F83935EcFe4729c4Ea592Ab2bC1A32588409797'
+        event.address == '0x7F83935EcFe4729c4Ea592Ab2bC1A32588409797'
and event.block_number == 12869164
):
price = 99999
-    txhash = event.transaction_hash.hex()
return {
'chainid': chain.id,
'block': event.block_number,
-        'timestamp': chain[event.block_number].timestamp,
+        'timestamp': int(timestamp),
'hash': txhash,
'log_index': event.log_index,
-        'token': token_entity.address.address,
-        'type': _event_type(sender, receiver, token_entity.address.address),
+        'token': event.address,
+        'type': _event_type(sender, receiver, event.address),
'from': sender,
'to': receiver,
-        'amount': Decimal(amount) / Decimal(10 ** token_entity.decimals),
+        'amount': Decimal(amount) / Decimal(scale),
'price': price,
-        'value_usd': Decimal(amount) / Decimal(10 ** token_entity.decimals) * Decimal(price),
-        'gas_used': web3.eth.getTransactionReceipt(txhash).gasUsed,
-        'gas_price': web3.eth.getTransaction(txhash).gasPrice
+        'value_usd': Decimal(amount) / Decimal(scale) * Decimal(price),
+        'gas_used': tx_receipt.gasUsed,
+        'gas_price': tx.gasPrice
}

+@alru_cache(maxsize=None)
+async def get_token_dbid(address: str) -> int:
+    return await threads.run(token_dbid, address)

async def get_price(token_address, block):
try:
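The heart of this file's change: `process_and_cache_user_txs` loses its `@db_session` decorator and goes async, each row resolves its address/token foreign keys concurrently, and the blocking Pony ORM write is pushed onto a worker thread via `threads.run(db_session(UserTx), ...)`. A minimal sketch of that pattern under stated assumptions — stdlib `asyncio.to_thread` standing in for yearn's `threads` helper, and a toy `ToyTx` entity standing in for `UserTx`:

```python
import asyncio
from decimal import Decimal

from pony.orm import Database, Required, db_session

db = Database()

class ToyTx(db.Entity):
    # toy stand-in for UserTx; the real schema lives in yearn/entities.py
    hash = Required(str)
    amount = Required(Decimal)

db.bind(provider='sqlite', filename=':memory:')
db.generate_mapping(create_tables=True)

async def insert_tx(tx_hash: str, amount: Decimal) -> None:
    # db_session(ToyTx) wraps the entity constructor in its own transaction;
    # asyncio.to_thread keeps the blocking insert off the event loop, mirroring
    # threads.run(db_session(UserTx), ...) in the diff above.
    await asyncio.to_thread(db_session(ToyTx), hash=tx_hash, amount=amount)

asyncio.run(insert_tx('0xabc', Decimal('1.5')))
```

Because `db_session` can wrap any callable, passing the entity class itself gives a one-transaction-per-insert write without an explicit wrapper function.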
42 changes: 25 additions & 17 deletions scripts/exporters/treasury_transactions.py
@@ -18,8 +18,7 @@
from y.time import get_block_timestamp_async

from yearn.entities import TreasuryTx, deduplicate_internal_transfers
-from yearn.outputs.postgres.utils import (cache_address, cache_chain,
-                                          cache_token)
+from yearn.outputs.postgres.utils import address_dbid, chain_dbid, token_dbid
from yearn.treasury import accountant
from yearn.treasury.treasury import YearnTreasury

@@ -29,6 +28,8 @@

logger = logging.getLogger('yearn.treasury_transactions_exporter')

+GNOSIS_SINGLETON = "0xd9Db270c1B5E3Bd161E8c8503c55cEABeE709552"

treasury = YearnTreasury(load_prices=True, asynchronous=True)


@@ -54,11 +55,16 @@ def main() -> NoReturn:
@a_sync(default='sync')
async def load_new_txs(start_block: Block, end_block: Block) -> int:
"""returns: number of new txs"""
-    futs = [
-        asyncio.create_task(insert_treasury_tx(entry))
-        async for entry in treasury.ledger._get_and_yield(start_block, end_block)
-        if not isinstance(entry, _Done) and entry.value
-    ]
+    futs = []
+    async for entry in treasury.ledger[start_block: end_block]:
+        if isinstance(entry, InternalTransfer) and entry.to_address == GNOSIS_SINGLETON:
+            # TODO: move this into eth-port
+            logger.debug("internal transfer to gnosis singleton, these are goofy and not real. ignoring %s", entry)
+            continue
+        if not entry.value:
+            logger.debug("zero value transfer, skipping %s", entry)
+            continue
+        futs.append(asyncio.create_task(insert_treasury_tx(entry)))
if not futs:
return 0
to_sort = sum(await tqdm_asyncio.gather(*futs, desc="Insert Txs to Postgres"))
@@ -81,26 +87,25 @@ async def insert_treasury_tx(entry: LedgerEntry) -> int:
await sort_thread.run(accountant.sort_tx, txid)
return 0


@db_session
-def insert_to_db(entry: LedgerEntry, ts: int) -> bool:
+def insert_to_db(entry: LedgerEntry, ts: int) -> int:
if isinstance(entry, TokenTransfer):
log_index = entry.log_index
-        token = cache_token(entry.token_address)
+        token = token_dbid(entry.token_address)
gas = None
else:
log_index = None
-        token = cache_token(EEE_ADDRESS)
+        token = token_dbid(EEE_ADDRESS)
gas = entry.gas
try:
entity = TreasuryTx(
-            chain=cache_chain(),
+            chain=chain_dbid(),
block = entry.block_number,
timestamp = ts,
hash = entry.hash,
log_index = log_index,
-            from_address = cache_address(entry.from_address) if entry.from_address else None,
-            to_address = cache_address(entry.to_address) if entry.to_address else None,
+            from_address = address_dbid(entry.from_address) if entry.from_address else None,
+            to_address = address_dbid(entry.to_address) if entry.to_address else None,
token = token,
amount = entry.value,
price = entry.price,
@@ -120,11 +125,14 @@ def insert_to_db(entry: LedgerEntry, ts: int) -> bool:
@db_session
def _validate_integrity_error(entry: LedgerEntry, log_index: int) -> None:
''' Raises AssertionError if existing object that causes a TransactionIntegrityError is not an EXACT MATCH to the attempted insert. '''
-    existing_object = TreasuryTx.get(hash=entry.hash, log_index=log_index, chain=cache_chain())
+    existing_object = TreasuryTx.get(hash=entry.hash, log_index=log_index, chain=chain_dbid())
if existing_object is None:
-        existing_objects = list(TreasuryTx.select(lambda tx: tx.hash==entry.hash and tx.log_index==log_index and tx.chain==cache_chain()))
+        existing_objects = list(TreasuryTx.select(lambda tx: tx.hash==entry.hash and tx.log_index==log_index and tx.chain==chain_dbid()))
raise ValueError(f'unable to `.get` due to multiple entries: {existing_objects}')
-    assert entry.to_address == existing_object.to_address.address, (entry.to_address,existing_object.to_address.address)
+    if entry.to_address:
+        assert entry.to_address == existing_object.to_address.address, (entry.to_address, existing_object.to_address.address)
+    else:
+        assert existing_object.to_address is None, (entry.to_address, existing_object.to_address)
assert entry.from_address == existing_object.from_address.address, (entry.from_address, existing_object.from_address.address)
assert entry.value == existing_object.amount or entry.value == -1 * existing_object.amount, (entry.value, existing_object.amount)
assert entry.block_number == existing_object.block, (entry.block_number, existing_object.block)
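In `load_new_txs`, the private `treasury.ledger._get_and_yield(...)` call and `_Done` sentinel checks give way to the ledger's public slice syntax, with noise filtered out before any task is created. The same consume-filter-gather shape in isolation — `Entry` and the `ledger` generator below are toy stand-ins for eth-portfolio's ledger types, not its API:

```python
import asyncio
from dataclasses import dataclass
from typing import AsyncIterator, Iterable

GNOSIS_SINGLETON = "0xd9Db270c1B5E3Bd161E8c8503c55cEABeE709552"

@dataclass
class Entry:
    # toy stand-in for a LedgerEntry
    to_address: str
    value: int
    internal: bool = False

async def ledger(entries: Iterable[Entry]) -> AsyncIterator[Entry]:
    for entry in entries:
        yield entry

async def insert(entry: Entry) -> int:
    return 1  # pretend this wrote a row that still needs sorting

async def load_new_txs(entries: Iterable[Entry]) -> int:
    futs = []
    async for entry in ledger(entries):
        if entry.internal and entry.to_address == GNOSIS_SINGLETON:
            continue  # singleton delegatecall artifacts, not real transfers
        if not entry.value:
            continue  # zero-value transfer, nothing to record
        futs.append(asyncio.create_task(insert(entry)))
    return sum(await asyncio.gather(*futs)) if futs else 0

# two real transfers, one zero-value, one singleton artifact -> 2 inserts
entries = [Entry("0xabc", 5), Entry("0xdef", 0), Entry(GNOSIS_SINGLETON, 7, internal=True), Entry("0x123", 3)]
print(asyncio.run(load_new_txs(entries)))  # 2
```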
24 changes: 12 additions & 12 deletions yearn/entities.py
@@ -273,8 +273,8 @@ def get_or_create_entity(cls, log: _EventItem) -> "Stream":
try:
return Stream[stream_id]
except ObjectNotFound:
-            from yearn.outputs.postgres.utils import (cache_address,
-                                                      cache_token,
+            from yearn.outputs.postgres.utils import (address_dbid,
+                                                      token_dbid,
cache_txgroup)

txgroup = {
@@ -283,10 +283,10 @@ def get_or_create_entity(cls, log: _EventItem) -> "Stream":
}.get(to_address, "Other Grants")

txgroup = cache_txgroup(txgroup)
-            stream_contract = cache_address(log.address)
-            token = cache_token(Contract(log.address).token())
-            from_address = cache_address(from_address)
-            to_address = cache_address(to_address)
+            stream_contract = address_dbid(log.address)
+            token = token_dbid(Contract(log.address).token())
+            from_address = address_dbid(from_address)
+            to_address = address_dbid(to_address)

entity = Stream(
stream_id = stream_id,
@@ -430,18 +430,18 @@ def rugged_on(self) -> typing.Optional[date]:

@staticmethod
def get_or_create_entity(event: _EventItem) -> "VestingEscrow":
-        from yearn.outputs.postgres.utils import cache_address, cache_token
+        from yearn.outputs.postgres.utils import address_dbid, cache_token

print(event)
funder, token, recipient, escrow, amount, start, duration, cliff_length = event.values()
-        escrow_address_entity = cache_address(escrow)
-        escrow = VestingEscrow.get(address=escrow_address_entity)
+        escrow_address_dbid = address_dbid(escrow)
+        escrow = VestingEscrow.get(address=escrow_address_dbid)
if escrow is None:
token_entity = cache_token(token)
escrow = VestingEscrow(
-            address = escrow_address_entity,
-            funder = cache_address(funder),
-            recipient = cache_address(recipient),
+            address = escrow_address_dbid,
+            funder = address_dbid(funder),
+            recipient = address_dbid(recipient),
token = token_entity,
amount = amount / token_entity.scale,
start_timestamp = datetime.fromtimestamp(start),
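The renames threaded through `entities.py` reflect the commit's theme: `cache_*` helpers that returned session-bound entity objects become `*_dbid` helpers that return plain integer primary keys, which are cheap to cache and safe to pass between threads and `db_session` scopes. A rough sketch of what such a helper can look like — toy `Address` entity here; the real helpers live in `yearn/outputs/postgres/utils.py`:

```python
from functools import lru_cache

from pony.orm import Database, PrimaryKey, Required, db_session

db = Database()

class Address(db.Entity):
    # toy stand-in for yearn's Address entity
    address_id = PrimaryKey(int, auto=True)
    address = Required(str, unique=True)

db.bind(provider='sqlite', filename=':memory:')
db.generate_mapping(create_tables=True)

@lru_cache(maxsize=None)
@db_session
def address_dbid(address: str) -> int:
    # Return the primary key rather than the entity: an int survives outside
    # this db_session, so callers on other threads can hold it safely.
    entity = Address.get(address=address) or Address(address=address)
    db.commit()  # flush so the auto-assigned primary key is populated
    return entity.address_id

print(address_dbid("0xabc"), address_dbid("0xabc"))  # second call is a cache hit
```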