From 2af24515d07940c2c495792412d36703c8f72620 Mon Sep 17 00:00:00 2001 From: Olivier Desenfans Date: Tue, 4 Oct 2022 02:48:11 +0200 Subject: [PATCH] Fix: drop invalid pending messages (#329) Problem: if an invalid message somehow managed to reach the pending message collection, the message would be retried indefinitely logging exceptions on each run. Solution: drop invalid messages. --- src/aleph/jobs/process_pending_messages.py | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/src/aleph/jobs/process_pending_messages.py b/src/aleph/jobs/process_pending_messages.py index cb5aa69fd..dd35e445d 100644 --- a/src/aleph/jobs/process_pending_messages.py +++ b/src/aleph/jobs/process_pending_messages.py @@ -19,6 +19,7 @@ from aleph.model.pending import PendingMessage from aleph.services.p2p import singleton from .job_utils import prepare_loop, process_job_results +from ..exceptions import InvalidMessageError from ..schemas.pending_messages import parse_message LOGGER = getLogger("jobs.pending_messages") @@ -49,7 +50,15 @@ async def handle_pending_message( seen_ids: Dict[Tuple, int], ) -> List[DbBulkOperation]: - message = parse_message(pending["message"]) + delete_pending_message_op = DbBulkOperation( + PendingMessage, DeleteOne({"_id": pending["_id"]}) + ) + + try: + message = parse_message(pending["message"]) + except InvalidMessageError: + # If an invalid message somehow ended in pending messages, drop it. + return [delete_pending_message_op] async with sem: status, operations = await incoming( @@ -64,9 +73,7 @@ async def handle_pending_message( ) if status != IncomingStatus.RETRYING_LATER: - operations.append( - DbBulkOperation(PendingMessage, DeleteOne({"_id": pending["_id"]})) - ) + operations.append(delete_pending_message_op) return operations