diff --git a/parser/main.py b/parser/main.py index 6929e09..f9ea553 100644 --- a/parser/main.py +++ b/parser/main.py @@ -3,6 +3,7 @@ import os import time import json +import traceback from loguru import logger from db import DB from kafka import KafkaConsumer @@ -13,6 +14,7 @@ group_id = os.environ.get("KAFKA_GROUP_ID") topics = os.environ.get("KAFKA_TOPICS", "ton.public.latest_account_states,ton.public.messages,ton.public.nft_transfers") log_interval = int(os.environ.get("LOG_INTERVAL", '10')) + supported_parsers = os.environ.get("SUPPORTED_PARSERS", "*") db = DB() consumer = KafkaConsumer( @@ -28,21 +30,26 @@ last = time.time() total = 0 successful = 0 - PARSERS = generate_parsers(None) + PARSERS = generate_parsers(None if supported_parsers == '*' else set(supported_parsers.split(","))) for msg in consumer: - __op = msg.get('__op', None) - if __op == 'd': # ignore deletes - continue - - total += 1 - handled = 0 - for parser in PARSERS.get(msg.topic, []): - if parser.handle(json.loads(msg.value.decode("utf-8")), db): - handled = 1 - successful += handled - now = time.time() - if now - last > log_interval: - logger.info(f"{1.0 * total / (now - last):0.2f} Kafka messages per second, {100.0 * successful / total:0.2f}% handled") - last = now - successful = 0 - total = 0 + # logger.info(msg) + try: + total += 1 + handled = 0 + obj = json.loads(msg.value.decode("utf-8")) + __op = obj.get('__op', None) + if __op == 'd': # ignore deletes + continue + for parser in PARSERS.get(msg.topic, []): + if parser.handle(obj, db): + handled = 1 + successful += handled + now = time.time() + if now - last > log_interval: + logger.info(f"{1.0 * total / (now - last):0.2f} Kafka messages per second, {100.0 * successful / total:0.2f}% handled") + last = now + successful = 0 + total = 0 + except Exception as e: + logger.error(f"Failted to process item {msg}: {e} {traceback.format_exc()}") + raise diff --git a/parser/model/nft_history.py b/parser/model/nft_history.py index 88da1b5..4cb97a8 100644 --- a/parser/model/nft_history.py +++ b/parser/model/nft_history.py @@ -1,5 +1,23 @@ from dataclasses import dataclass +""" +CREATE TABLE parsed.nft_history ( + tx_hash bpchar(44) not NULL, + utime int8 NULL, + tx_lt int8 NULL, + event_type nft_history_event not NULL, + nft_item_address varchar NULL, + collection_address varchar NULL, + sale_address varchar NULL, + code_hash varchar NULL, + marketplace varchar NULL, + current_owner varchar NULL, + new_owner varchar NULL, + price numeric NULL, + is_auction bool NULL, + CONSTRAINT nft_history_tx_hash_key PRIMARY KEY (tx_hash) +); +""" @dataclass class NftHistory: diff --git a/parser/parsers/__init__.py b/parser/parsers/__init__.py index 08cd38c..f7d5434 100644 --- a/parser/parsers/__init__.py +++ b/parser/parsers/__init__.py @@ -2,21 +2,26 @@ from parsers.message.dedust_swap import DedustSwap from parsers.nft_transfer.nft_history import NftHistoryParser from model.parser import Parser +from loguru import logger _parsers = [ DedustSwap(), - NftHistoryParser(), + NftHistoryParser() ] """ dict of parsers, where key is the topic name """ def generate_parsers(names: Set)-> Dict[str, List[Parser]]: - - # TODO add suppoer for filtering by names out: Dict[str, List[Parser]] = {} for parser in _parsers: + if names is not None: + if type(parser).__name__ not in names: + logger.info(f"Skipping parser {parser}, it is not in supported parsers list") + continue + else: + logger.info(f"Adding parser {parser}: {type(parser).__name__}, {names}") for topic in parser.topics(): if topic not in out: out[topic] = [] diff --git a/parser/parsers/message/dedust_swap.py b/parser/parsers/message/dedust_swap.py index 84f1823..85c8cc8 100644 --- a/parser/parsers/message/dedust_swap.py +++ b/parser/parsers/message/dedust_swap.py @@ -61,6 +61,3 @@ def handle_internal(self, obj, db: DB): referral_address=referral_addr ) db.serialize(swap) - - # logger.info(f"Lalal: {cell}") - # raise diff --git a/parser/parsers/nft_transfer/nft_history.py b/parser/parsers/nft_transfer/nft_history.py index f58423d..1126fb7 100644 --- a/parser/parsers/nft_transfer/nft_history.py +++ b/parser/parsers/nft_transfer/nft_history.py @@ -74,9 +74,9 @@ def handle_internal(self, obj: dict, db: DB): event_type = NftHistory.EVENT_TYPE_TRANSFER nft_history = NftHistory( - tx_hash=obj.get("tx_hash"), - utime=obj.get("tx_now"), - tx_lt=obj.get("tx_lt"), + tx_hash=Parser.require(obj.get('tx_hash', None)), + utime=Parser.require(obj.get("tx_now")), + tx_lt=Parser.require(obj.get("tx_lt")), event_type=event_type, nft_item_address=obj.get("nft_item_address"), collection_address=obj.get("nft_collection_address"),