Skip to content

Commit 205b3c1

Browse files
committed
[TXs] Store the full TX context in confirmations
We now store the entire context of the transaction in the confirmations array of messages. This means that two additional fields are now preserved: the transaction block timestamp and the publisher. As we need to re-fetch this data from the chain, a new migration script resets the chain height so that all transactions are re-processed. We reset the confirmation status of all messages to unconfirmed and delete their confirmations array to let the node automatically migrate to the new format. We also renamed some fields of the `TxContext` class in order to use the same format in all DB collections and to avoid a breaking change in the messages confirmation format.
1 parent 1c379e3 commit 205b3c1

File tree

9 files changed

+117
-79
lines changed

9 files changed

+117
-79
lines changed
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
"""
2+
This migration retrieves additional metadata regarding chain confirmation of messages,
3+
including the block timestamp. We reset the TX height of the node to reprocess
4+
all the chain data messages and insert additional values
5+
"""
6+
7+
8+
import logging
9+
import os
10+
from configmanager import Config
11+
from aleph.model.chains import Chain
12+
from aleph.model.pending import PendingMessage, PendingTX
13+
from aleph.model.messages import Message
14+
15+
# Name the logger after this migration script's filename so log lines
# identify which migration emitted them.
logger = logging.getLogger(os.path.basename(__file__))
16+
17+
18+
async def upgrade(config: Config, **kwargs):
    """Reset chain-sync state so confirmations are rebuilt with the full TX context.

    Rewinds the stored ETH height to the configured start height, clears all
    pending transactions, drops chain-sourced pending messages, and strips the
    confirmation data from every confirmed message so the pending-TX processor
    re-inserts it in the new format.

    :param config: Node configuration; only ``ethereum.start_height`` is read.
    """
    logger.info("Resetting chain height to re-fetch all chaindata...")
    await Chain.set_last_height("ETH", config.ethereum.start_height.value)

    logger.info("Dropping all pending transactions...")
    await PendingTX.collection.delete_many({})

    logger.info(
        "Dropping all pending confirmation messages (they will be reinserted automatically)..."
    )
    # Only messages that originated from a chain (those with a source chain
    # name) are dropped; user-submitted pending messages are kept.
    chain_sourced_filter = {"source.chain_name": {"$ne": None}}
    await PendingMessage.collection.delete_many(chain_sourced_filter)

    logger.info("Removing confirmation data for all messages...")
    # Confirmations will be automatically added again by the pending TX processor.
    # By removing the confirmation entirely, we make sure to avoid intermediate states
    # if a message was confirmed in an unexpected way.
    reset_confirmations = {
        "$set": {"confirmed": False},
        "$unset": {"confirmations": 1},
    }
    await Message.collection.update_many({"confirmed": True}, reset_confirmations)
45+
46+
47+
async def downgrade(**kwargs):
    """Refuse to downgrade.

    The pre-migration confirmation format cannot be reconstructed once the
    confirmations arrays have been dropped, so rolling back is not supported.
    """
    raise NotImplementedError("Downgrading this migration is not supported.")

src/aleph/chains/common.py

Lines changed: 26 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -51,18 +51,17 @@ async def mark_confirmed_data(chain_name, tx_hash, height):
5151
}
5252

5353

54-
async def delayed_incoming(message, chain_name=None, tx_hash=None, height=None):
54+
async def delayed_incoming(
55+
message, tx_context: Optional[TxContext] = None, check_message: bool = True
56+
):
5557
if message is None:
5658
return
59+
5760
await PendingMessage.collection.insert_one(
5861
{
5962
"message": message,
60-
"source": dict(
61-
chain_name=chain_name,
62-
tx_hash=tx_hash,
63-
height=height,
64-
check_message=True, # should we store this?
65-
),
63+
"tx_context": asdict(tx_context) if tx_context is not None else None,
64+
"check_message": check_message,
6665
}
6766
)
6867

@@ -75,25 +74,13 @@ class IncomingStatus(IntEnum):
7574

