Skip to content

Commit

Permalink
Core prices parser, ston.fi and volume for dex swaps
Browse files Browse the repository at this point in the history
  • Loading branch information
shuva10v committed Sep 11, 2024
1 parent 6d440ea commit ba55d94
Show file tree
Hide file tree
Showing 9 changed files with 315 additions and 4 deletions.
34 changes: 33 additions & 1 deletion parser/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,4 +126,36 @@ def serialize(self, obj):
insert into parsed.{table}({",".join(names)}) values ({",".join(placeholders)})
on conflict do nothing
""", tuple(values))
self.updated += 1
self.updated += 1

def insert_core_price(self, asset, price, obj):
assert self.conn is not None
with self.conn.cursor(cursor_factory=RealDictCursor) as cursor:
cursor.execute(f"""
insert into prices.core(tx_hash, lt, asset, price, price_ts, created, updated)
values(%s, %s, %s, %s, %s, now(), now())
on conflict (tx_hash) do update
set tx_hash = EXCLUDED.tx_hash,
lt = EXCLUDED.lt,
asset = EXCLUDED.asset,
price = EXCLUDED.price,
price_ts = EXCLUDED.price_ts,
updated = now()
""", (obj.get('last_trans_hash'), obj.get('last_trans_lt'), asset,
price, obj.get('timestamp')))
self.updated += 1

def get_core_price(self, asset: str, timestamp: int) -> float:
assert self.conn is not None
with self.conn.cursor(cursor_factory=RealDictCursor) as cursor:
cursor.execute(
"""
select price from prices.core where asset = %s
and price_ts < %s order by price_ts desc limit 1
""",
(asset, timestamp),
)
res = cursor.fetchone()
if not res:
return None
return float(res['price'])
39 changes: 38 additions & 1 deletion parser/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,15 +1,52 @@
version: '3.9'

services:
parser:
parser_nft:
build: ./
restart: always
environment:
- KAFKA_GROUP_ID=nft_history
- SUPPORTED_PARSERS=NftHistoryParser
- KAFKA_TOPICS=ton.public.nft_transfers
- COMMIT_BATCH_SIZE=20
env_file:
- parser.env
logging:
driver: local
options:
max-size: 10M

parser_core_prices:
build: ./
restart: always
environment:
- KAFKA_GROUP_ID=core_prices
- SUPPORTED_PARSERS=CorePricesUSDT
- KAFKA_TOPICS=ton.public.latest_account_states
- KAFKA_OFFSET_RESET=latest
- COMMIT_BATCH_SIZE=1
env_file:
- parser.env
logging:
driver: local
options:
max-size: 10M

parser_messages:
build: ./
restart: always
environment:
- KAFKA_GROUP_ID=messages_parsers
- SUPPORTED_PARSERS=StonfiSwap,DedustSwap
- KAFKA_TOPICS=ton.public.messages
env_file:
- parser.env
logging:
driver: local
options:
max-size: 10M


networks:
default:
name: data_default
Expand Down
2 changes: 1 addition & 1 deletion parser/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
consumer = KafkaConsumer(
group_id=group_id,
bootstrap_servers=os.environ.get("KAFKA_BROKER"),
auto_offset_reset='earliest',
auto_offset_reset=os.environ.get("KAFKA_OFFSET_RESET", 'earliest'),
enable_auto_commit=False
)
for topic in topics.split(","):
Expand Down
1 change: 1 addition & 0 deletions parser/model/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@


TOPIC_MESSAGES = "ton.public.messages"
TOPIC_ACCOUNT_STATES = "ton.public.latest_account_states"
TOPIC_NFT_TRANSFERS = "ton.public.nft_transfers"

"""
Expand Down
4 changes: 3 additions & 1 deletion parser/parsers/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from typing import Dict, List, Set
from parsers.accounts.core_prices import CorePricesUSDT
from parsers.message.dedust_swap import DedustSwap
from parsers.message.stonfi_swap import StonfiSwap
from parsers.nft_transfer.nft_history import NftHistoryParser
Expand All @@ -8,7 +9,8 @@
_parsers = [
DedustSwap(),
NftHistoryParser(),
StonfiSwap()
StonfiSwap(),
CorePricesUSDT()
]

