Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Messages] Reception and confirmation times #267

Closed
48 changes: 48 additions & 0 deletions deployment/migrations/scripts/0003-retrieve-confirmation-time.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
"""
This migration retrieves additional metadata regarding chain confirmation of messages,
including the block timestamp. We reset the TX height of the node to reprocess
all the chain data messages and insert additional values
"""


import logging
import os
from configmanager import Config
from aleph.model.chains import Chain
from aleph.model.pending import PendingMessage, PendingTX
from aleph.model.messages import Message

logger = logging.getLogger(os.path.basename(__file__))


async def upgrade(config: Config, **kwargs):
logger.info("Resetting chain height to re-fetch all chaindata...")
start_height = config.ethereum.start_height.value
await Chain.set_last_height("ETH", start_height)

logger.info("Dropping all pending transactions...")
await PendingTX.collection.delete_many({})

logger.info(
"Dropping all pending confirmation messages "
"(they will be reinserted automatically)..."
)
await PendingMessage.collection.delete_many({"source.chain_name": {"$ne": None}})

logger.info("Removing confirmation data for all messages...")
# Confirmations will be automatically added again by the pending TX processor.
# By removing the confirmation entirely, we make sure to avoid intermediate states
# if a message was confirmed in an unexpected way.
await Message.collection.update_many(
{"confirmed": True},
{
"$set": {
"confirmed": False,
},
"$unset": {"confirmations": 1},
},
)


async def downgrade(**kwargs):
raise NotImplementedError("Downgrading this migration is not supported.")
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
"""
This migration adds the `confirmation_time` and `reception_time` fields.
`confirmation_time` serves as a cache of the first confirmation message seen
in on-chain data.
`reception_time` represents the first time the node became aware of
the message, confirmed or not.
"""


import logging
import os

from configmanager import Config

from aleph.model.messages import Message

logger = logging.getLogger(os.path.basename(__file__))


async def upgrade(config: Config, **kwargs):
logger.info("Creating confirmation_time field for messages...")
await Message.collection.update_many(
{"confirmed": True},
[{"$set": {"confirmation_time": {"$min": "$confirmations.time"}}}],
)


async def downgrade(**kwargs):
logger.info("Creating confirmation_time field for messages...")
await Message.collection.update_many(
{"$unset": {"confirmation_time": 1, "reception_time": 1}}
)
11 changes: 8 additions & 3 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ package_dir =
setup_requires =
pyscaffold>=3.1a0,<3.2a0
pytest-runner>=2.0,<3dev
# Add here dependencies of your project (semicolon/line-separated), e.g.

# Note: eth/web3 dependencies updates are sensitive and can trigger a lot of dependency conflicts.
# Update with care. Dependencies that were added to make it all work are annotated accordingly.
install_requires =
aiocache==0.11.1
aiohttp-cors==0.7.0
Expand All @@ -38,13 +40,15 @@ install_requires =
aiohttp==3.8.1
aioipfs@git+https://github.com/aleph-im/aioipfs.git@76d5624661e879a13b70f3ea87dc9c9604c7eda7
aleph-client==0.4.6
aleph-message==0.2.1
aleph-message==0.2.2
aleph-pytezos@git+https://github.com/aleph-im/aleph-pytezos.git@97fe92ffa6e21ef5ec17ef4fa16c86022b30044c
coincurve==15.0.1
configmanager==1.35.1
configparser==5.2.0
cosmospy==6.0.0
dataclasses_json==0.5.6
eciespy==0.3.11 # eth dependency
eth-hash==0.3.3 # eth dependency
eth-keys==0.3.3
eth-rlp==0.2.1
eth_account==0.5.6
Expand All @@ -53,13 +57,14 @@ install_requires =
msgpack==1.0.3 # required by aiocache
nuls2-python@git+https://github.com/aleph-im/nuls2-python.git
p2pclient==0.2.0
protobuf==3.20.3 # eth dependency
pymongo==3.12.3
pynacl==1.5.0
python-dateutil==2.8.2
python-socketio==5.5.1
pytz==2021.3
pyyaml==6.0
requests==2.27.1
requests==2.28.1
secp256k1==0.14.0
sentry-sdk==1.5.11
setproctitle==1.2.2
Expand Down
102 changes: 49 additions & 53 deletions src/aleph/chains/common.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
import asyncio
import json
import logging
from dataclasses import asdict
from enum import IntEnum
from typing import Dict, Optional, Tuple, List
from typing import Any, Dict, Optional, Tuple, List

