Skip to content

Commit

Permalink
[Messages] Preserve original message time
Browse files Browse the repository at this point in the history
Fixed an issue where the CCN would overwrite the original "time"
field of the message by the time at which the message is confirmed
on-chain. This issue could result in inversions of order between
messages if two consecutive messages arrive in the opposite order
on the node in charge of confirming messages.

Added a migration script to reset the message time field using
the content.time field. The migration script does not update
forgotten messages as their content is null. The extra effort
to update these messages is not worth it given that it requires
to parse the whole chain data history for a few tens of messages.
  • Loading branch information
odesenfans committed May 10, 2022
1 parent 1c379e3 commit a450a14
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 2 deletions.
50 changes: 50 additions & 0 deletions deployment/migrations/scripts/0003-fix-message-time.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
"""
This migration fixes the "time" field of messages. It was incorrectly updated when fetching
messages from the on-chain storage.
We now store the original message time in the "time" field, and we store the confirmation time
in the confirmations array. To achieve this, we must fetch the whole message history and
process it again.
"""

import logging

from configmanager import Config

from aleph.model.chains import Chain
from aleph.model.messages import Message

logger = logging.getLogger()


async def must_run_migration() -> bool:
nb_documents = Message.collection.count_documents(
filter={"content.time": {"$exists": 1}, "$expr": {"$ne": ["$time", "$content.time"]}}
)
return bool(nb_documents)


async def upgrade(config: Config, **kwargs):
if await must_run_migration():
logger.info("Messages with inconsistent times found, running migration.")
start_height = config.ethereum.start_height.value
await Chain.set_last_height("ETH", start_height)
else:
logger.info("Message times already set to the correct value, skipping migration.")

logger.info("Some queries may take a while to execute.")

# First, update all the messages that have a valid content.time field.
# This represents 99.99% of the messages in the DB, the only exception
# being forgotten messages.
filter = {"content.time": {"$exists": 1}}
update = [{"$set": {"time": "$content.time"}}]

logger.info("Resetting the time field on messages. This operation may take a while.")
await Message.collection.update_many(filter=filter, update=update)
logger.info("Reset message times to their original value.")


def downgrade(**kwargs):
# Nothing to do, we do not wish to revert this migration.
pass
Empty file.
7 changes: 7 additions & 0 deletions deployment/migrations/utils/reset_chain_height.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from configmanager import Config
from aleph.model.chains import Chain


async def reset_chain_height(config: Config):
start_height = config.ethereum.start_height.value
await Chain.set_last_height("ETH", start_height)
2 changes: 0 additions & 2 deletions src/aleph/jobs/process_pending_txs.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@ async def handle_pending_tx(
)
if messages:
for i, message in enumerate(messages):
message["time"] = tx_context.time + (i / 1000) # force order

try:
message = await check_message(
message, trusted=True
Expand Down

0 comments on commit a450a14

Please sign in to comment.