Skip to content

Commit c9951cf

Browse files
committed
[TXs] Store the full TX context in confirmations
We now store the entire context of the transaction in the confirmations array of messages. This means that two additional fields are now preserved: the transaction block timestamp and the publisher. As this data must be re-fetched from the chain, a new migration script resets the chain height so that all transactions are re-processed. We reset the confirmation status of all messages to unconfirmed and delete their confirmations array to let the node automatically migrate to the new format.
1 parent 1c379e3 commit c9951cf

File tree

5 files changed

+109
-70
lines changed

5 files changed

+109
-70
lines changed
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
"""
2+
This migration retrieves additional metadata regarding chain confirmation of messages,
3+
including the block timestamp. We reset the TX height of the node to reprocess
4+
all the chain data messages and store the additional confirmation fields
5+
"""
6+
7+
8+
import logging
9+
import os
10+
from configmanager import Config
11+
from aleph.model.chains import Chain
12+
from aleph.model.pending import PendingMessage, PendingTX
13+
from aleph.model.messages import Message
14+
15+
logger = logging.getLogger(os.path.basename(__file__))
16+
17+
18+
async def upgrade(config: Config, **kwargs):
    """Reset chain processing state so all confirmations are re-fetched in the new format.

    Rewinds the ETH chain height to the configured start height, drops pending
    TXs and chain-sourced pending messages, and clears confirmation data from
    all messages so the node rebuilds it from scratch.
    """
    logger.info("Resetting chain height to re-fetch all chaindata...")
    await Chain.set_last_height("ETH", config.ethereum.start_height.value)

    logger.info("Dropping all pending transactions...")
    await PendingTX.collection.delete_many({})

    logger.info(
        "Dropping all pending confirmation messages "
        "(they will be reinserted automatically)..."
    )
    # Pending messages that originate from a chain carry a non-null source.chain_name.
    await PendingMessage.collection.delete_many({"source.chain_name": {"$ne": None}})

    logger.info("Removing confirmation data for all messages...")
    # Confirmations will be automatically added again by the pending TX processor.
    # Removing the confirmations array entirely avoids intermediate states
    # if a message was confirmed in an unexpected way.
    confirmed_messages = {"confirmed": True}
    reset_confirmations = {
        "$set": {"confirmed": False},
        "$unset": {"confirmations": 1},
    }
    await Message.collection.update_many(confirmed_messages, reset_confirmations)
45+
46+
47+
async def downgrade(**kwargs):
    """No-op downgrade.

    Nothing to restore: confirmations are rebuilt automatically by the pending
    TX processor once transactions are reprocessed, so there is no state to
    revert. (The previous comment about a "key file" was copy-pasted from an
    unrelated migration and did not apply here.)
    """
    pass

src/aleph/chains/common.py

Lines changed: 26 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -51,18 +51,17 @@ async def mark_confirmed_data(chain_name, tx_hash, height):
5151
}
5252

5353

54-
async def delayed_incoming(
    message, tx_context: Optional[TxContext] = None, check_message: bool = True
):
    """Queue a message for later processing as a pending message.

    Stores the full transaction context (or None for off-chain messages)
    alongside the message so confirmation metadata is preserved.
    """
    if message is None:
        return

    context_doc = None if tx_context is None else asdict(tx_context)
    pending_doc = {
        "message": message,
        "tx_context": context_doc,
        "check_message": check_message,
    }
    await PendingMessage.collection.insert_one(pending_doc)
6867

@@ -75,25 +74,13 @@ class IncomingStatus(IntEnum):
7574

