Skip to content

Commit f1a4ce4

Browse files
committed
[TXs] Store the full TX context in confirmations
We now store the entire context of the transaction in the confirmations array of messages. This means that two additional fields are now preserved: the transaction block timestamp and the publisher. As we need to re-fetch this data from chain data, a new migration script resets the chain height so that all transactions are re-processed. We reset the confirmation status of all messages to unconfirmed and delete their confirmations array to let the node automatically migrate to the new format. We also renamed some fields of the `TxContext` class in order to use the same format in all DB collections and to avoid a breaking change in the messages confirmation format.
1 parent d10cdd0 commit f1a4ce4

File tree

9 files changed

+115
-77
lines changed

9 files changed

+115
-77
lines changed
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
"""
2+
This migration retrieves additional metadata regarding chain confirmation of messages,
3+
including the block timestamp. We reset the TX height of the node to reprocess
4+
all the chain data messages and insert additional values
5+
"""
6+
7+
8+
import logging
9+
import os
10+
from configmanager import Config
11+
from aleph.model.chains import Chain
12+
from aleph.model.pending import PendingMessage, PendingTX
13+
from aleph.model.messages import Message
14+
15+
logger = logging.getLogger(os.path.basename(__file__))
16+
17+
18+
async def upgrade(config: Config, **kwargs):
    """Reset chain-sync state so message confirmations are re-fetched with full TX context.

    Rewinds the ETH last-seen height to the configured start height, drops all
    pending transactions and all chain-sourced pending messages, then strips
    confirmation data from every confirmed message so the pending-TX processor
    rebuilds confirmations in the new format.
    """
    logger.info("Resetting chain height to re-fetch all chaindata...")
    eth_start_height = config.ethereum.start_height.value
    await Chain.set_last_height("ETH", eth_start_height)

    logger.info("Dropping all pending transactions...")
    await PendingTX.collection.delete_many({})

    logger.info(
        "Dropping all pending confirmation messages "
        "(they will be reinserted automatically)..."
    )
    # Only messages that originated from chain data carry a source chain name.
    chain_sourced_filter = {"source.chain_name": {"$ne": None}}
    await PendingMessage.collection.delete_many(chain_sourced_filter)

    logger.info("Removing confirmation data for all messages...")
    # Confirmations will be automatically added again by the pending TX processor.
    # By removing the confirmation entirely, we make sure to avoid intermediate states
    # if a message was confirmed in an unexpected way.
    confirmed_filter = {"confirmed": True}
    reset_confirmation = {
        "$set": {"confirmed": False},
        "$unset": {"confirmations": 1},
    }
    await Message.collection.update_many(confirmed_filter, reset_confirmation)
45+
46+
47+
async def downgrade(**kwargs):
    """Refuse to downgrade: the pre-migration confirmation format cannot be restored."""
    raise NotImplementedError("Downgrading this migration is not supported.")

src/aleph/chains/common.py

Lines changed: 26 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -54,18 +54,17 @@ async def mark_confirmed_data(chain_name, tx_hash, height):
5454
}
5555

5656

57-
async def delayed_incoming(message, chain_name=None, tx_hash=None, height=None):
57+
async def delayed_incoming(
58+
message, tx_context: Optional[TxContext] = None, check_message: bool = True
59+
):
5860
if message is None:
5961
return
62+
6063
await PendingMessage.collection.insert_one(
6164
{
6265
"message": message,
63-
"source": dict(
64-
chain_name=chain_name,
65-
tx_hash=tx_hash,
66-
height=height,
67-
check_message=True, # should we store this?
68-
),
66+
"tx_context": asdict(tx_context) if tx_context is not None else None,
67+
"check_message": check_message,
6968
}
7069
)
7170

@@ -78,25 +77,13 @@ class IncomingStatus(IntEnum):
7877

