Skip to content

Commit

Permalink
minor fixes for NFT transfers
Browse files Browse the repository at this point in the history
  • Loading branch information
shuva10v committed Sep 10, 2024
1 parent 17257b9 commit 74c6a79
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 26 deletions.
41 changes: 24 additions & 17 deletions parser/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import os
import time
import json
import traceback
from loguru import logger
from db import DB
from kafka import KafkaConsumer
Expand All @@ -13,6 +14,7 @@
group_id = os.environ.get("KAFKA_GROUP_ID")
topics = os.environ.get("KAFKA_TOPICS", "ton.public.latest_account_states,ton.public.messages,ton.public.nft_transfers")
log_interval = int(os.environ.get("LOG_INTERVAL", '10'))
supported_parsers = os.environ.get("SUPPORTED_PARSERS", "*")
db = DB()

consumer = KafkaConsumer(
Expand All @@ -28,21 +30,26 @@
last = time.time()
total = 0
successful = 0
PARSERS = generate_parsers(None)
PARSERS = generate_parsers(None if supported_parsers == '*' else set(supported_parsers.split(",")))
for msg in consumer:
__op = msg.get('__op', None)
if __op == 'd': # ignore deletes
continue

total += 1
handled = 0
for parser in PARSERS.get(msg.topic, []):
if parser.handle(json.loads(msg.value.decode("utf-8")), db):
handled = 1
successful += handled
now = time.time()
if now - last > log_interval:
logger.info(f"{1.0 * total / (now - last):0.2f} Kafka messages per second, {100.0 * successful / total:0.2f}% handled")
last = now
successful = 0
total = 0
# logger.info(msg)
try:
total += 1
handled = 0
obj = json.loads(msg.value.decode("utf-8"))
__op = obj.get('__op', None)
if __op == 'd': # ignore deletes
continue
for parser in PARSERS.get(msg.topic, []):
if parser.handle(obj, db):
handled = 1
successful += handled
now = time.time()
if now - last > log_interval:
logger.info(f"{1.0 * total / (now - last):0.2f} Kafka messages per second, {100.0 * successful / total:0.2f}% handled")
last = now
successful = 0
total = 0
except Exception as e:
logger.error(f"Failted to process item {msg}: {e} {traceback.format_exc()}")
raise
18 changes: 18 additions & 0 deletions parser/model/nft_history.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,23 @@
from dataclasses import dataclass

"""
CREATE TABLE parsed.nft_history (
tx_hash bpchar(44) not NULL,
utime int8 NULL,
tx_lt int8 NULL,
event_type nft_history_event not NULL,
nft_item_address varchar NULL,
collection_address varchar NULL,
sale_address varchar NULL,
code_hash varchar NULL,
marketplace varchar NULL,
current_owner varchar NULL,
new_owner varchar NULL,
price numeric NULL,
is_auction bool NULL,
CONSTRAINT nft_history_tx_hash_key PRIMARY KEY (tx_hash)
);
"""

@dataclass
class NftHistory:
Expand Down
11 changes: 8 additions & 3 deletions parser/parsers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,26 @@
from parsers.message.dedust_swap import DedustSwap
from parsers.nft_transfer.nft_history import NftHistoryParser
from model.parser import Parser
from loguru import logger

_parsers = [
DedustSwap(),
NftHistoryParser(),
NftHistoryParser()
]

"""
dict of parsers, where key is the topic name
"""
def generate_parsers(names: Set)-> Dict[str, List[Parser]]:

# TODO add suppoer for filtering by names
out: Dict[str, List[Parser]] = {}

for parser in _parsers:
if names is not None:
if type(parser).__name__ not in names:
logger.info(f"Skipping parser {parser}, it is not in supported parsers list")
continue
else:
logger.info(f"Adding parser {parser}: {type(parser).__name__}, {names}")
for topic in parser.topics():
if topic not in out:
out[topic] = []
Expand Down
3 changes: 0 additions & 3 deletions parser/parsers/message/dedust_swap.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,3 @@ def handle_internal(self, obj, db: DB):
referral_address=referral_addr
)
db.serialize(swap)

# logger.info(f"Lalal: {cell}")
# raise
6 changes: 3 additions & 3 deletions parser/parsers/nft_transfer/nft_history.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,9 @@ def handle_internal(self, obj: dict, db: DB):
event_type = NftHistory.EVENT_TYPE_TRANSFER

nft_history = NftHistory(
tx_hash=obj.get("tx_hash"),
utime=obj.get("tx_now"),
tx_lt=obj.get("tx_lt"),
tx_hash=Parser.require(obj.get('tx_hash', None)),
utime=Parser.require(obj.get("tx_now")),
tx_lt=Parser.require(obj.get("tx_lt")),
event_type=event_type,
nft_item_address=obj.get("nft_item_address"),
collection_address=obj.get("nft_collection_address"),
Expand Down

0 comments on commit 74c6a79

Please sign in to comment.