Skip to content

feat(dsm): context support for sqs -> lambda #13526

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
May 29, 2025
35 changes: 22 additions & 13 deletions ddtrace/internal/datastreams/botocore.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,33 +139,42 @@ def get_datastreams_context(message):
Formats we're aware of:
- message.Body.MessageAttributes._datadog.Value.decode() (SQS)
- message.MessageAttributes._datadog.StringValue (SNS -> SQS)
- message.MesssageAttributes._datadog.BinaryValue.decode() (SNS -> SQS, raw)
- message.MessageAttributes._datadog.BinaryValue.decode() (SNS -> SQS, raw)
- message.messageAttributes._datadog.stringValue (SQS -> lambda)
"""
context_json = None
message_body = message
try:
message_body = json.loads(message.get("Body"))
except ValueError:
log.debug("Unable to parse message body, treat as non-json")

if "MessageAttributes" not in message_body:
body = message.get("Body")
if body:
message_body = json.loads(body)
except (ValueError, TypeError):
log.debug("Unable to parse message body as JSON, treat as non-json")

message_attributes = message_body.get("MessageAttributes") or message_body.get("messageAttributes")
if not message_attributes:
log.debug("DataStreams skipped message: %r", message)
return None

if "_datadog" not in message_body["MessageAttributes"]:
if "_datadog" not in message_attributes:
log.debug("DataStreams skipped message: %r", message)
return None

datadog_attr = message_attributes["_datadog"]

if message_body.get("Type") == "Notification":
# This is potentially a DSM SNS notification
if message_body["MessageAttributes"]["_datadog"]["Type"] == "Binary":
context_json = json.loads(base64.b64decode(message_body["MessageAttributes"]["_datadog"]["Value"]).decode())
elif "StringValue" in message["MessageAttributes"]["_datadog"]:
if datadog_attr.get("Type") == "Binary":
context_json = json.loads(base64.b64decode(datadog_attr["Value"]).decode())
elif "StringValue" in datadog_attr:
# The message originated from SQS
context_json = json.loads(message_body["MessageAttributes"]["_datadog"]["StringValue"])
elif "BinaryValue" in message["MessageAttributes"]["_datadog"]:
context_json = json.loads(datadog_attr["StringValue"])
elif "stringValue" in datadog_attr:
# The message originated from Lambda
context_json = json.loads(datadog_attr["stringValue"])
elif "BinaryValue" in datadog_attr:
# Raw message delivery
context_json = json.loads(message_body["MessageAttributes"]["_datadog"]["BinaryValue"].decode())
context_json = json.loads(datadog_attr["BinaryValue"].decode())
else:
log.debug("DataStreams did not handle message: %r", message)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
---
features:
- |
DSM: Add support for context extraction for SQS -> Lambda message events.
51 changes: 51 additions & 0 deletions tests/datastreams/test_botocore.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import json

from ddtrace.internal.datastreams.botocore import get_datastreams_context


class TestGetDatastreamsContext:
    """Tests for ``get_datastreams_context``."""

    def test_sqs_to_lambda_format_with_datadog_context(self):
        """Extract the context from a record using lower-camel-case keys.

        The record carries its attributes under ``messageAttributes`` and the
        JSON-encoded context under ``_datadog.stringValue``; the function is
        expected to return that context decoded back into a dict.
        """
        expected_context = {
            "x-datadog-trace-id": "123456789",
            "x-datadog-parent-id": "987654321",
            "x-datadog-sampling-priority": "1",
        }

        def string_attribute(value):
            # Every attribute in this record shares the same String-typed shape.
            return {
                "stringValue": value,
                "stringListValues": [],
                "binaryListValues": [],
                "dataType": "String",
            }

        record_attributes = {
            "ApproximateReceiveCount": "1",
            "SentTimestamp": "1545082649183",
            "SenderId": "AIDAIENQZJOLO23YVJ4VO",
            "ApproximateFirstReceiveTimestamp": "1545082649185",
        }
        sqs_event_record = {
            "messageId": "059f36b4-87a3-44ab-83d2-661975830a7d",
            "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a...",
            "body": "Test message.",
            "attributes": record_attributes,
            "messageAttributes": {
                "_datadog": string_attribute(json.dumps(expected_context)),
                "myAttribute": string_attribute("myValue"),
            },
            "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
            "eventSource": "aws:sqs",
            "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue",
            "awsRegion": "us-east-2",
        }

        extracted = get_datastreams_context(sqs_event_record)

        assert extracted is not None
        assert extracted == expected_context
        # Spot-check each propagated header individually as well.
        for header, value in (
            ("x-datadog-trace-id", "123456789"),
            ("x-datadog-parent-id", "987654321"),
            ("x-datadog-sampling-priority", "1"),
        ):
            assert extracted[header] == value
Loading