7675
async def mark_message_for_retry(
7776
message: Dict,
78-
chain_name: Optional[str],
79-
tx_hash: Optional[str],
80-
height: Optional[int],
77+
tx_context: Optional[TxContext],
8178
check_message: bool,
8279
retrying: bool,
8380
existing_id,
8481
):
8582
if not retrying:
86-
await PendingMessage.collection.insert_one(
87-
{
88-
"message": message,
89-
"source": dict(
90-
chain_name=chain_name,
91-
tx_hash=tx_hash,
92-
height=height,
93-
check_message=check_message, # should we store this?
94-
),
95-
}
96-
)
83+
await delayed_incoming(message, tx_context, check_message)
9784
else:
9885
LOGGER.debug(f"Incrementing for {existing_id}")
9986
result = await PendingMessage.collection.update_one(
@@ -104,9 +91,7 @@ async def mark_message_for_retry(
10491

10592
async def incoming(
10693
message: Dict,
107-
chain_name: Optional[str] = None,
108-
tx_hash: Optional[str] = None,
109-
height: Optional[int] = None,
94+
tx_context: Optional[TxContext] = None,
11095
seen_ids: Optional[Dict[Tuple, int]] = None,
11196
check_message: bool = False,
11297
retrying: bool = False,
@@ -120,11 +105,12 @@ async def incoming(
120105

121106
item_hash = message["item_hash"]
122107
sender = message["sender"]
108+
chain_name = tx_context.chain_name if tx_context is not None else None
123109
ids_key = (item_hash, sender, chain_name)
124110

125-
if chain_name and tx_hash and height and seen_ids is not None:
111+
if tx_context is not None and seen_ids is not None:
126112
if ids_key in seen_ids.keys():
127-
if height > seen_ids[ids_key]:
113+
if tx_context.height > seen_ids[ids_key]:
128114
return IncomingStatus.MESSAGE_HANDLED, []
129115

130116
filters = {
@@ -143,7 +129,7 @@ async def incoming(
143129
# check/sanitize the message if needed
144130
try:
145131
message = await check_message_fn(
146-
message, from_chain=(chain_name is not None)
132+
message, from_chain=(tx_context is not None)
147133
)
148134
except InvalidMessageError:
149135
return IncomingStatus.FAILED_PERMANENTLY, []
@@ -164,16 +150,14 @@ async def incoming(
164150
# existing = await Message.collection.find_one(
165151
# filters, projection={'confirmed': 1, 'confirmations': 1, 'time': 1})
166152

167-
if chain_name and tx_hash and height:
153+
if tx_context is not None:
168154
# We are getting a confirmation here
169-
new_values = await mark_confirmed_data(chain_name, tx_hash, height)
170-
171155
updates = {
172156
"$set": {
173157
"confirmed": True,
174158
},
175159
"$min": {"time": message["time"]},
176-
"$addToSet": {"confirmations": new_values["confirmations"][0]},
160+
"$addToSet": {"confirmations": asdict(tx_context)},
177161
}
178162
else:
179163
updates = {
@@ -186,14 +170,14 @@ async def incoming(
186170
# new_values = {'confirmed': False} # this should be our default.
187171
should_commit = False
188172
if existing:
189-
if seen_ids is not None and height is not None:
173+
if seen_ids is not None and tx_context is not None:
190174
if ids_key in seen_ids.keys():
191-
if height > seen_ids[ids_key]:
175+
if tx_context.height > seen_ids[ids_key]:
192176
return IncomingStatus.MESSAGE_HANDLED, []
193177
else:
194-
seen_ids[ids_key] = height
178+
seen_ids[ids_key] = tx_context.height
195179
else:
196-
seen_ids[ids_key] = height
180+
seen_ids[ids_key] = tx_context.height
197181

198182
# THIS CODE SHOULD BE HERE...
199183
# But, if a race condition appeared, we might have the message twice.
@@ -203,7 +187,7 @@ async def incoming(
203187

204188
LOGGER.debug("Updating %s." % item_hash)
205189

206-
if chain_name and tx_hash and height:
190+
if tx_context is not None:
207191
# we need to update messages adding the confirmation
208192
# await Message.collection.update_many(filters, updates)
209193
should_commit = True
@@ -224,9 +208,7 @@ async def incoming(
224208
LOGGER.exception("Can't get content of object %r" % item_hash)
225209
await mark_message_for_retry(
226210
message=message,
227-
chain_name=chain_name,
228-
tx_hash=tx_hash,
229-
height=height,
211+
tx_context=tx_context,
230212
check_message=check_message,
231213
retrying=retrying,
232214
existing_id=existing_id,
@@ -266,9 +248,7 @@ async def incoming(
266248
LOGGER.debug("Message type handler has failed, retrying later.")
267249
await mark_message_for_retry(
268250
message=message,
269-
chain_name=chain_name,
270-
tx_hash=tx_hash,
271-
height=height,
251+
tx_context=tx_context,
272252
check_message=check_message,
273253
retrying=retrying,
274254
existing_id=existing_id,
@@ -286,14 +266,14 @@ async def incoming(
286266
LOGGER.warning("Invalid sender for %s" % item_hash)
287267
return IncomingStatus.MESSAGE_HANDLED, []
288268

289-
if seen_ids is not None and height is not None:
269+
if seen_ids is not None and tx_context is not None:
290270
if ids_key in seen_ids.keys():
291-
if height > seen_ids[ids_key]:
271+
if tx_context.height > seen_ids[ids_key]:
292272
return IncomingStatus.MESSAGE_HANDLED, []
293273
else:
294-
seen_ids[ids_key] = height
274+
seen_ids[ids_key] = tx_context.height
295275
else:
296-
seen_ids[ids_key] = height
276+
seen_ids[ids_key] = tx_context.height
297277

298278
LOGGER.debug("New message to store for %s." % item_hash)
299279
# message.update(new_values)

src/aleph/jobs/process_pending_messages.py

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
from aleph.services.p2p import singleton
2020
from aleph.types import ItemType
2121
from .job_utils import prepare_loop, gather_and_perform_db_operations
22+
from ..chains.tx_context import TxContext
2223

2324
LOGGER = getLogger("jobs.pending_messages")
2425

@@ -27,13 +28,17 @@ async def handle_pending_message(
2728
pending: Dict,
2829
seen_ids: Dict[Tuple, int],
2930
) -> List[DbBulkOperation]:
31+
32+
try:
33+
tx_context = TxContext(**pending["tx_context"])
34+
except KeyError:
35+
tx_context = None
36+
3037
status, operations = await incoming(
3138
pending["message"],
32-
chain_name=pending["source"].get("chain_name"),
33-
tx_hash=pending["source"].get("tx_hash"),
34-
height=pending["source"].get("height"),
39+
tx_context=tx_context,
3540
seen_ids=seen_ids,
36-
check_message=pending["source"].get("check_message", True),
41+
check_message=pending.get("check_message", True),
3742
retrying=True,
3843
existing_id=pending["_id"],
3944
)

src/aleph/jobs/process_pending_txs.py

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
import asyncio
66
import logging
7+
from dataclasses import asdict
78
from typing import List, Dict, Optional
89

910
import sentry_sdk
@@ -15,12 +16,12 @@
1516
from aleph.chains.tx_context import TxContext
1617
from aleph.exceptions import InvalidMessageError
1718
from aleph.logging import setup_logging
19+
from aleph.model.db_bulk_operation import DbBulkOperation
1820
from aleph.model.pending import PendingMessage, PendingTX
1921
from aleph.network import check_message
2022
from aleph.services.p2p import singleton
21-
from .job_utils import prepare_loop, gather_and_perform_db_operations
22-
from aleph.model.db_bulk_operation import DbBulkOperation
2323
from aleph.toolkit.batch import async_batch
24+
from .job_utils import prepare_loop, gather_and_perform_db_operations
2425

2526
LOGGER = logging.getLogger("jobs.pending_txs")
2627

@@ -55,12 +56,8 @@ async def handle_pending_tx(
5556
operation=InsertOne(
5657
{
5758
"message": message,
58-
"source": dict(
59-
chain_name=tx_context.chain_name,
60-
tx_hash=tx_context.tx_hash,
61-
height=tx_context.height,
62-
check_message=True, # should we store this?
63-
),
59+
"tx_context": asdict(tx_context),
60+
"check_message": True,
6461
}
6562
),
6663
)

tests/chains/test_confirmation.py

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
11
import json
2+
from dataclasses import asdict
23
from typing import Dict
34

45
import pytest
56

67
from aleph.chains.common import process_one_message
8+
from aleph.chains.tx_context import TxContext
79
from aleph.model.messages import CappedMessage, Message
810

9-
1011
MESSAGE = {
1112
"chain": "ETH",
1213
"sender": "0x971300C78A38e0F85E60A3b04ae3fA70b4276B64",
@@ -56,18 +57,21 @@ async def test_confirm_message(test_db):
5657
assert remove_id_key(message_in_db) == remove_id_key(capped_message_in_db)
5758

5859
# Now, confirm the message
59-
chain_name, tx_hash, height = "ETH", "123", 8000
60-
await process_one_message(
61-
MESSAGE, chain_name=chain_name, tx_hash=tx_hash, height=height
60+
tx_context = TxContext(
61+
chain_name="ETH",
62+
tx_hash="123",
63+
height=8000,
64+
time=120000,
65+
publisher="0xdeadbeef",
6266
)
67+
await process_one_message(MESSAGE, tx_context)
6368

6469
message_in_db = await Message.collection.find_one({"item_hash": item_hash})
6570

6671
assert message_in_db is not None
6772
assert message_in_db["confirmed"]
68-
assert {"chain": chain_name, "hash": tx_hash, "height": height} in message_in_db[
69-
"confirmations"
70-
]
73+
expected_confirmations = [asdict(tx_context)]
74+
assert message_in_db["confirmations"] == expected_confirmations
7175

7276
capped_message_after_confirmation = await CappedMessage.collection.find_one(
7377
{"item_hash": item_hash}
@@ -89,17 +93,21 @@ async def test_process_confirmed_message(test_db):
8993
item_hash = MESSAGE["item_hash"]
9094

9195
# Now, confirm the message
92-
chain_name, tx_hash, height = "ETH", "123", 8000
93-
await process_one_message(
94-
MESSAGE, chain_name=chain_name, tx_hash=tx_hash, height=height
96+
tx_context = TxContext(
97+
chain_name="ETH",
98+
tx_hash="123",
99+
height=8000,
100+
time=120000,
101+
publisher="0xdeadbeef",
95102
)
103+
await process_one_message(MESSAGE, tx_context=tx_context)
96104

97105
message_in_db = await Message.collection.find_one({"item_hash": item_hash})
98106

99107
assert message_in_db is not None
100108
assert message_in_db["confirmed"]
101109

102-
expected_confirmations = [{"chain": chain_name, "hash": tx_hash, "height": height}]
110+
expected_confirmations = [asdict(tx_context)]
103111
assert message_in_db["confirmations"] == expected_confirmations
104112

105113
capped_message_in_db = await CappedMessage.collection.find_one(
@@ -108,4 +116,4 @@ async def test_process_confirmed_message(test_db):
108116

109117
assert remove_id_key(message_in_db) == remove_id_key(capped_message_in_db)
110118
assert capped_message_in_db["confirmed"]
111-
assert capped_message_in_db["confirmations"] == expected_confirmations
119+
assert capped_message_in_db["confirmations"] == [asdict(tx_context)]

0 commit comments

Comments
 (0)