11import asyncio
22import json
33import logging
4- from dataclasses import asdict
54from enum import IntEnum
65from typing import Dict , Optional , Tuple , List
76
8- from aleph_message .models import MessageConfirmation
97from bson import ObjectId
108from pydantic import ValidationError
119from pymongo import UpdateOne
2624from aleph .model .pending import PendingMessage , PendingTX
2725from aleph .network import verify_signature
2826from aleph .permissions import check_sender_authorization
29- from aleph .storage import get_json , pin_hash , add_json , get_message_content
30- from .tx_context import TxContext
31- from aleph .schemas .pending_messages import (
32- BasePendingMessage ,
33- )
27+ from aleph .schemas .pending_messages import BasePendingMessage
3428from aleph .schemas .validated_message import (
3529 validate_pending_message ,
3630 ValidatedStoreMessage ,
3731 ValidatedForgetMessage ,
3832 make_confirmation_update_query ,
39- make_message_upsert_query ,
33+ make_message_upsert_query ,
4034)
35+ from ..schemas .message_confirmation import MessageConfirmation
36+ from aleph .storage import get_json , pin_hash , add_json , get_message_content
37+ from .tx_context import TxContext
4138
4239LOGGER = logging .getLogger ("chains.common" )
4340
@@ -65,21 +62,17 @@ async def mark_confirmed_data(chain_name, tx_hash, height):
6562
6663async def delayed_incoming (
6764 message : BasePendingMessage ,
68- chain_name : Optional [str ] = None ,
69- tx_hash : Optional [str ] = None ,
70- height : Optional [int ] = None ,
65+ tx_context : Optional [TxContext ] = None ,
66+ check_message : bool = True ,
7167):
7268 if message is None :
7369 return
70+
7471 await PendingMessage .collection .insert_one (
7572 {
7673 "message" : message .dict (exclude = {"content" }),
77- "source" : dict (
78- chain_name = chain_name ,
79- tx_hash = tx_hash ,
80- height = height ,
81- check_message = True , # should we store this?
82- ),
74+ "tx_context" : tx_context .dict () if tx_context else None ,
75+ "check_message" : check_message ,
8376 }
8477 )
8578
@@ -92,27 +85,15 @@ class IncomingStatus(IntEnum):
9285
9386async def mark_message_for_retry (
9487 message : BasePendingMessage ,
95- chain_name : Optional [str ],
96- tx_hash : Optional [str ],
97- height : Optional [int ],
88+ tx_context : Optional [TxContext ],
9889 check_message : bool ,
9990 retrying : bool ,
10091 existing_id ,
10192):
10293 message_dict = message .dict (exclude = {"content" })
10394
10495 if not retrying :
105- await PendingMessage .collection .insert_one (
106- {
107- "message" : message_dict ,
108- "source" : dict (
109- chain_name = chain_name ,
110- tx_hash = tx_hash ,
111- height = height ,
112- check_message = check_message , # should we store this?
113- ),
114- }
115- )
96+ await delayed_incoming (message , tx_context , check_message )
11697 else :
11798 LOGGER .debug (f"Incrementing for { existing_id } " )
11899 result = await PendingMessage .collection .update_one (
@@ -123,9 +104,7 @@ async def mark_message_for_retry(
123104
124105async def incoming (
125106 pending_message : BasePendingMessage ,
126- chain_name : Optional [str ] = None ,
127- tx_hash : Optional [str ] = None ,
128- height : Optional [int ] = None ,
107+ tx_context : Optional [TxContext ] = None ,
129108 seen_ids : Optional [Dict [Tuple , int ]] = None ,
130109 check_message : bool = True ,
131110 retrying : bool = False ,
@@ -140,16 +119,23 @@ async def incoming(
140119 item_hash = pending_message .item_hash
141120 sender = pending_message .sender
142121 confirmations = []
122+ chain_name = tx_context .chain if tx_context is not None else None
143123 ids_key = (item_hash , sender , chain_name )
144124
145- if chain_name and tx_hash and height :
125+ if tx_context :
146126 if seen_ids is not None :
147127 if ids_key in seen_ids .keys ():
148- if height > seen_ids [ids_key ]:
128+ if tx_context . height > seen_ids [ids_key ]:
149129 return IncomingStatus .MESSAGE_HANDLED , []
150130
151131 confirmations .append (
152- MessageConfirmation (chain = chain_name , hash = tx_hash , height = height )
132+ MessageConfirmation (
133+ chain = tx_context .chain ,
134+ hash = tx_context .hash ,
135+ height = tx_context .height ,
136+ time = tx_context .time ,
137+ publisher = tx_context .publisher ,
138+ )
153139 )
154140
155141 filters = {
@@ -179,14 +165,14 @@ async def incoming(
179165 updates : Dict [str , Dict ] = {}
180166
181167 if existing :
182- if seen_ids is not None and height is not None :
168+ if seen_ids is not None and tx_context is not None :
183169 if ids_key in seen_ids .keys ():
184- if height > seen_ids [ids_key ]:
170+ if tx_context . height > seen_ids [ids_key ]:
185171 return IncomingStatus .MESSAGE_HANDLED , []
186172 else :
187- seen_ids [ids_key ] = height
173+ seen_ids [ids_key ] = tx_context . height
188174 else :
189- seen_ids [ids_key ] = height
175+ seen_ids [ids_key ] = tx_context . height
190176
191177 LOGGER .debug ("Updating %s." % item_hash )
192178
@@ -206,9 +192,7 @@ async def incoming(
206192 LOGGER .exception ("Can't get content of object %r" % item_hash )
207193 await mark_message_for_retry (
208194 message = pending_message ,
209- chain_name = chain_name ,
210- tx_hash = tx_hash ,
211- height = height ,
195+ tx_context = tx_context ,
212196 check_message = check_message ,
213197 retrying = retrying ,
214198 existing_id = existing_id ,
@@ -217,13 +201,14 @@ async def incoming(
217201
218202 try :
219203 validated_message = validate_pending_message (
220- pending_message = pending_message , content = content , confirmations = confirmations
204+ pending_message = pending_message ,
205+ content = content ,
206+ confirmations = confirmations ,
221207 )
222208 except ValidationError as e :
223209 LOGGER .warning ("Invalid pending message: %s - %s" , item_hash , str (e ))
224210 return IncomingStatus .FAILED_PERMANENTLY , []
225211
226-
227212 # warning: those handlers can modify message and content in place
228213 # and return a status. None has to be retried, -1 is discarded, True is
229214 # handled and kept.
@@ -250,9 +235,7 @@ async def incoming(
250235 LOGGER .debug ("Message type handler has failed, retrying later." )
251236 await mark_message_for_retry (
252237 message = pending_message ,
253- chain_name = chain_name ,
254- tx_hash = tx_hash ,
255- height = height ,
238+ tx_context = tx_context ,
256239 check_message = check_message ,
257240 retrying = retrying ,
258241 existing_id = existing_id ,
@@ -270,14 +253,14 @@ async def incoming(
270253 LOGGER .warning ("Invalid sender for %s" % item_hash )
271254 return IncomingStatus .MESSAGE_HANDLED , []
272255
273- if seen_ids is not None and height is not None :
256+ if seen_ids is not None and tx_context is not None :
274257 if ids_key in seen_ids .keys ():
275- if height > seen_ids [ids_key ]:
258+ if tx_context . height > seen_ids [ids_key ]:
276259 return IncomingStatus .MESSAGE_HANDLED , []
277260 else :
278- seen_ids [ids_key ] = height
261+ seen_ids [ids_key ] = tx_context . height
279262 else :
280- seen_ids [ids_key ] = height
263+ seen_ids [ids_key ] = tx_context . height
281264
282265 LOGGER .debug ("New message to store for %s." % item_hash )
283266
@@ -392,5 +375,5 @@ async def incoming_chaindata(content: Dict, context: TxContext):
392375 For now we only add it to the database, it will be processed later.
393376 """
394377 await PendingTX .collection .insert_one (
395- {"content" : content , "context" : asdict ( context )}
378+ {"content" : content , "context" : context . dict ( )}
396379 )
0 commit comments