7675
async def mark_message_for_retry(
7776
message: Dict,
78-
chain_name: Optional[str],
79-
tx_hash: Optional[str],
80-
height: Optional[int],
77+
tx_context: Optional[TxContext],
8178
check_message: bool,
8279
retrying: bool,
8380
existing_id,
8481
):
8582
if not retrying:
86-
await PendingMessage.collection.insert_one(
87-
{
88-
"message": message,
89-
"source": dict(
90-
chain_name=chain_name,
91-
tx_hash=tx_hash,
92-
height=height,
93-
check_message=check_message, # should we store this?
94-
),
95-
}
96-
)
83+
await delayed_incoming(message, tx_context, check_message)
9784
else:
9885
LOGGER.debug(f"Incrementing for {existing_id}")
9986
result = await PendingMessage.collection.update_one(
@@ -104,9 +91,7 @@ async def mark_message_for_retry(
10491

10592
async def incoming(
10693
message: Dict,
107-
chain_name: Optional[str] = None,
108-
tx_hash: Optional[str] = None,
109-
height: Optional[int] = None,
94+
tx_context: Optional[TxContext] = None,
11095
seen_ids: Optional[Dict[Tuple, int]] = None,
11196
check_message: bool = False,
11297
retrying: bool = False,
@@ -120,11 +105,12 @@ async def incoming(
120105

121106
item_hash = message["item_hash"]
122107
sender = message["sender"]
108+
chain_name = tx_context.chain if tx_context is not None else None
123109
ids_key = (item_hash, sender, chain_name)
124110

125-
if chain_name and tx_hash and height and seen_ids is not None:
111+
if tx_context is not None and seen_ids is not None:
126112
if ids_key in seen_ids.keys():
127-
if height > seen_ids[ids_key]:
113+
if tx_context.height > seen_ids[ids_key]:
128114
return IncomingStatus.MESSAGE_HANDLED, []
129115

130116
filters = {
@@ -143,7 +129,7 @@ async def incoming(
143129
# check/sanitize the message if needed
144130
try:
145131
message = await check_message_fn(
146-
message, from_chain=(chain_name is not None)
132+
message, from_chain=(tx_context is not None)
147133
)
148134
except InvalidMessageError:
149135
return IncomingStatus.FAILED_PERMANENTLY, []
@@ -164,16 +150,14 @@ async def incoming(
164150
# existing = await Message.collection.find_one(
165151
# filters, projection={'confirmed': 1, 'confirmations': 1, 'time': 1})
166152

167-
if chain_name and tx_hash and height:
153+
if tx_context is not None:
168154
# We are getting a confirmation here
169-
new_values = await mark_confirmed_data(chain_name, tx_hash, height)
170-
171155
updates = {
172156
"$set": {
173157
"confirmed": True,
174158
},
175159
"$min": {"time": message["time"]},
176-
"$addToSet": {"confirmations": new_values["confirmations"][0]},
160+
"$addToSet": {"confirmations": asdict(tx_context)},
177161
}
178162
else:
179163
updates = {
@@ -186,14 +170,14 @@ async def incoming(
186170
# new_values = {'confirmed': False} # this should be our default.
187171
should_commit = False
188172
if existing:
189-
if seen_ids is not None and height is not None:
173+
if seen_ids is not None and tx_context is not None:
190174
if ids_key in seen_ids.keys():
191-
if height > seen_ids[ids_key]:
175+
if tx_context.height > seen_ids[ids_key]:
192176
return IncomingStatus.MESSAGE_HANDLED, []
193177
else:
194-
seen_ids[ids_key] = height
178+
seen_ids[ids_key] = tx_context.height
195179
else:
196-
seen_ids[ids_key] = height
180+
seen_ids[ids_key] = tx_context.height
197181

198182
# THIS CODE SHOULD BE HERE...
199183
# But, if a race condition appeared, we might have the message twice.
@@ -203,7 +187,7 @@ async def incoming(
203187

204188
LOGGER.debug("Updating %s." % item_hash)
205189

206-
if chain_name and tx_hash and height:
190+
if tx_context is not None:
207191
# we need to update messages adding the confirmation
208192
# await Message.collection.update_many(filters, updates)
209193
should_commit = True
@@ -224,9 +208,7 @@ async def incoming(
224208
LOGGER.exception("Can't get content of object %r" % item_hash)
225209
await mark_message_for_retry(
226210
message=message,
227-
chain_name=chain_name,
228-
tx_hash=tx_hash,
229-
height=height,
211+
tx_context=tx_context,
230212
check_message=check_message,
231213
retrying=retrying,
232214
existing_id=existing_id,
@@ -266,9 +248,7 @@ async def incoming(
266248
LOGGER.debug("Message type handler has failed, retrying later.")
267249
await mark_message_for_retry(
268250
message=message,
269-
chain_name=chain_name,
270-
tx_hash=tx_hash,
271-
height=height,
251+
tx_context=tx_context,
272252
check_message=check_message,
273253
retrying=retrying,
274254
existing_id=existing_id,
@@ -286,14 +266,14 @@ async def incoming(
286266
LOGGER.warning("Invalid sender for %s" % item_hash)
287267
return IncomingStatus.MESSAGE_HANDLED, []
288268

289-
if seen_ids is not None and height is not None:
269+
if seen_ids is not None and tx_context is not None:
290270
if ids_key in seen_ids.keys():
291-
if height > seen_ids[ids_key]:
271+
if tx_context.height > seen_ids[ids_key]:
292272
return IncomingStatus.MESSAGE_HANDLED, []
293273
else:
294-
seen_ids[ids_key] = height
274+
seen_ids[ids_key] = tx_context.height
295275
else:
296-
seen_ids[ids_key] = height
276+
seen_ids[ids_key] = tx_context.height
297277

298278
LOGGER.debug("New message to store for %s." % item_hash)
299279
# message.update(new_values)

src/aleph/chains/ethereum.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -196,8 +196,8 @@ async def request_transactions(
196196
try:
197197
jdata = json.loads(message)
198198
context = TxContext(
199-
chain_name=CHAIN_NAME,
200-
tx_hash=event_data.transactionHash.hex(),
199+
chain=CHAIN_NAME,
200+
hash=event_data.transactionHash.hex(),
201201
time=timestamp,
202202
height=event_data.blockNumber,
203203
publisher=publisher,

src/aleph/chains/nuls2.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -174,8 +174,8 @@ async def request_transactions(config, session, start_height) -> AsyncIterator[T
174174
jdata = json.loads(ddata)
175175

176176
context = TxContext(
177-
chain_name=CHAIN_NAME,
178-
tx_hash=tx["hash"],
177+
chain=CHAIN_NAME,
178+
hash=tx["hash"],
179179
height=tx["height"],
180180
time=tx["createTime"],
181181
publisher=tx["coinFroms"][0]["address"],

src/aleph/chains/tx_context.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@
33

44
@dataclass
55
class TxContext:
6-
chain_name: str
7-
tx_hash: str
6+
chain: str
7+
hash: str
88
height: int
99
# Transaction timestamp, in Unix time (number of seconds since epoch).
1010
time: int

src/aleph/jobs/process_pending_messages.py

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
from aleph.services.p2p import singleton
2020
from aleph.types import ItemType
2121
from .job_utils import prepare_loop, gather_and_perform_db_operations
22+
from ..chains.tx_context import TxContext
2223

2324
LOGGER = getLogger("jobs.pending_messages")
2425

@@ -27,13 +28,17 @@ async def handle_pending_message(
2728
pending: Dict,
2829
seen_ids: Dict[Tuple, int],
2930
) -> List[DbBulkOperation]:
31+
32+
try:
33+
tx_context = TxContext(**pending["tx_context"])
34+
except KeyError:
35+
tx_context = None
36+
3037
status, operations = await incoming(
3138
pending["message"],
32-
chain_name=pending["source"].get("chain_name"),
33-
tx_hash=pending["source"].get("tx_hash"),
34-
height=pending["source"].get("height"),
39+
tx_context=tx_context,
3540
seen_ids=seen_ids,
36-
check_message=pending["source"].get("check_message", True),
41+
check_message=pending.get("check_message", True),
3742
retrying=True,
3843
existing_id=pending["_id"],
3944
)

src/aleph/jobs/process_pending_txs.py

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
import asyncio
66
import logging
7+
from dataclasses import asdict
78
from typing import List, Dict, Optional
89

910
import sentry_sdk
@@ -15,12 +16,12 @@
1516
from aleph.chains.tx_context import TxContext
1617
from aleph.exceptions import InvalidMessageError
1718
from aleph.logging import setup_logging
19+
from aleph.model.db_bulk_operation import DbBulkOperation
1820
from aleph.model.pending import PendingMessage, PendingTX
1921
from aleph.network import check_message
2022
from aleph.services.p2p import singleton
21-
from .job_utils import prepare_loop, gather_and_perform_db_operations
22-
from aleph.model.db_bulk_operation import DbBulkOperation
2323
from aleph.toolkit.batch import async_batch
24+
from .job_utils import prepare_loop, gather_and_perform_db_operations
2425

2526
LOGGER = logging.getLogger("jobs.pending_txs")
2627

@@ -31,7 +32,7 @@ async def handle_pending_tx(
3132

3233
db_operations: List[DbBulkOperation] = []
3334
tx_context = TxContext(**pending_tx["context"])
34-
LOGGER.info("%s Handling TX in block %s", tx_context.chain_name, tx_context.height)
35+
LOGGER.info("%s Handling TX in block %s", tx_context.chain, tx_context.height)
3536

3637
messages = await get_chaindata_messages(
3738
pending_tx["content"], tx_context, seen_ids=seen_ids
@@ -55,12 +56,8 @@ async def handle_pending_tx(
5556
operation=InsertOne(
5657
{
5758
"message": message,
58-
"source": dict(
59-
chain_name=tx_context.chain_name,
60-
tx_hash=tx_context.tx_hash,
61-
height=tx_context.height,
62-
check_message=True, # should we store this?
63-
),
59+
"tx_context": asdict(tx_context),
60+
"check_message": True,
6461
}
6562
),
6663
)

0 commit comments

Comments
 (0)