7978
async def mark_message_for_retry(
8079
message: Dict,
81-
chain_name: Optional[str],
82-
tx_hash: Optional[str],
83-
height: Optional[int],
80+
tx_context: Optional[TxContext],
8481
check_message: bool,
8582
retrying: bool,
8683
existing_id,
8784
):
8885
if not retrying:
89-
await PendingMessage.collection.insert_one(
90-
{
91-
"message": message,
92-
"source": dict(
93-
chain_name=chain_name,
94-
tx_hash=tx_hash,
95-
height=height,
96-
check_message=check_message, # should we store this?
97-
),
98-
}
99-
)
86+
await delayed_incoming(message, tx_context, check_message)
10087
else:
10188
LOGGER.debug(f"Incrementing for {existing_id}")
10289
result = await PendingMessage.collection.update_one(
@@ -124,9 +111,7 @@ def update_message_item_type(message_dict: Dict) -> Dict:
124111

125112
async def incoming(
126113
message: Dict,
127-
chain_name: Optional[str] = None,
128-
tx_hash: Optional[str] = None,
129-
height: Optional[int] = None,
114+
tx_context: Optional[TxContext] = None,
130115
seen_ids: Optional[Dict[Tuple, int]] = None,
131116
check_message: bool = False,
132117
retrying: bool = False,
@@ -144,11 +129,12 @@ async def incoming(
144129

145130
item_hash = message["item_hash"]
146131
sender = message["sender"]
132+
chain_name = tx_context.chain if tx_context is not None else None
147133
ids_key = (item_hash, sender, chain_name)
148134

149-
if chain_name and tx_hash and height and seen_ids is not None:
135+
if tx_context is not None and seen_ids is not None:
150136
if ids_key in seen_ids.keys():
151-
if height > seen_ids[ids_key]:
137+
if tx_context.height > seen_ids[ids_key]:
152138
return IncomingStatus.MESSAGE_HANDLED, []
153139

154140
filters = {
@@ -167,7 +153,7 @@ async def incoming(
167153
# check/sanitize the message if needed
168154
try:
169155
message = await check_message_fn(
170-
message, from_chain=(chain_name is not None)
156+
message, from_chain=(tx_context is not None)
171157
)
172158
except InvalidMessageError:
173159
return IncomingStatus.FAILED_PERMANENTLY, []
@@ -188,16 +174,14 @@ async def incoming(
188174
# existing = await Message.collection.find_one(
189175
# filters, projection={'confirmed': 1, 'confirmations': 1, 'time': 1})
190176

191-
if chain_name and tx_hash and height:
177+
if tx_context is not None:
192178
# We are getting a confirmation here
193-
new_values = await mark_confirmed_data(chain_name, tx_hash, height)
194-
195179
updates = {
196180
"$set": {
197181
"confirmed": True,
198182
},
199183
"$min": {"time": message["time"]},
200-
"$addToSet": {"confirmations": new_values["confirmations"][0]},
184+
"$addToSet": {"confirmations": asdict(tx_context)},
201185
}
202186
else:
203187
updates = {
@@ -210,14 +194,14 @@ async def incoming(
210194
# new_values = {'confirmed': False} # this should be our default.
211195
should_commit = False
212196
if existing:
213-
if seen_ids is not None and height is not None:
197+
if seen_ids is not None and tx_context is not None:
214198
if ids_key in seen_ids.keys():
215-
if height > seen_ids[ids_key]:
199+
if tx_context.height > seen_ids[ids_key]:
216200
return IncomingStatus.MESSAGE_HANDLED, []
217201
else:
218-
seen_ids[ids_key] = height
202+
seen_ids[ids_key] = tx_context.height
219203
else:
220-
seen_ids[ids_key] = height
204+
seen_ids[ids_key] = tx_context.height
221205

222206
# THIS CODE SHOULD BE HERE...
223207
# But, if a race condition appeared, we might have the message twice.
@@ -227,7 +211,7 @@ async def incoming(
227211

228212
LOGGER.debug("Updating %s." % item_hash)
229213

230-
if chain_name and tx_hash and height:
214+
if tx_context is not None:
231215
# we need to update messages adding the confirmation
232216
# await Message.collection.update_many(filters, updates)
233217
should_commit = True
@@ -248,9 +232,7 @@ async def incoming(
248232
LOGGER.exception("Can't get content of object %r" % item_hash)
249233
await mark_message_for_retry(
250234
message=message,
251-
chain_name=chain_name,
252-
tx_hash=tx_hash,
253-
height=height,
235+
tx_context=tx_context,
254236
check_message=check_message,
255237
retrying=retrying,
256238
existing_id=existing_id,
@@ -290,9 +272,7 @@ async def incoming(
290272
LOGGER.debug("Message type handler has failed, retrying later.")
291273
await mark_message_for_retry(
292274
message=message,
293-
chain_name=chain_name,
294-
tx_hash=tx_hash,
295-
height=height,
275+
tx_context=tx_context,
296276
check_message=check_message,
297277
retrying=retrying,
298278
existing_id=existing_id,
@@ -310,14 +290,14 @@ async def incoming(
310290
LOGGER.warning("Invalid sender for %s" % item_hash)
311291
return IncomingStatus.MESSAGE_HANDLED, []
312292

313-
if seen_ids is not None and height is not None:
293+
if seen_ids is not None and tx_context is not None:
314294
if ids_key in seen_ids.keys():
315-
if height > seen_ids[ids_key]:
295+
if tx_context.height > seen_ids[ids_key]:
316296
return IncomingStatus.MESSAGE_HANDLED, []
317297
else:
318-
seen_ids[ids_key] = height
298+
seen_ids[ids_key] = tx_context.height
319299
else:
320-
seen_ids[ids_key] = height
300+
seen_ids[ids_key] = tx_context.height
321301

322302
LOGGER.debug("New message to store for %s." % item_hash)
323303
# message.update(new_values)

src/aleph/chains/ethereum.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -197,8 +197,8 @@ async def request_transactions(
197197
try:
198198
jdata = json.loads(message)
199199
context = TxContext(
200-
chain_name=CHAIN_NAME,
201-
tx_hash=event_data.transactionHash.hex(),
200+
chain=CHAIN_NAME,
201+
hash=event_data.transactionHash.hex(),
202202
time=timestamp,
203203
height=event_data.blockNumber,
204204
publisher=publisher,

src/aleph/chains/nuls2.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -175,8 +175,8 @@ async def request_transactions(config, session, start_height) -> AsyncIterator[T
175175
jdata = json.loads(ddata)
176176

177177
context = TxContext(
178-
chain_name=CHAIN_NAME,
179-
tx_hash=tx["hash"],
178+
chain=CHAIN_NAME,
179+
hash=tx["hash"],
180180
height=tx["height"],
181181
time=tx["createTime"],
182182
publisher=tx["coinFroms"][0]["address"],

src/aleph/chains/tx_context.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@
33

44
@dataclass
55
class TxContext:
6-
chain_name: str
7-
tx_hash: str
6+
chain: str
7+
hash: str
88
height: int
99
# Transaction timestamp, in Unix time (number of seconds since epoch).
1010
time: int

src/aleph/jobs/process_pending_messages.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
from aleph.model.pending import PendingMessage
2020
from aleph.services.p2p import singleton
2121
from .job_utils import prepare_loop, process_job_results
22+
from ..chains.tx_context import TxContext
2223

2324
LOGGER = getLogger("jobs.pending_messages")
2425

@@ -48,12 +49,15 @@ async def handle_pending_message(
4849
seen_ids: Dict[Tuple, int],
4950
) -> List[DbBulkOperation]:
5051

52+
try:
53+
tx_context = TxContext(**pending["tx_context"])
54+
except KeyError:
55+
tx_context = None
56+
5157
async with sem:
5258
status, operations = await incoming(
5359
pending["message"],
54-
chain_name=pending["source"].get("chain_name"),
55-
tx_hash=pending["source"].get("tx_hash"),
56-
height=pending["source"].get("height"),
60+
tx_context=tx_context,
5761
seen_ids=seen_ids,
5862
check_message=pending["source"].get("check_message", True),
5963
retrying=True,

src/aleph/jobs/process_pending_txs.py

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@
44

55
import asyncio
66
import logging
7-
from typing import List, Dict, Optional, Set
7+
from dataclasses import asdict
8+
from typing import List, Dict, Optional
9+
from typing import Set
810

911
import sentry_sdk
1012
from configmanager import Config
@@ -31,7 +33,7 @@ async def handle_pending_tx(
3133

3234
db_operations: List[DbBulkOperation] = []
3335
tx_context = TxContext(**pending_tx["context"])
34-
LOGGER.info("%s Handling TX in block %s", tx_context.chain_name, tx_context.height)
36+
LOGGER.info("%s Handling TX in block %s", tx_context.chain, tx_context.height)
3537

3638
messages = await get_chaindata_messages(
3739
pending_tx["content"], tx_context, seen_ids=seen_ids
@@ -55,12 +57,8 @@ async def handle_pending_tx(
5557
operation=InsertOne(
5658
{
5759
"message": message,
58-
"source": dict(
59-
chain_name=tx_context.chain_name,
60-
tx_hash=tx_context.tx_hash,
61-
height=tx_context.height,
62-
check_message=True, # should we store this?
63-
),
60+
"tx_context": asdict(tx_context),
61+
"check_message": True,
6462
}
6563
),
6664
)

tests/chains/test_confirmation.py

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
11
import json
2+
from dataclasses import asdict
23
from typing import Dict
34

45
import pytest
56

67
from aleph.chains.common import process_one_message
8+
from aleph.chains.tx_context import TxContext
79
from aleph.model.messages import CappedMessage, Message
810

9-
1011
MESSAGE = {
1112
"chain": "ETH",
1213
"sender": "0x971300C78A38e0F85E60A3b04ae3fA70b4276B64",
@@ -56,18 +57,21 @@ async def test_confirm_message(test_db):
5657
assert remove_id_key(message_in_db) == remove_id_key(capped_message_in_db)
5758

5859
# Now, confirm the message
59-
chain_name, tx_hash, height = "ETH", "123", 8000
60-
await process_one_message(
61-
MESSAGE, chain_name=chain_name, tx_hash=tx_hash, height=height
60+
tx_context = TxContext(
61+
chain="ETH",
62+
hash="123",
63+
height=8000,
64+
time=120000,
65+
publisher="0xdeadbeef",
6266
)
67+
await process_one_message(MESSAGE, tx_context)
6368

6469
message_in_db = await Message.collection.find_one({"item_hash": item_hash})
6570

6671
assert message_in_db is not None
6772
assert message_in_db["confirmed"]
68-
assert {"chain": chain_name, "hash": tx_hash, "height": height} in message_in_db[
69-
"confirmations"
70-
]
73+
expected_confirmations = [asdict(tx_context)]
74+
assert message_in_db["confirmations"] == expected_confirmations
7175

7276
capped_message_after_confirmation = await CappedMessage.collection.find_one(
7377
{"item_hash": item_hash}
@@ -89,17 +93,21 @@ async def test_process_confirmed_message(test_db):
8993
item_hash = MESSAGE["item_hash"]
9094

9195
# Now, confirm the message
92-
chain_name, tx_hash, height = "ETH", "123", 8000
93-
await process_one_message(
94-
MESSAGE, chain_name=chain_name, tx_hash=tx_hash, height=height
96+
tx_context = TxContext(
97+
chain="ETH",
98+
hash="123",
99+
height=8000,
100+
time=120000,
101+
publisher="0xdeadbeef",
95102
)
103+
await process_one_message(MESSAGE, tx_context=tx_context)
96104

97105
message_in_db = await Message.collection.find_one({"item_hash": item_hash})
98106

99107
assert message_in_db is not None
100108
assert message_in_db["confirmed"]
101109

102-
expected_confirmations = [{"chain": chain_name, "hash": tx_hash, "height": height}]
110+
expected_confirmations = [asdict(tx_context)]
103111
assert message_in_db["confirmations"] == expected_confirmations
104112

105113
capped_message_in_db = await CappedMessage.collection.find_one(
@@ -108,4 +116,4 @@ async def test_process_confirmed_message(test_db):
108116

109117
assert remove_id_key(message_in_db) == remove_id_key(capped_message_in_db)
110118
assert capped_message_in_db["confirmed"]
111-
assert capped_message_in_db["confirmations"] == expected_confirmations
119+
assert capped_message_in_db["confirmations"] == [asdict(tx_context)]

0 commit comments

Comments
 (0)