"""
Expand Down
69 changes: 69 additions & 0 deletions parser/parsers/accounts/core_prices.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
from model.parser import Parser, TOPIC_ACCOUNT_STATES
from loguru import logger
from db import DB
from pytoniq_core import Cell, Address
from model.dexswap import DexSwapParsed

"""
Discovers prices for the core assets, i.e.:
* TON itself, based on TON/USDT pool reserves
* stTON and tsTON
"""
class CorePrices(Parser):
"""
account - address to look updates for
asset - item for core prices table
"""
def __init__(self, account, asset, update_interval=60):
self.account = account
self.asset = asset
self.latest_update = None
self.latest_price = None
self.update_interval = update_interval

def topics(self):
return [TOPIC_ACCOUNT_STATES]

def predicate(self, obj) -> bool:
return obj.get("account", None) == self.account

"""
Handle price updates and store in the DB
"""
def update_price(self, price, obj, db: DB):
timestamp = obj.get('timestamp')
if self.latest_update is not None:
if timestamp < self.latest_update + self.update_interval:
logger.debug(f"Price timestamp for {self.asset} is before latest update + update interval, skipping")
return

if price == self.latest_price:
logger.info(f"Price for asset {self.asset} is the same: {price}")
return

self.latest_update = timestamp
self.latest_price = price
db.insert_core_price(self.asset, price, obj)


class CorePricesUSDT(CorePrices):
def __init__(self, update_interval=60):
super().__init__(account=Parser.uf2raw('EQD8TJ8xEWB1SpnRE4d89YO3jl0W0EiBnNS4IBaHaUmdfizE'),
asset=Parser.uf2raw('EQCxE6mUtQJKFnGfaROTKOt1lZbDiiX1kCixRv7Nw2Id_sDs'),
update_interval=update_interval)

def handle_internal(self, obj, db: DB):
cell = Cell.one_from_boc(Parser.require(obj.get('data_boc'))).begin_parse()

cell = cell.load_ref().begin_parse()
cell.load_coins() # proto fee
cell.load_coins() # proto fee
cell.load_address() # proto fee address
reserve0 = cell.load_coins()
reserve1 = cell.load_coins()

logger.info(f"TON/USDT pool: {reserve0}, {reserve1}")
self.update_price(1.0 * reserve0 / reserve1, obj, db)

# TODO stTON and tsTON prices

2 changes: 2 additions & 0 deletions parser/parsers/message/dedust_swap.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from db import DB
from pytoniq_core import Cell, Address
from model.dexswap import DexSwapParsed
from parsers.message.swap_volume import estimate_volume


# twin non-stable pools to avoid wrong prices estimation
Expand Down Expand Up @@ -64,4 +65,5 @@ def handle_internal(self, obj, db: DB):
reserve0=reserve0,
reserve1=reserve1
)
estimate_volume(swap, db)
db.serialize(swap)
103 changes: 103 additions & 0 deletions parser/parsers/message/stonfi_swap.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
from model.parser import Parser, TOPIC_MESSAGES
from loguru import logger
from db import DB
from pytoniq_core import Cell, Address
from model.dexswap import DexSwapParsed
from parsers.message.swap_volume import estimate_volume

STONFI_ROUTER = Parser.uf2raw('EQB3ncyBUTjZUA5EnFKR5_EnOMI9V1tTEAAPaiU71gc4TiUt')

class StonfiSwap(Parser):

def topics(self):
return [TOPIC_MESSAGES]

def predicate(self, obj) -> bool:
# only internal messages processed by the router
return obj.get("opcode", None) == Parser.opcode_signed(0xf93bb43f) and \
obj.get("direction", None) == "in" and \
obj.get("destination", None) == STONFI_ROUTER


def handle_internal(self, obj, db: DB):
cell = Parser.message_body(obj, db).begin_parse()
cell.load_uint(32) # 0xf93bb43f
query_id = cell.load_uint(64)
owner = cell.load_address()
exit_code = cell.load_uint(32)
params = cell.load_ref().begin_parse()
token0_amount = params.load_coins()
wallet0_address = params.load_address()
token1_amount = params.load_coins()
wallet1_address = params.load_address()
logger.info(f"{owner} {exit_code} {token0_amount} {wallet0_address} {token1_amount} {wallet1_address}")

if exit_code != 3326308581:
logger.debug(f"Message is not a payment to user, exit code {exit_code}")
return


# if token0_amount == 0 or token1_amount == 0:
# logger.info(f"Skipping zero amount swap for {obj}")
# return

# TODO Handle missing tx? is it ever possible?
tx = Parser.require(db.is_tx_successful(Parser.require(obj.get('tx_hash', None))))
if not tx:
logger.info(f"Skipping failed tx for {obj.get('tx_hash', None)}")

parent_body = db.get_parent_message_body(obj.get('msg_hash'))
if not parent_body:
logger.warning(f"Unable to find parent message for {obj.get('msg_hash')}")
return
cell = Cell.one_from_boc(parent_body).begin_parse()

op_id = cell.load_uint(32) # 0x25938561
assert op_id == 0x25938561, "Parent message for ston.fi swap is {op_id}"
parent_query_id = cell.load_uint(64)

to_address = cell.load_address()
token_wallet = cell.load_address()
token_amount = cell.load_coins()
min_out = cell.load_coins()
has_ref_address = cell.load_uint(1)
addresses = cell.load_ref().begin_parse()
from_user = addresses.load_address()
referral_address = None
if has_ref_address:
referral_address = addresses.load_address()

if token_wallet == wallet0_address:
src_wallet_address = wallet0_address
src_amount = token_amount - token0_amount
dst_wallet_address = wallet1_address
dst_amount = token1_amount
elif token_wallet == wallet1_address:
src_wallet_address = wallet1_address
src_amount = token_amount - token1_amount
dst_wallet_address = wallet0_address
dst_amount = token0_amount
else:
logger.warning(f"Wallet addresses in swap message id={obj.get('msg_hash')} and payment message don't match")

src_master = Parser.require(db.get_wallet_master(src_wallet_address))
dst_master = Parser.require(db.get_wallet_master(dst_wallet_address))

swap = DexSwapParsed(
tx_hash=Parser.require(obj.get('tx_hash', None)),
msg_hash=Parser.require(obj.get('msg_hash', None)),
trace_id=Parser.require(obj.get('trace_id', None)),
platform="ston.fi",
swap_utime=Parser.require(obj.get('created_at', None)),
swap_user=from_user,
swap_pool=Parser.require(obj.get('source', None)),
swap_src_token=src_master,
swap_dst_token=dst_master,
swap_src_amount=src_amount,
swap_dst_amount=dst_amount,
referral_address=referral_address,
query_id=query_id,
min_out=min_out
)
estimate_volume(swap, db)
db.serialize(swap)
65 changes: 65 additions & 0 deletions parser/parsers/message/swap_volume.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
"""
Utility method to estimate swap volume in TON and USDT
"""
from model.dexswap import DexSwapParsed
from db import DB
from loguru import logger
from model.parser import Parser

USDT = Parser.uf2raw('EQCxE6mUtQJKFnGfaROTKOt1lZbDiiX1kCixRv7Nw2Id_sDs')
jUSDT = Parser.uf2raw('EQBynBO23ywHy_CgarY9NK9FTz0yDsG82PtcbSTQgGoXwiuA')
pTON = Parser.uf2raw('EQCM3B12QK1e4yZSf8GtBRT0aLMNyEsBc_DhVfRRtOEffLez')
TON = Parser.uf2raw('EQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAM9c')
stTON = Parser.uf2raw('EQDNhy-nxYFgUqzfUzImBEP67JqsyMIcyk2S5_RwNNEYku0k')
tsTON = Parser.uf2raw('EQC98_qAmNEptUtPc7W6xdHh_ZHrBUFpw5Ft_IzNU20QAJav')

STABLES = [USDT, jUSDT]
TONS = [pTON, TON]
LSDS = [stTON, tsTON]

"""
Estimates swap volume using current core prices
Updates swap inplace
"""
def estimate_volume(swap: DexSwapParsed, db: DB):
volume_usd, volume_ton = None, None
ton_price = db.get_core_price(USDT, swap.swap_utime)
if ton_price is None:
logger.warning(f"No TON price found for {swap.swap_utime}")
return
ton_price = ton_price * 1e3 # normalize on decimals difference
if swap.swap_src_token in STABLES:
volume_usd = swap.swap_src_amount / 1e6
volume_ton = volume_usd / ton_price
if swap.swap_dst_token in STABLES:
volume_usd = swap.swap_dst_amount / 1e6
volume_ton = volume_usd / ton_price

if swap.swap_src_token in TONS:
volume_ton = swap.swap_src_amount / 1e9
volume_usd = volume_ton * ton_price
if swap.swap_dst_token in TONS:
volume_ton = swap.swap_dst_amount / 1e9
volume_usd = volume_ton * ton_price

if swap.swap_src_token in LSDS:
lsd_price = db.get_core_price(swap.swap_src_token, swap.swap_utime)
if not lsd_price:
logger.warning(f"No price for {swap.swap_src_token} for {swap.swap_utime}")
return
volume_ton = swap.swap_src_amount / 1e9 * lsd_price
volume_usd = volume_ton * ton_price
if swap.swap_dst_token in TONS:
lsd_price = db.get_core_price(swap.swap_dst_token, swap.swap_utime)
if not lsd_price:
logger.warning(f"No price for {swap.swap_dst_token} for {swap.swap_utime}")
return
volume_ton = swap.swap_dst_amount / 1e9 * lsd_price
volume_usd = volume_ton * ton_price

# TODO add support for other compinations

if volume_usd is not None:
swap.volume_usd = volume_usd
swap.volume_ton = volume_ton

0 comments on commit ba55d94

Please sign in to comment.