Skip to content

Commit 4a7d346

Browse files
committed
Internal: store the full TX context in confirmations (#265)
We now store the entire context of the transaction in the confirmations array of messages. This means that two additional fields are now preserved: the transaction block timestamp and the publisher. As we need to re-fetch this data from chain data, a new migration script resets the chain height to re-process all transactions. We reset the confirmation status of all messages to unconfirmed and deleted their confirmations array to let the node automatically migrate to the new format. Renamed some fields of the `TxContext` class in order to use the same format in all DB collections and to avoid a breaking change in the messages confirmation format.
1 parent 7accb4c commit 4a7d346

File tree

13 files changed

+152
-99
lines changed

13 files changed

+152
-99
lines changed
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
"""
2+
This migration retrieves additional metadata regarding chain confirmation of messages,
3+
including the block timestamp. We reset the TX height of the node to reprocess
4+
all the chain data messages and insert additional values
5+
"""
6+
7+
8+
import logging
9+
import os
10+
from configmanager import Config
11+
from aleph.model.chains import Chain
12+
from aleph.model.pending import PendingMessage, PendingTX
13+
from aleph.model.messages import Message
14+
15+
logger = logging.getLogger(os.path.basename(__file__))
16+
17+
18+
async def upgrade(config: Config, **kwargs):
19+
logger.info("Resetting chain height to re-fetch all chaindata...")
20+
start_height = config.ethereum.start_height.value
21+
await Chain.set_last_height("ETH", start_height)
22+
23+
logger.info("Dropping all pending transactions...")
24+
await PendingTX.collection.delete_many({})
25+
26+
logger.info(
27+
"Dropping all pending confirmation messages "
28+
"(they will be reinserted automatically)..."
29+
)
30+
await PendingMessage.collection.delete_many({"source.chain_name": {"$ne": None}})
31+
32+
logger.info("Removing confirmation data for all messages...")
33+
# Confirmations will be automatically added again by the pending TX processor.
34+
# By removing the confirmation entirely, we make sure to avoid intermediate states
35+
# if a message was confirmed in an unexpected way.
36+
await Message.collection.update_many(
37+
{"confirmed": True},
38+
{
39+
"$set": {
40+
"confirmed": False,
41+
},
42+
"$unset": {"confirmations": 1},
43+
},
44+
)
45+
46+
47+
async def downgrade(**kwargs):
48+
raise NotImplementedError("Downgrading this migration is not supported.")

src/aleph/chains/common.py

Lines changed: 37 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,9 @@
11
import asyncio
22
import json
33
import logging
4-
from dataclasses import asdict
54
from enum import IntEnum
65
from typing import Dict, Optional, Tuple, List
76

8-
from aleph_message.models import MessageConfirmation
97
from bson import ObjectId
108
from pymongo import UpdateOne
119

@@ -25,18 +23,17 @@
2523
from aleph.model.pending import PendingMessage, PendingTX
2624
from aleph.network import verify_signature
2725
from aleph.permissions import check_sender_authorization
28-
from aleph.storage import get_json, pin_hash, add_json, get_message_content
29-
from .tx_context import TxContext
30-
from aleph.schemas.pending_messages import (
31-
BasePendingMessage,
32-
)
26+
from aleph.schemas.pending_messages import BasePendingMessage
3327
from aleph.schemas.validated_message import (
3428
validate_pending_message,
3529
ValidatedStoreMessage,
3630
ValidatedForgetMessage,
3731
make_confirmation_update_query,
38-
make_message_upsert_query,
32+
make_message_upsert_query,
3933
)
34+
from ..schemas.message_confirmation import MessageConfirmation
35+
from aleph.storage import get_json, pin_hash, add_json, get_message_content
36+
from .tx_context import TxContext
4037

4138
LOGGER = logging.getLogger("chains.common")
4239

@@ -64,21 +61,17 @@ async def mark_confirmed_data(chain_name, tx_hash, height):
6461

6562
async def delayed_incoming(
6663
message: BasePendingMessage,
67-
chain_name: Optional[str] = None,
68-
tx_hash: Optional[str] = None,
69-
height: Optional[int] = None,
64+
tx_context: Optional[TxContext] = None,
65+
check_message: bool = True,
7066
):
7167
if message is None:
7268
return
69+
7370
await PendingMessage.collection.insert_one(
7471
{
7572
"message": message.dict(exclude={"content"}),
76-
"source": dict(
77-
chain_name=chain_name,
78-
tx_hash=tx_hash,
79-
height=height,
80-
check_message=True, # should we store this?
81-
),
73+
"tx_context": tx_context.dict() if tx_context else None,
74+
"check_message": check_message,
8275
}
8376
)
8477

@@ -91,27 +84,15 @@ class IncomingStatus(IntEnum):
9184

9285
async def mark_message_for_retry(
9386
message: BasePendingMessage,
94-
chain_name: Optional[str],
95-
tx_hash: Optional[str],
96-
height: Optional[int],
87+
tx_context: Optional[TxContext],
9788
check_message: bool,
9889
retrying: bool,
9990
existing_id,
10091
):
10192
message_dict = message.dict(exclude={"content"})
10293

10394
if not retrying:
104-
await PendingMessage.collection.insert_one(
105-
{
106-
"message": message_dict,
107-
"source": dict(
108-
chain_name=chain_name,
109-
tx_hash=tx_hash,
110-
height=height,
111-
check_message=check_message, # should we store this?
112-
),
113-
}
114-
)
95+
await delayed_incoming(message, tx_context, check_message)
11596
else:
11697
LOGGER.debug(f"Incrementing for {existing_id}")
11798
result = await PendingMessage.collection.update_one(
@@ -122,9 +103,7 @@ async def mark_message_for_retry(
122103

123104
async def incoming(
124105
pending_message: BasePendingMessage,
125-
chain_name: Optional[str] = None,
126-
tx_hash: Optional[str] = None,
127-
height: Optional[int] = None,
106+
tx_context: Optional[TxContext] = None,
128107
seen_ids: Optional[Dict[Tuple, int]] = None,
129108
check_message: bool = False,
130109
retrying: bool = False,
@@ -139,16 +118,23 @@ async def incoming(
139118
item_hash = pending_message.item_hash
140119
sender = pending_message.sender
141120
confirmations = []
121+
chain_name = tx_context.chain if tx_context is not None else None
142122
ids_key = (item_hash, sender, chain_name)
143123

144-
if chain_name and tx_hash and height:
124+
if tx_context:
145125
if seen_ids is not None:
146126
if ids_key in seen_ids.keys():
147-
if height > seen_ids[ids_key]:
127+
if tx_context.height > seen_ids[ids_key]:
148128
return IncomingStatus.MESSAGE_HANDLED, []
149129

150130
confirmations.append(
151-
MessageConfirmation(chain=chain_name, hash=tx_hash, height=height)
131+
MessageConfirmation(
132+
chain=tx_context.chain,
133+
hash=tx_context.hash,
134+
height=tx_context.height,
135+
time=tx_context.time,
136+
publisher=tx_context.publisher,
137+
)
152138
)
153139

154140
filters = {
@@ -178,14 +164,14 @@ async def incoming(
178164
updates: Dict[str, Dict] = {}
179165

180166
if existing:
181-
if seen_ids is not None and height is not None:
167+
if seen_ids is not None and tx_context is not None:
182168
if ids_key in seen_ids.keys():
183-
if height > seen_ids[ids_key]:
169+
if tx_context.height > seen_ids[ids_key]:
184170
return IncomingStatus.MESSAGE_HANDLED, []
185171
else:
186-
seen_ids[ids_key] = height
172+
seen_ids[ids_key] = tx_context.height
187173
else:
188-
seen_ids[ids_key] = height
174+
seen_ids[ids_key] = tx_context.height
189175

190176
LOGGER.debug("Updating %s." % item_hash)
191177

@@ -205,17 +191,17 @@ async def incoming(
205191
LOGGER.exception("Can't get content of object %r" % item_hash)
206192
await mark_message_for_retry(
207193
message=pending_message,
208-
chain_name=chain_name,
209-
tx_hash=tx_hash,
210-
height=height,
194+
tx_context=tx_context,
211195
check_message=check_message,
212196
retrying=retrying,
213197
existing_id=existing_id,
214198
)
215199
return IncomingStatus.RETRYING_LATER, []
216200

217201
validated_message = validate_pending_message(
218-
pending_message=pending_message, content=content, confirmations=confirmations
202+
pending_message=pending_message,
203+
content=content,
204+
confirmations=confirmations,
219205
)
220206

221207
# warning: those handlers can modify message and content in place
@@ -244,9 +230,7 @@ async def incoming(
244230
LOGGER.debug("Message type handler has failed, retrying later.")
245231
await mark_message_for_retry(
246232
message=pending_message,
247-
chain_name=chain_name,
248-
tx_hash=tx_hash,
249-
height=height,
233+
tx_context=tx_context,
250234
check_message=check_message,
251235
retrying=retrying,
252236
existing_id=existing_id,
@@ -264,14 +248,14 @@ async def incoming(
264248
LOGGER.warning("Invalid sender for %s" % item_hash)
265249
return IncomingStatus.MESSAGE_HANDLED, []
266250

267-
if seen_ids is not None and height is not None:
251+
if seen_ids is not None and tx_context is not None:
268252
if ids_key in seen_ids.keys():
269-
if height > seen_ids[ids_key]:
253+
if tx_context.height > seen_ids[ids_key]:
270254
return IncomingStatus.MESSAGE_HANDLED, []
271255
else:
272-
seen_ids[ids_key] = height
256+
seen_ids[ids_key] = tx_context.height
273257
else:
274-
seen_ids[ids_key] = height
258+
seen_ids[ids_key] = tx_context.height
275259

276260
LOGGER.debug("New message to store for %s." % item_hash)
277261

@@ -386,5 +370,5 @@ async def incoming_chaindata(content: Dict, context: TxContext):
386370
For now we only add it to the database, it will be processed later.
387371
"""
388372
await PendingTX.collection.insert_one(
389-
{"content": content, "context": asdict(context)}
373+
{"content": content, "context": context.dict()}
390374
)

src/aleph/chains/ethereum.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -198,8 +198,8 @@ async def request_transactions(
198198
try:
199199
jdata = json.loads(message)
200200
context = TxContext(
201-
chain_name=CHAIN_NAME,
202-
tx_hash=event_data.transactionHash.hex(),
201+
chain=CHAIN_NAME,
202+
hash=event_data.transactionHash.hex(),
203203
time=timestamp,
204204
height=event_data.blockNumber,
205205
publisher=publisher,

src/aleph/chains/nuls2.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -175,8 +175,8 @@ async def request_transactions(config, session, start_height) -> AsyncIterator[T
175175
jdata = json.loads(ddata)
176176

177177
context = TxContext(
178-
chain_name=CHAIN_NAME,
179-
tx_hash=tx["hash"],
178+
chain=CHAIN_NAME,
179+
hash=tx["hash"],
180180
height=tx["height"],
181181
time=tx["createTime"],
182182
publisher=tx["coinFroms"][0]["address"],

src/aleph/chains/tx_context.py

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,7 @@
1-
from dataclasses import dataclass
1+
from aleph.schemas.message_confirmation import MessageConfirmation
22

33

4-
@dataclass
5-
class TxContext:
6-
chain_name: str
7-
tx_hash: str
8-
height: int
9-
# Transaction timestamp, in Unix time (number of seconds since epoch).
10-
time: int
11-
publisher: str
4+
# At the moment, confirmation = chain transaction. This might change, but in the meantime
5+
# having TxContext inherit MessageConfirmation avoids code duplication.
6+
class TxContext(MessageConfirmation):
7+
pass

src/aleph/jobs/process_pending_messages.py

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,14 @@
1414
from setproctitle import setproctitle
1515

1616
from aleph.chains.common import incoming, IncomingStatus
17+
from aleph.exceptions import InvalidMessageError
1718
from aleph.logging import setup_logging
1819
from aleph.model.db_bulk_operation import DbBulkOperation
1920
from aleph.model.pending import PendingMessage
21+
from aleph.schemas.pending_messages import parse_message
2022
from aleph.services.p2p import singleton
2123
from .job_utils import prepare_loop, process_job_results
22-
from ..exceptions import InvalidMessageError
23-
from ..schemas.pending_messages import parse_message
24+
from ..chains.tx_context import TxContext
2425

2526
LOGGER = getLogger("jobs.pending_messages")
2627

@@ -60,12 +61,13 @@ async def handle_pending_message(
6061
# If an invalid message somehow ended in pending messages, drop it.
6162
return [delete_pending_message_op]
6263

64+
tx_context_dict = pending.get("tx_context")
65+
tx_context = TxContext.parse_obj(tx_context_dict) if tx_context_dict else None
66+
6367
async with sem:
6468
status, operations = await incoming(
6569
pending_message=message,
66-
chain_name=pending["source"].get("chain_name"),
67-
tx_hash=pending["source"].get("tx_hash"),
68-
height=pending["source"].get("height"),
70+
tx_context=tx_context,
6971
seen_ids=seen_ids,
7072
check_message=pending["source"].get("check_message", True),
7173
retrying=True,

src/aleph/jobs/process_pending_txs.py

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@
44

55
import asyncio
66
import logging
7-
from typing import List, Dict, Optional, Set
7+
from typing import List, Dict, Optional
8+
from typing import Set
89

910
import sentry_sdk
1011
from configmanager import Config
@@ -31,7 +32,7 @@ async def handle_pending_tx(
3132

3233
db_operations: List[DbBulkOperation] = []
3334
tx_context = TxContext(**pending_tx["context"])
34-
LOGGER.info("%s Handling TX in block %s", tx_context.chain_name, tx_context.height)
35+
LOGGER.info("%s Handling TX in block %s", tx_context.chain, tx_context.height)
3536

3637
messages = await get_chaindata_messages(
3738
pending_tx["content"], tx_context, seen_ids=seen_ids
@@ -55,12 +56,8 @@ async def handle_pending_tx(
5556
operation=InsertOne(
5657
{
5758
"message": message.dict(exclude={"content"}),
58-
"source": dict(
59-
chain_name=tx_context.chain_name,
60-
tx_hash=tx_context.tx_hash,
61-
height=tx_context.height,
62-
check_message=True, # should we store this?
63-
),
59+
"tx_context": tx_context.dict(),
60+
"check_message": True,
6461
}
6562
),
6663
)
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
from aleph_message.models import Chain
2+
from pydantic import BaseModel, Field
3+
4+
5+
class MessageConfirmation(BaseModel):
6+
chain: Chain = Field(..., description="Chain from which the confirmation was fetched.")
7+
height: int = Field(..., description="Block in which the confirmation was published.")
8+
hash: str = Field(
9+
...,
10+
description="Hash of the transaction/block in which the confirmation was published.",
11+
)
12+
time: float = Field(
13+
...,
14+
description="Transaction timestamp, in Unix time (number of seconds since epoch).",
15+
)
16+
publisher: str = Field(..., description="Publisher of the confirmation on chain.")

0 commit comments

Comments
 (0)