[Messages] Reception and confirmation times
Added two new fields to the messages collection. We now store
the initial time of reception of a message in the `reception_time`
field, and the earliest on-chain confirmation time in
the `confirmation_time` field.

These fields will be used to order messages more precisely and securely,
and to compute metrics such as the propagation time of a message
on the network.

The `time` field of messages is now deprecated, as it is defined
by users and not signed. If you need the user time, use `content.time`.
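
For illustration, here is a minimal sketch of how a client could use the new fields
(assuming a standard PyMongo handle on the node's `messages` collection; the connection
details and printed fields are hypothetical, not part of this commit):

```python
# Minimal sketch, assuming a PyMongo handle on the node's `messages` collection.
# Connection details and the printed fields are illustrative only.
from pymongo import MongoClient

messages = MongoClient()["aleph"]["messages"]

# Order by the time the node first saw each message, not by the user-provided `time`.
for message in messages.find({"confirmed": True}).sort("reception_time", 1).limit(10):
    # Delay between first reception and earliest on-chain confirmation.
    delay = message["confirmation_time"] - message["reception_time"]
    print(message["item_hash"], delay)
```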
odesenfans committed Oct 13, 2022
1 parent f4446ff commit d70793f
Showing 13 changed files with 117 additions and 16 deletions.
@@ -0,0 +1,32 @@
"""
This migration adds the `confirmation_time` and `reception_time` fields.
`confirmation_time` serves as a cache of the time of the first confirmation
seen in on-chain data.
`reception_time` represents the first time the node became aware of
the message, confirmed or not.
"""


import logging
import os

from configmanager import Config

from aleph.model.messages import Message

logger = logging.getLogger(os.path.basename(__file__))


async def upgrade(config: Config, **kwargs):
    logger.info("Creating confirmation_time field for messages...")
    await Message.collection.update_many(
        {"confirmed": True},
        [{"$set": {"confirmation_time": {"$min": "$confirmations.time"}}}],
    )


async def downgrade(**kwargs):
    logger.info("Removing confirmation_time and reception_time fields from messages...")
    await Message.collection.update_many(
        {}, {"$unset": {"confirmation_time": 1, "reception_time": 1}}
    )
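
The `upgrade` step above relies on MongoDB's pipeline-style update (the `[{"$set": ...}]`
list form, available since MongoDB 4.2), so `$min` is evaluated as an aggregation
expression over each document's own `confirmations.time` values. A standalone sketch of
the same idea on a scratch collection (names and values are made up):

```python
# Illustrative only: reproduces the migration's pipeline update on a scratch collection.
# Requires MongoDB 4.2+, which introduced aggregation pipelines in update operations.
from pymongo import MongoClient

db = MongoClient()["scratch"]
db.messages.insert_one(
    {"item_hash": "abc", "confirmed": True, "confirmations": [{"time": 120}, {"time": 100}]}
)

db.messages.update_many(
    {"confirmed": True},
    [{"$set": {"confirmation_time": {"$min": "$confirmations.time"}}}],
)

print(db.messages.find_one({"item_hash": "abc"})["confirmation_time"])  # 100
```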
16 changes: 14 additions & 2 deletions src/aleph/chains/common.py
@@ -2,7 +2,7 @@
import json
import logging
from enum import IntEnum
from typing import Dict, Optional, Tuple, List
from typing import Any, Dict, Optional, Tuple, List

from bson import ObjectId
from pymongo import UpdateOne
@@ -61,6 +61,7 @@ async def mark_confirmed_data(chain_name, tx_hash, height):

async def delayed_incoming(
message: BasePendingMessage,
reception_time: float,
tx_context: Optional[TxContext] = None,
check_message: bool = True,
):
@@ -71,6 +72,7 @@
{
"message": message.dict(exclude={"content"}),
"tx_context": tx_context.dict() if tx_context else None,
"reception_time": reception_time,
"check_message": check_message,
}
)
@@ -84,6 +86,7 @@ class IncomingStatus(IntEnum):

async def mark_message_for_retry(
message: BasePendingMessage,
reception_time: float,
tx_context: Optional[TxContext],
check_message: bool,
retrying: bool,
@@ -92,7 +95,12 @@ async def mark_message_for_retry(
message_dict = message.dict(exclude={"content"})

if not retrying:
await delayed_incoming(message, tx_context, check_message)
await delayed_incoming(
message,
reception_time=reception_time,
tx_context=tx_context,
check_message=check_message,
)
else:
LOGGER.debug(f"Incrementing for {existing_id}")
result = await PendingMessage.collection.update_one(
@@ -103,6 +111,7 @@

async def incoming(
pending_message: BasePendingMessage,
reception_time: float,
tx_context: Optional[TxContext] = None,
seen_ids: Optional[Dict[Tuple, int]] = None,
check_message: bool = False,
@@ -191,6 +200,7 @@ async def incoming(
LOGGER.exception("Can't get content of object %r" % item_hash)
await mark_message_for_retry(
message=pending_message,
reception_time=reception_time,
tx_context=tx_context,
check_message=check_message,
retrying=retrying,
@@ -201,6 +211,7 @@
validated_message = validate_pending_message(
pending_message=pending_message,
content=content,
reception_time=reception_time,
confirmations=confirmations,
)

@@ -230,6 +241,7 @@ async def incoming(
LOGGER.debug("Message type handler has failed, retrying later.")
await mark_message_for_retry(
message=pending_message,
reception_time=reception_time,
tx_context=tx_context,
check_message=check_message,
retrying=retrying,
1 change: 1 addition & 0 deletions src/aleph/jobs/process_pending_messages.py
@@ -67,6 +67,7 @@ async def handle_pending_message(
async with sem:
status, operations = await incoming(
pending_message=message,
reception_time=pending["reception_time"],
tx_context=tx_context,
seen_ids=seen_ids,
check_message=pending["source"].get("check_message", True),
7 changes: 7 additions & 0 deletions src/aleph/jobs/process_pending_txs.py
@@ -4,6 +4,7 @@

import asyncio
import logging
import time
from typing import List, Dict, Optional
from typing import Set

@@ -39,6 +40,11 @@ async def handle_pending_tx(
)
if messages:
for i, message_dict in enumerate(messages):
reception_time = time.time()
# TODO: this update of the time field is unwanted, but needed to preserve
# the behavior of aggregates. Use the correct time field in aggregates
# and then remove this line.
message_dict["time"] = tx_context.time + (i / 1000)

try:
# we don't check signatures yet.
@@ -57,6 +63,7 @@
{
"message": message.dict(exclude={"content"}),
"tx_context": tx_context.dict(),
"reception_time": reception_time,
"check_message": True,
}
),
30 changes: 27 additions & 3 deletions src/aleph/schemas/validated_message.py
@@ -12,7 +12,8 @@
MessageType,
PostContent,
ProgramContent,
StoreContent, )
StoreContent,
)
from pydantic import BaseModel, Field

from aleph.schemas.base_messages import AlephBaseMessage, ContentType, MType
@@ -60,6 +61,8 @@ class BaseValidatedMessage(AlephBaseMessage, Generic[MType, ContentType]):
content: ContentType
confirmations: List[MessageConfirmation] = Field(default_factory=list)
forgotten_by: List[str] = Field(default_factory=list)
reception_time: float
confirmation_time: Optional[float]


class ValidatedAggregateMessage(
@@ -95,6 +98,7 @@ class ValidatedStoreMessage(
def validate_pending_message(
pending_message: BasePendingMessage[MType, ContentType],
content: MessageContent,
reception_time: float,
confirmations: List[MessageConfirmation],
) -> BaseValidatedMessage[MType, ContentType]:

@@ -114,6 +118,11 @@
if json_content.get("time", None) is None:
json_content["time"] = pending_message.time

if confirmations:
confirmation_time = min(confirmation.time for confirmation in confirmations)
else:
confirmation_time = None

# Note: we could use the construct method of Pydantic to bypass validation
# and speed up the conversion process. However, this means giving up on validation.
# At the time of writing, correctness seems more important than performance.
@@ -123,6 +132,8 @@
confirmed=bool(confirmations),
confirmations=confirmations,
size=len(content.raw_value),
reception_time=reception_time,
confirmation_time=confirmation_time,
)


@@ -137,6 +148,11 @@ def make_confirmation_update_query(confirmations: List[MessageConfirmation]) ->

return {
"$max": {"confirmed": True},
"$min": {
"confirmation_time": min(
confirmation.time for confirmation in confirmations
)
},
"$addToSet": {
"confirmations": {
"$each": [confirmation.dict() for confirmation in confirmations]
@@ -159,10 +175,18 @@ def make_message_upsert_query(message: BaseValidatedMessage[Any, Any]) -> Dict:
"channel": message.channel,
"signature": message.signature,
},
"$min": {"time": message.time},
"$min": {
"time": message.time,
"reception_time": message.reception_time,
},
}

# Add fields related to confirmations
updates.update(make_confirmation_update_query(message.confirmations))
confirmation_updates = make_confirmation_update_query(message.confirmations)
for k, v in confirmation_updates.items():
try:
updates[k].update(v)
except KeyError:
updates[k] = v

return updates
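
To make the merge at the end of `make_message_upsert_query` concrete, here is a small
standalone sketch (with made-up values) of how the confirmation operators are folded into
the base upsert document without overwriting the existing `$min` entry:

```python
# Hedged illustration of the merge step: confirmation-related update operators are
# folded into the base upsert query, merging into "$min" and adding new operators.
# All values below are made up; the real documents come from the validated message.
base_updates = {
    "$setOnInsert": {"channel": "TEST", "signature": "0x..."},
    "$min": {"time": 1665000000.0, "reception_time": 1665000010.0},
}
confirmation_updates = {
    "$max": {"confirmed": True},
    "$min": {"confirmation_time": 1665000300.0},
    "$addToSet": {"confirmations": {"$each": [{"chain": "ETH", "height": 123}]}},
}

for key, value in confirmation_updates.items():
    try:
        base_updates[key].update(value)  # merge into an existing operator ("$min")
    except KeyError:
        base_updates[key] = value  # add a new operator ("$max", "$addToSet")

# base_updates["$min"] now holds time, reception_time and confirmation_time together.
```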
8 changes: 6 additions & 2 deletions src/aleph/services/ipfs/pubsub.py
@@ -1,12 +1,13 @@
import asyncio
import base64
import logging
import time
from typing import Union

import base58

from ...exceptions import InvalidMessageError
from .common import get_ipfs_api
from aleph.exceptions import InvalidMessageError

LOGGER = logging.getLogger("IPFS.PUBSUB")

@@ -48,9 +49,12 @@ async def incoming_channel(topic) -> None:
try:
async for mvalue in sub(topic):
try:
reception_time = time.time()
message = await get_pubsub_message(mvalue)
LOGGER.debug("New message %r" % message)
asyncio.create_task(process_one_message(message))
asyncio.create_task(
process_one_message(message, reception_time=reception_time)
)
except InvalidMessageError:
LOGGER.warning(f"Invalid message {mvalue}")
except Exception:
4 changes: 3 additions & 1 deletion src/aleph/services/p2p/protocol.py
@@ -3,6 +3,7 @@
import json
import logging
import random
import time
from typing import Any, Dict, Optional, Set

from anyio.abc import SocketStream
@@ -186,6 +187,7 @@ async def incoming_channel(p2p_client: P2PClient, topic: str) -> None:
try:
async for pubsub_message in receive_pubsub_messages(stream):
try:
reception_time = time.time()
msg_dict = pubsub_msg_to_dict(pubsub_message)
LOGGER.debug("Received from P2P:", msg_dict)
# we should check the sender here to avoid spam
@@ -196,7 +198,7 @@
continue

LOGGER.debug("New message %r" % message)
await delayed_incoming(message)
await delayed_incoming(message, reception_time)
except Exception:
LOGGER.exception("Can't handle message")

3 changes: 2 additions & 1 deletion tests/chains/test_common.py
@@ -68,5 +68,6 @@ async def async_magic():
message_dict["item_type"] = "inline"

message = parse_message(message_dict)
status, ops = await incoming(message, check_message=True)
status, ops = await incoming(message, reception_time=100000, check_message=True)

assert status == IncomingStatus.MESSAGE_HANDLED
17 changes: 14 additions & 3 deletions tests/chains/test_confirmation.py
@@ -43,12 +43,14 @@ async def test_confirm_message(test_db):
content = json.loads(MESSAGE_DICT["item_content"])

message = parse_message(MESSAGE_DICT)
await process_one_message(message)
original_reception_time = 100000
await process_one_message(message, reception_time=original_reception_time)
message_in_db = await Message.collection.find_one({"item_hash": item_hash})

assert message_in_db is not None
assert message_in_db["content"] == content
assert not message_in_db["confirmed"]
assert message_in_db["reception_time"] == original_reception_time

capped_message_in_db = await CappedMessage.collection.find_one(
{"item_hash": item_hash}
@@ -57,6 +59,7 @@
assert remove_id_key(message_in_db) == remove_id_key(capped_message_in_db)

# Now, confirm the message
confirmation_reception_time = 123000
tx_context = TxContext(
chain="ETH",
hash="123",
@@ -65,12 +68,15 @@
publisher="0xdeadbeef",
)

await process_one_message(message, tx_context)
await process_one_message(message, reception_time=confirmation_reception_time, tx_context=tx_context)

message_in_db = await Message.collection.find_one({"item_hash": item_hash})

assert message_in_db is not None
assert message_in_db["confirmed"]
assert message_in_db["confirmation_time"] == tx_context.time
assert message_in_db["reception_time"] == original_reception_time

expected_confirmations = [tx_context.dict()]
assert message_in_db["confirmations"] == expected_confirmations

@@ -92,6 +98,7 @@ async def test_process_confirmed_message(test_db):
"""

item_hash = MESSAGE_DICT["item_hash"]
reception_time = 1000000

# Confirm the message
message = parse_message(MESSAGE_DICT)
@@ -102,13 +109,17 @@
time=120000,
publisher="0xdeadbeef",
)
await process_one_message(message, tx_context)
await process_one_message(
message, reception_time=reception_time, tx_context=tx_context
)

# Now, confirm the message
message_in_db = await Message.collection.find_one({"item_hash": item_hash})

assert message_in_db is not None
assert message_in_db["confirmed"]
assert message_in_db["confirmation_time"] == tx_context.time
assert message_in_db["reception_time"] == reception_time

expected_confirmations = [tx_context.dict()]
assert message_in_db["confirmations"] == expected_confirmations
4 changes: 4 additions & 0 deletions tests/helpers/message_test_helpers.py
@@ -12,6 +12,7 @@ def make_validated_message_from_dict(
message_dict: Dict,
raw_content: Optional[Union[str, bytes]] = None,
confirmations: Optional[List[MessageConfirmation]] = None,
reception_time: Optional[float] = None,
):
"""
Creates a validated message instance from a raw message dictionary.
@@ -46,4 +47,7 @@
pending_message=pending_message,
content=message_content,
confirmations=confirmations or [],
# If no reception time is specified, just set it to the message time as propagation
# across Aleph is instantaneous, obviously.
reception_time=reception_time or pending_message.time,
)
3 changes: 3 additions & 0 deletions tests/schemas/test_validated_messages.py
@@ -59,6 +59,7 @@ def test_parse_aggregate_inline_message():
pending_message=pending_message,
content=message_content,
confirmations=confirmations,
reception_time=pending_message.time,
)
assert isinstance(validated_message, ValidatedAggregateMessage)

@@ -105,6 +106,7 @@ def test_parse_post_message_storage_content():
pending_message=pending_message,
content=message_content,
confirmations=confirmations,
reception_time=pending_message.time,
)

check_basic_message_fields(validated_message, message_dict)
@@ -139,6 +141,7 @@ def test_parse_store_message_inline_content():
pending_message=pending_message,
content=message_content,
confirmations=confirmations,
reception_time=pending_message.time,
)
assert isinstance(validated_message, ValidatedStoreMessage)
