Skip to content

Commit

Permalink
Fix: drop invalid pending messages (#329)
Browse files Browse the repository at this point in the history
Problem: if an invalid message somehow managed to reach
the pending message collection, the message would be retried
indefinitely logging exceptions on each run.

Solution: drop invalid messages.
  • Loading branch information
odesenfans authored Oct 4, 2022
1 parent e8bc64e commit 2af2451
Showing 1 changed file with 11 additions and 4 deletions.
15 changes: 11 additions & 4 deletions src/aleph/jobs/process_pending_messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from aleph.model.pending import PendingMessage
from aleph.services.p2p import singleton
from .job_utils import prepare_loop, process_job_results
from ..exceptions import InvalidMessageError
from ..schemas.pending_messages import parse_message

LOGGER = getLogger("jobs.pending_messages")
Expand Down Expand Up @@ -49,7 +50,15 @@ async def handle_pending_message(
seen_ids: Dict[Tuple, int],
) -> List[DbBulkOperation]:

message = parse_message(pending["message"])
delete_pending_message_op = DbBulkOperation(
PendingMessage, DeleteOne({"_id": pending["_id"]})
)

try:
message = parse_message(pending["message"])
except InvalidMessageError:
# If an invalid message somehow ended in pending messages, drop it.
return [delete_pending_message_op]

async with sem:
status, operations = await incoming(
Expand All @@ -64,9 +73,7 @@ async def handle_pending_message(
)

if status != IncomingStatus.RETRYING_LATER:
operations.append(
DbBulkOperation(PendingMessage, DeleteOne({"_id": pending["_id"]}))
)
operations.append(delete_pending_message_op)

return operations

Expand Down

0 comments on commit 2af2451

Please sign in to comment.