Skip to content

Commit

Permalink
Internal: store the full TX context in confirmations (#265)
Browse files Browse the repository at this point in the history
We now store the entire context of the transaction in the confirmations
array of messages. This means that two additional fields are now
preserved: the transaction block timestamp and the publisher.

As we need to re-fetch this data from chain data, a new migration
script resets the chain height to re-process all transactions.
We reset the confirmation status of all messages to unconfirmed
and deleted their confirmations array to let the node automatically
migrate to the new format.

Renamed some fields of the `TxContext` class in order to use
the same format in all DB collections and to avoid a breaking
change in the messages confirmation format.
  • Loading branch information
odesenfans authored Oct 13, 2022
1 parent 8c51cf5 commit f4446ff
Show file tree
Hide file tree
Showing 13 changed files with 152 additions and 99 deletions.
48 changes: 48 additions & 0 deletions deployment/migrations/scripts/0003-retrieve-confirmation-time.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
"""
This migration retrieves additional metadata regarding chain confirmation of messages,
including the block timestamp. We reset the TX height of the node to reprocess
all the chain data messages and insert additional values
"""


import logging
import os
from configmanager import Config
from aleph.model.chains import Chain
from aleph.model.pending import PendingMessage, PendingTX
from aleph.model.messages import Message

logger = logging.getLogger(os.path.basename(__file__))


async def upgrade(config: Config, **kwargs):
logger.info("Resetting chain height to re-fetch all chaindata...")
start_height = config.ethereum.start_height.value
await Chain.set_last_height("ETH", start_height)

logger.info("Dropping all pending transactions...")
await PendingTX.collection.delete_many({})

logger.info(
"Dropping all pending confirmation messages "
"(they will be reinserted automatically)..."
)
await PendingMessage.collection.delete_many({"source.chain_name": {"$ne": None}})

logger.info("Removing confirmation data for all messages...")
# Confirmations will be automatically added again by the pending TX processor.
# By removing the confirmation entirely, we make sure to avoid intermediate states
# if a message was confirmed in an unexpected way.
await Message.collection.update_many(
{"confirmed": True},
{
"$set": {
"confirmed": False,
},
"$unset": {"confirmations": 1},
},
)


async def downgrade(**kwargs):
raise NotImplementedError("Downgrading this migration is not supported.")
90 changes: 37 additions & 53 deletions src/aleph/chains/common.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
import asyncio
import json
import logging
from dataclasses import asdict
from enum import IntEnum
from typing import Dict, Optional, Tuple, List

from aleph_message.models import MessageConfirmation
from bson import ObjectId
from pymongo import UpdateOne

Expand All @@ -25,18 +23,17 @@
from aleph.model.pending import PendingMessage, PendingTX
from aleph.network import verify_signature
from aleph.permissions import check_sender_authorization
from aleph.storage import get_json, pin_hash, add_json, get_message_content
from .tx_context import TxContext
from aleph.schemas.pending_messages import (
BasePendingMessage,
)
from aleph.schemas.pending_messages import BasePendingMessage
from aleph.schemas.validated_message import (
validate_pending_message,
ValidatedStoreMessage,
ValidatedForgetMessage,
make_confirmation_update_query,
make_message_upsert_query,
make_message_upsert_query,
)
from ..schemas.message_confirmation import MessageConfirmation
from aleph.storage import get_json, pin_hash, add_json, get_message_content
from .tx_context import TxContext

LOGGER = logging.getLogger("chains.common")

Expand Down Expand Up @@ -64,21 +61,17 @@ async def mark_confirmed_data(chain_name, tx_hash, height):

async def delayed_incoming(
message: BasePendingMessage,
chain_name: Optional[str] = None,
tx_hash: Optional[str] = None,
height: Optional[int] = None,
tx_context: Optional[TxContext] = None,
check_message: bool = True,
):
if message is None:
return

await PendingMessage.collection.insert_one(
{
"message": message.dict(exclude={"content"}),
"source": dict(
chain_name=chain_name,
tx_hash=tx_hash,
height=height,
check_message=True, # should we store this?
),
"tx_context": tx_context.dict() if tx_context else None,
"check_message": check_message,
}
)

Expand All @@ -91,27 +84,15 @@ class IncomingStatus(IntEnum):

async def mark_message_for_retry(
message: BasePendingMessage,
chain_name: Optional[str],
tx_hash: Optional[str],
height: Optional[int],
tx_context: Optional[TxContext],
check_message: bool,
retrying: bool,
existing_id,
):
message_dict = message.dict(exclude={"content"})

if not retrying:
await PendingMessage.collection.insert_one(
{
"message": message_dict,
"source": dict(
chain_name=chain_name,
tx_hash=tx_hash,
height=height,
check_message=check_message, # should we store this?
),
}
)
await delayed_incoming(message, tx_context, check_message)
else:
LOGGER.debug(f"Incrementing for {existing_id}")
result = await PendingMessage.collection.update_one(
Expand All @@ -122,9 +103,7 @@ async def mark_message_for_retry(

async def incoming(
pending_message: BasePendingMessage,
chain_name: Optional[str] = None,
tx_hash: Optional[str] = None,
height: Optional[int] = None,
tx_context: Optional[TxContext] = None,
seen_ids: Optional[Dict[Tuple, int]] = None,
check_message: bool = False,
retrying: bool = False,
Expand All @@ -139,16 +118,23 @@ async def incoming(
item_hash = pending_message.item_hash
sender = pending_message.sender
confirmations = []
chain_name = tx_context.chain if tx_context is not None else None
ids_key = (item_hash, sender, chain_name)

if chain_name and tx_hash and height:
if tx_context:
if seen_ids is not None:
if ids_key in seen_ids.keys():
if height > seen_ids[ids_key]:
if tx_context.height > seen_ids[ids_key]:
return IncomingStatus.MESSAGE_HANDLED, []

confirmations.append(
MessageConfirmation(chain=chain_name, hash=tx_hash, height=height)
MessageConfirmation(
chain=tx_context.chain,
hash=tx_context.hash,
height=tx_context.height,
time=tx_context.time,
publisher=tx_context.publisher,
)
)

filters = {
Expand Down Expand Up @@ -178,14 +164,14 @@ async def incoming(
updates: Dict[str, Dict] = {}

if existing:
if seen_ids is not None and height is not None:
if seen_ids is not None and tx_context is not None:
if ids_key in seen_ids.keys():
if height > seen_ids[ids_key]:
if tx_context.height > seen_ids[ids_key]:
return IncomingStatus.MESSAGE_HANDLED, []
else:
seen_ids[ids_key] = height
seen_ids[ids_key] = tx_context.height
else:
seen_ids[ids_key] = height
seen_ids[ids_key] = tx_context.height

LOGGER.debug("Updating %s." % item_hash)

Expand All @@ -205,17 +191,17 @@ async def incoming(
LOGGER.exception("Can't get content of object %r" % item_hash)
await mark_message_for_retry(
message=pending_message,
chain_name=chain_name,
tx_hash=tx_hash,
height=height,
tx_context=tx_context,
check_message=check_message,
retrying=retrying,
existing_id=existing_id,
)
return IncomingStatus.RETRYING_LATER, []

validated_message = validate_pending_message(
pending_message=pending_message, content=content, confirmations=confirmations
pending_message=pending_message,
content=content,
confirmations=confirmations,
)

# warning: those handlers can modify message and content in place
Expand Down Expand Up @@ -244,9 +230,7 @@ async def incoming(
LOGGER.debug("Message type handler has failed, retrying later.")
await mark_message_for_retry(
message=pending_message,
chain_name=chain_name,
tx_hash=tx_hash,
height=height,
tx_context=tx_context,
check_message=check_message,
retrying=retrying,
existing_id=existing_id,
Expand All @@ -264,14 +248,14 @@ async def incoming(
LOGGER.warning("Invalid sender for %s" % item_hash)
return IncomingStatus.MESSAGE_HANDLED, []

if seen_ids is not None and height is not None:
if seen_ids is not None and tx_context is not None:
if ids_key in seen_ids.keys():
if height > seen_ids[ids_key]:
if tx_context.height > seen_ids[ids_key]:
return IncomingStatus.MESSAGE_HANDLED, []
else:
seen_ids[ids_key] = height
seen_ids[ids_key] = tx_context.height
else:
seen_ids[ids_key] = height
seen_ids[ids_key] = tx_context.height

LOGGER.debug("New message to store for %s." % item_hash)

Expand Down Expand Up @@ -386,5 +370,5 @@ async def incoming_chaindata(content: Dict, context: TxContext):
For now we only add it to the database, it will be processed later.
"""
await PendingTX.collection.insert_one(
{"content": content, "context": asdict(context)}
{"content": content, "context": context.dict()}
)
4 changes: 2 additions & 2 deletions src/aleph/chains/ethereum.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,8 +197,8 @@ async def request_transactions(
try:
jdata = json.loads(message)
context = TxContext(
chain_name=CHAIN_NAME,
tx_hash=event_data.transactionHash.hex(),
chain=CHAIN_NAME,
hash=event_data.transactionHash.hex(),
time=timestamp,
height=event_data.blockNumber,
publisher=publisher,
Expand Down
4 changes: 2 additions & 2 deletions src/aleph/chains/nuls2.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,8 +175,8 @@ async def request_transactions(config, session, start_height) -> AsyncIterator[T
jdata = json.loads(ddata)

context = TxContext(
chain_name=CHAIN_NAME,
tx_hash=tx["hash"],
chain=CHAIN_NAME,
hash=tx["hash"],
height=tx["height"],
time=tx["createTime"],
publisher=tx["coinFroms"][0]["address"],
Expand Down
14 changes: 5 additions & 9 deletions src/aleph/chains/tx_context.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
from dataclasses import dataclass
from aleph.schemas.message_confirmation import MessageConfirmation


@dataclass
class TxContext:
chain_name: str
tx_hash: str
height: int
# Transaction timestamp, in Unix time (number of seconds since epoch).
time: int
publisher: str
# At the moment, confirmation = chain transaction. This might change, but in the meantime
# having TxContext inherit MessageConfirmation avoids code duplication.
class TxContext(MessageConfirmation):
pass
12 changes: 7 additions & 5 deletions src/aleph/jobs/process_pending_messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,14 @@
from setproctitle import setproctitle

from aleph.chains.common import incoming, IncomingStatus
from aleph.exceptions import InvalidMessageError
from aleph.logging import setup_logging
from aleph.model.db_bulk_operation import DbBulkOperation
from aleph.model.pending import PendingMessage
from aleph.schemas.pending_messages import parse_message
from aleph.services.p2p import singleton
from .job_utils import prepare_loop, process_job_results
from ..exceptions import InvalidMessageError
from ..schemas.pending_messages import parse_message
from ..chains.tx_context import TxContext

LOGGER = getLogger("jobs.pending_messages")

Expand Down Expand Up @@ -60,12 +61,13 @@ async def handle_pending_message(
# If an invalid message somehow ended in pending messages, drop it.
return [delete_pending_message_op]

tx_context_dict = pending.get("tx_context")
tx_context = TxContext.parse_obj(tx_context_dict) if tx_context_dict else None

async with sem:
status, operations = await incoming(
pending_message=message,
chain_name=pending["source"].get("chain_name"),
tx_hash=pending["source"].get("tx_hash"),
height=pending["source"].get("height"),
tx_context=tx_context,
seen_ids=seen_ids,
check_message=pending["source"].get("check_message", True),
retrying=True,
Expand Down
13 changes: 5 additions & 8 deletions src/aleph/jobs/process_pending_txs.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@

import asyncio
import logging
from typing import List, Dict, Optional, Set
from typing import List, Dict, Optional
from typing import Set

import sentry_sdk
from configmanager import Config
Expand All @@ -31,7 +32,7 @@ async def handle_pending_tx(

db_operations: List[DbBulkOperation] = []
tx_context = TxContext(**pending_tx["context"])
LOGGER.info("%s Handling TX in block %s", tx_context.chain_name, tx_context.height)
LOGGER.info("%s Handling TX in block %s", tx_context.chain, tx_context.height)

messages = await get_chaindata_messages(
pending_tx["content"], tx_context, seen_ids=seen_ids
Expand All @@ -55,12 +56,8 @@ async def handle_pending_tx(
operation=InsertOne(
{
"message": message.dict(exclude={"content"}),
"source": dict(
chain_name=tx_context.chain_name,
tx_hash=tx_context.tx_hash,
height=tx_context.height,
check_message=True, # should we store this?
),
"tx_context": tx_context.dict(),
"check_message": True,
}
),
)
Expand Down
16 changes: 16 additions & 0 deletions src/aleph/schemas/message_confirmation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from aleph_message.models import Chain
from pydantic import BaseModel, Field


class MessageConfirmation(BaseModel):
chain: Chain = Field(..., description="Chain from which the confirmation was fetched.")
height: int = Field(..., description="Block in which the confirmation was published.")
hash: str = Field(
...,
description="Hash of the transaction/block in which the confirmation was published.",
)
time: float = Field(
...,
description="Transaction timestamp, in Unix time (number of seconds since epoch).",
)
publisher: str = Field(..., description="Publisher of the confirmation on chain.")
Loading

0 comments on commit f4446ff

Please sign in to comment.