from aleph_message.models import MessageConfirmation
from bson import ObjectId
from pymongo import UpdateOne

Expand All @@ -25,18 +23,17 @@
from aleph.model.pending import PendingMessage, PendingTX
from aleph.network import verify_signature
from aleph.permissions import check_sender_authorization
from aleph.storage import get_json, pin_hash, add_json, get_message_content
from .tx_context import TxContext
from aleph.schemas.pending_messages import (
BasePendingMessage,
)
from aleph.schemas.pending_messages import BasePendingMessage
from aleph.schemas.validated_message import (
validate_pending_message,
ValidatedStoreMessage,
ValidatedForgetMessage,
make_confirmation_update_query,
make_message_upsert_query,
make_message_upsert_query,
)
from ..schemas.message_confirmation import MessageConfirmation
from aleph.storage import get_json, pin_hash, add_json, get_message_content
from .tx_context import TxContext

LOGGER = logging.getLogger("chains.common")

Expand Down Expand Up @@ -64,21 +61,19 @@ async def mark_confirmed_data(chain_name, tx_hash, height):

async def delayed_incoming(
message: BasePendingMessage,
chain_name: Optional[str] = None,
tx_hash: Optional[str] = None,
height: Optional[int] = None,
reception_time: float,
tx_context: Optional[TxContext] = None,
check_message: bool = True,
):
if message is None:
return

await PendingMessage.collection.insert_one(
{
"message": message.dict(exclude={"content"}),
"source": dict(
chain_name=chain_name,
tx_hash=tx_hash,
height=height,
check_message=True, # should we store this?
),
"tx_context": tx_context.dict() if tx_context else None,
"reception_time": reception_time,
"check_message": check_message,
}
)

Expand All @@ -91,26 +86,20 @@ class IncomingStatus(IntEnum):

async def mark_message_for_retry(
message: BasePendingMessage,
chain_name: Optional[str],
tx_hash: Optional[str],
height: Optional[int],
reception_time: float,
tx_context: Optional[TxContext],
check_message: bool,
retrying: bool,
existing_id,
):
message_dict = message.dict(exclude={"content"})

