Skip to content
This repository was archived by the owner on Sep 3, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions src/dispatch/plugins/bases/signal_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,3 @@ class SignalConsumerPlugin(Plugin):

def consume(self, **kwargs):
raise NotImplementedError

def delete(self, **kwargs):
raise NotImplementedError
39 changes: 29 additions & 10 deletions src/dispatch/plugins/dispatch_aws/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,15 @@
.. moduleauthor:: Kevin Glisson <kglisson@netflix.com>
"""

import base64
import json
import logging
import zlib
from typing import TypedDict

import boto3
from pydantic import ValidationError
from psycopg2.errors import UniqueViolation
from pydantic import ValidationError
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session

Expand All @@ -28,6 +30,13 @@
log = logging.getLogger(__name__)


def decompress_json(compressed_str: str) -> str:
"""Decompress a base64 encoded zlibed JSON string."""
decoded = base64.b64decode(compressed_str)
decompressed = zlib.decompress(decoded)
return decompressed.decode("utf-8")


class SqsEntries(TypedDict):
Id: str
ReceiptHandle: str
Expand All @@ -36,7 +45,7 @@ class SqsEntries(TypedDict):
class AWSSQSSignalConsumerPlugin(SignalConsumerPlugin):
title = "AWS SQS - Signal Consumer"
slug = "aws-sqs-signal-consumer"
description = "Uses sqs to consume signals"
description = "Uses SQS to consume signals."
version = __version__

author = "Netflix"
Expand All @@ -60,20 +69,28 @@ def consume(self, db_session: Session, project: Project) -> None:
WaitTimeSeconds=20,
)
if not response.get("Messages") or len(response["Messages"]) == 0:
log.info("No messages received from SQS")
log.info("No messages received from SQS.")
continue

entries: list[SqsEntries] = []
for message in response["Messages"]:
body = json.loads(message["Body"])
signal_data = json.loads(body["Message"])
message_attributes = message.get("MessageAttributes", {})
message_body = message["Body"]

if message_attributes.get("compressed", {}).get("StringValue") == "zlib":
# Message is compressed, decompress it
message_body = decompress_json(message_body)

message_body = json.loads(message_body)
signal_data = json.loads(message_body["Message"])

try:
signal_instance_in = SignalInstanceCreate(
project=project, raw=signal_data, **signal_data
)
except ValidationError as e:
log.warning(
f"Received signal instance that does not conform to `SignalInstanceCreate` structure, skipping creation: {e}"
f"Received a signal instance that does not conform to the `SignalInstanceCreate` structure. Skipping creation: {e}"
)
continue

Expand All @@ -83,7 +100,7 @@ def consume(self, db_session: Session, project: Project) -> None:
db_session=db_session, signal_instance_id=signal_instance_in.raw["id"]
):
log.info(
f"Received signal instance that already exists in the database, skipping creation: {signal_instance_in.raw['id']}"
f"Received a signal instance that already exists in the database. Skipping creation: {signal_instance_in.raw['id']}"
)
continue

Expand All @@ -96,10 +113,12 @@ def consume(self, db_session: Session, project: Project) -> None:
except IntegrityError as e:
if isinstance(e.orig, UniqueViolation):
log.info(
f"Received signal instance that already exists in the database, skipping creation: {e}"
f"Received a signal instance that already exists in the database. Skipping creation: {e}"
)
else:
log.exception(f"Integrity error when creating signal instance: {e}")
log.exception(
f"Encountered an Integrity error when trying to create a signal instance: {e}"
)
continue
except Exception as e:
log.exception(f"Unable to create signal instance: {e}")
Expand All @@ -114,7 +133,7 @@ def consume(self, db_session: Session, project: Project) -> None:
},
)
log.debug(
f"Received signal: name: {signal_instance.signal.name} id: {signal_instance.signal.id}"
f"Received a signal with name {signal_instance.signal.name} and id {signal_instance.signal.id}"
)
entries.append(
{"Id": message["MessageId"], "ReceiptHandle": message["ReceiptHandle"]}
Expand Down
Loading