Skip to content

[Messages] Preserve original message time #254

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions deployment/migrations/scripts/0003-fix-message-time.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
"""
This migration fixes the "time" field of messages. It was incorrectly updated when fetching
messages from the on-chain storage.

We now store the original message time in the "time" field, and we store the confirmation time
in the confirmations array. To achieve this, we must fetch the whole message history and
process it again.
"""

import logging

from configmanager import Config

from aleph.model.chains import Chain
from aleph.model.messages import Message

logger = logging.getLogger()


async def must_run_migration() -> bool:
nb_documents = Message.collection.count_documents(
filter={"content.time": {"$exists": 1}, "$expr": {"$ne": ["$time", "$content.time"]}}
)
return bool(nb_documents)


async def upgrade(config: Config, **kwargs):
if await must_run_migration():
logger.info("Messages with inconsistent times found, running migration.")
start_height = config.ethereum.start_height.value
await Chain.set_last_height("ETH", start_height)
else:
logger.info("Message times already set to the correct value, skipping migration.")

logger.info("Some queries may take a while to execute.")

# First, update all the messages that have a valid content.time field.
# This represents 99.99% of the messages in the DB, the only exception
# being forgotten messages.
filter = {"content.time": {"$exists": 1}}
update = [{"$set": {"time": "$content.time"}}]

logger.info("Resetting the time field on messages. This operation may take a while.")
await Message.collection.update_many(filter=filter, update=update)
logger.info("Reset message times to their original value.")


def downgrade(**kwargs):
# Nothing to do, we do not wish to revert this migration.
pass
Empty file.
7 changes: 7 additions & 0 deletions deployment/migrations/utils/reset_chain_height.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from configmanager import Config
from aleph.model.chains import Chain


async def reset_chain_height(config: Config):
start_height = config.ethereum.start_height.value
await Chain.set_last_height("ETH", start_height)
2 changes: 0 additions & 2 deletions src/aleph/jobs/process_pending_txs.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@ async def handle_pending_tx(
)
if messages:
for i, message in enumerate(messages):
message["time"] = tx_context.time + (i / 1000) # force order

try:
message = await check_message(
message, trusted=True
Expand Down