if not retrying:
await PendingMessage.collection.insert_one(
{
"message": message_dict,
"source": dict(
chain_name=chain_name,
tx_hash=tx_hash,
height=height,
check_message=check_message, # should we store this?
),
}
await delayed_incoming(
message,
reception_time=reception_time,
tx_context=tx_context,
check_message=check_message,
)
else:
LOGGER.debug(f"Incrementing for {existing_id}")
Expand All @@ -122,9 +111,8 @@ async def mark_message_for_retry(

async def incoming(
pending_message: BasePendingMessage,
chain_name: Optional[str] = None,
tx_hash: Optional[str] = None,
height: Optional[int] = None,
reception_time: float,
tx_context: Optional[TxContext] = None,
seen_ids: Optional[Dict[Tuple, int]] = None,
check_message: bool = False,
retrying: bool = False,
Expand All @@ -139,16 +127,23 @@ async def incoming(
item_hash = pending_message.item_hash
sender = pending_message.sender
confirmations = []
chain_name = tx_context.chain if tx_context is not None else None
ids_key = (item_hash, sender, chain_name)

if chain_name and tx_hash and height:
if tx_context:
if seen_ids is not None:
if ids_key in seen_ids.keys():
if height > seen_ids[ids_key]:
if tx_context.height > seen_ids[ids_key]:
return IncomingStatus.MESSAGE_HANDLED, []

confirmations.append(
MessageConfirmation(chain=chain_name, hash=tx_hash, height=height)
MessageConfirmation(
chain=tx_context.chain,
hash=tx_context.hash,
height=tx_context.height,
time=tx_context.time,
publisher=tx_context.publisher,
)
)

filters = {
Expand Down Expand Up @@ -178,14 +173,14 @@ async def incoming(
updates: Dict[str, Dict] = {}

if existing:
if seen_ids is not None and height is not None:
if seen_ids is not None and tx_context is not None:
if ids_key in seen_ids.keys():
if height > seen_ids[ids_key]:
if tx_context.height > seen_ids[ids_key]:
return IncomingStatus.MESSAGE_HANDLED, []
else:
seen_ids[ids_key] = height
seen_ids[ids_key] = tx_context.height
else:
seen_ids[ids_key] = height
seen_ids[ids_key] = tx_context.height

LOGGER.debug("Updating %s." % item_hash)

Expand All @@ -205,17 +200,19 @@ async def incoming(
LOGGER.exception("Can't get content of object %r" % item_hash)
await mark_message_for_retry(
message=pending_message,
chain_name=chain_name,
tx_hash=tx_hash,
height=height,
reception_time=reception_time,
tx_context=tx_context,
check_message=check_message,
retrying=retrying,
existing_id=existing_id,
)
return IncomingStatus.RETRYING_LATER, []

validated_message = validate_pending_message(
pending_message=pending_message, content=content, confirmations=confirmations
pending_message=pending_message,
content=content,
reception_time=reception_time,
confirmations=confirmations,
)

# warning: those handlers can modify message and content in place
Expand Down Expand Up @@ -244,9 +241,8 @@ async def incoming(
LOGGER.debug("Message type handler has failed, retrying later.")
await mark_message_for_retry(
message=pending_message,
chain_name=chain_name,
tx_hash=tx_hash,
height=height,
reception_time=reception_time,
tx_context=tx_context,
check_message=check_message,
retrying=retrying,
existing_id=existing_id,
Expand All @@ -264,14 +260,14 @@ async def incoming(
LOGGER.warning("Invalid sender for %s" % item_hash)
return IncomingStatus.MESSAGE_HANDLED, []

if seen_ids is not None and height is not None:
if seen_ids is not None and tx_context is not None:
if ids_key in seen_ids.keys():
if height > seen_ids[ids_key]:
if tx_context.height > seen_ids[ids_key]:
return IncomingStatus.MESSAGE_HANDLED, []
else:
seen_ids[ids_key] = height
seen_ids[ids_key] = tx_context.height
else:
seen_ids[ids_key] = height
seen_ids[ids_key] = tx_context.height

LOGGER.debug("New message to store for %s." % item_hash)

Expand Down Expand Up @@ -386,5 +382,5 @@ async def incoming_chaindata(content: Dict, context: TxContext):
For now we only add it to the database, it will be processed later.
"""
await PendingTX.collection.insert_one(
{"content": content, "context": asdict(context)}
{"content": content, "context": context.dict()}
)
4 changes: 2 additions & 2 deletions src/aleph/chains/ethereum.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,8 +197,8 @@ async def request_transactions(
try:
jdata = json.loads(message)
context = TxContext(
chain_name=CHAIN_NAME,
tx_hash=event_data.transactionHash.hex(),
chain=CHAIN_NAME,
hash=event_data.transactionHash.hex(),
time=timestamp,
height=event_data.blockNumber,
publisher=publisher,
Expand Down
4 changes: 2 additions & 2 deletions src/aleph/chains/nuls2.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,8 +175,8 @@ async def request_transactions(config, session, start_height) -> AsyncIterator[T
jdata = json.loads(ddata)

context = TxContext(
chain_name=CHAIN_NAME,
tx_hash=tx["hash"],
chain=CHAIN_NAME,
hash=tx["hash"],
height=tx["height"],
time=tx["createTime"],
publisher=tx["coinFroms"][0]["address"],
Expand Down
Loading