refactor: remove tracer dependencies to support dsm sqs -> lambda #612

Closed
wants to merge 26 commits into from
Changes from 12 commits
69 changes: 46 additions & 23 deletions datadog_lambda/dsm.py
@@ -1,38 +1,61 @@
from datadog_lambda import logger
import logging
import json
from datadog_lambda.trigger import EventTypes

logger = logging.getLogger(__name__)

def set_dsm_context(event, event_source):

def set_dsm_context(event, event_source):
if event_source.equals(EventTypes.SQS):
_dsm_set_sqs_context(event)


def _dsm_set_sqs_context(event):
from datadog_lambda.wrapper import format_err_with_traceback
from ddtrace.internal.datastreams import data_streams_processor
from ddtrace.internal.datastreams.processor import DsmPathwayCodec
from ddtrace.internal.datastreams.botocore import (
get_datastreams_context,
calculate_sqs_payload_size,
)
from ddtrace.data_streams import set_consume_checkpoint

records = event.get("Records")
if records is None:
return
processor = data_streams_processor()

for record in records:
try:
queue_arn = record.get("eventSourceARN", "")

contextjson = get_datastreams_context(record)
payload_size = calculate_sqs_payload_size(record)

ctx = DsmPathwayCodec.decode(contextjson, processor)
ctx.set_checkpoint(
["direction:in", f"topic:{queue_arn}", "type:sqs"],
payload_size=payload_size,
)
except Exception as e:
logger.error(format_err_with_traceback(e))
arn = record.get("eventSourceARN", "")
context_json = _get_dsm_context_from_lambda(record)

We remove the try / except here. Is there a reason for that (maybe there is and I don't see it)?
But we want to make sure our instrumentation never prevents the lambda from being executed, even if there is an issue with the instrumentation.

Contributor

Agreed, we should move this inside the try/except.
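
A minimal sketch of what that could look like, reusing the helper names from this diff and the error formatting from the removed code above (illustrative only, not the final implementation):

    from datadog_lambda.wrapper import format_err_with_traceback
    from ddtrace.data_streams import set_consume_checkpoint

    for record in records:
        try:
            arn = record.get("eventSourceARN", "")
            context_json = _get_dsm_context_from_lambda(record)
            if not context_json:
                logger.debug("DataStreams skipped lambda message: %r", record)
                continue  # illustrative; the current diff returns here instead
            carrier_get = _create_carrier_get(context_json)
            set_consume_checkpoint("sqs", arn, carrier_get)
        except Exception as e:
            # Never let instrumentation errors break the handler.
            logger.error(format_err_with_traceback(e))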

if not context_json:
logger.debug("DataStreams skipped lambda message: %r", record)
return None

just return is enough

carrier_get = _create_carrier_get(context_json)
set_consume_checkpoint("sqs", arn, carrier_get)

Let's make sure we update this to set manual_checkpoint=False here once the other PR is merged and the tracer is released.

Collaborator Author

@michael-zhao459 Jun 11, 2025


Plan on keeping this PR in draft until the tracer is released, and will make sure to update.
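
For reference, the call would then look roughly like this; hypothetical until the ddtrace release that actually adds the keyword mentioned above:

    # Not valid against the currently released tracer; manual_checkpoint is the
    # kwarg referenced in the comment above.
    set_consume_checkpoint("sqs", arn, carrier_get, manual_checkpoint=False)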


def _get_dsm_context_from_lambda(message):
"""
Lambda-specific message formats:
- message.messageAttributes._datadog.stringValue (SQS -> lambda)
"""
context_json = None
message_attributes = message.get("messageAttributes")
if not message_attributes:
logger.debug("DataStreams skipped lambda message: %r", message)
return None
Contributor

It looks like we're logging debug messages multiple times for the same record.


if "_datadog" not in message_attributes:
logger.debug("DataStreams skipped lambda message: %r", message)
return None

datadog_attr = message_attributes["_datadog"]

if "stringValue" in datadog_attr:
# SQS -> lambda
Contributor

We should do a type check here to ensure this is a dict.

Collaborator Author

Just to clarify: we are checking that context_json is a dict, right? I think context_json is the only one we need to make sure is a dict; the test you asked me to write also signaled that to me.

Contributor

we should also make sure datadog_attr is a dict.
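
A rough sketch of the type checks being discussed; the names match this diff, and the exact failure handling is illustrative:

    datadog_attr = message_attributes.get("_datadog")
    if not isinstance(datadog_attr, dict):
        logger.debug("DataStreams skipped lambda message, malformed _datadog attribute: %r", message)
        return None

    if "stringValue" in datadog_attr:
        # SQS -> lambda
        context_json = json.loads(datadog_attr["stringValue"])
        if not isinstance(context_json, dict):
            logger.debug("DataStreams skipped lambda message, decoded context is not a dict: %r", message)
            return None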

context_json = json.loads(datadog_attr["stringValue"])
Contributor
@purple4reina Jun 16, 2025

We can use the event_type to avoid doing unnecessary work. We should already mostly know the shape of the event. Without doing so, this method is gonna get insanely large.

I would recommend creating a separate _get_dsm_context for each event type.
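
One possible shape for that suggestion, with a hypothetical helper name (only the SQS path exists in this PR; the sketch relies on the module-level json import and logger from this diff):

    def _get_dsm_context_from_sqs_record(record):
        # SQS -> lambda: messageAttributes._datadog.stringValue holds the JSON context.
        message_attributes = record.get("messageAttributes")
        if not isinstance(message_attributes, dict):
            return None
        datadog_attr = message_attributes.get("_datadog")
        if not isinstance(datadog_attr, dict) or "stringValue" not in datadog_attr:
            return None
        context_json = json.loads(datadog_attr["stringValue"])
        return context_json if isinstance(context_json, dict) else None

    # set_dsm_context would then dispatch on the event type, adding a branch and
    # a dedicated extractor for each newly supported source alongside the SQS check.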

else:
logger.debug("DataStreams did not handle lambda message: %r", message)

Contributor

I would recommend making each of these log lines slightly different. That way when one is encountered, it is easy to find the exact line of code where it was produced. Otherwise, we don't know what the actual issue was.
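
For instance (wording purely illustrative), each early exit could name what was missing:

    if not message_attributes:
        logger.debug("DataStreams skipped lambda message, no messageAttributes: %r", message)
        return None
    if "_datadog" not in message_attributes:
        logger.debug("DataStreams skipped lambda message, no _datadog attribute: %r", message)
        return None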

return context_json


def _create_carrier_get(context_json):
def carrier_get(key):
return context_json.get(key)

return carrier_get
171 changes: 139 additions & 32 deletions tests/test_dsm.py
@@ -1,44 +1,35 @@
import unittest
from unittest.mock import patch, MagicMock

from datadog_lambda.dsm import set_dsm_context, _dsm_set_sqs_context
import json
from unittest.mock import patch

from datadog_lambda.dsm import (
set_dsm_context,
_dsm_set_sqs_context,
_get_dsm_context_from_lambda,
)
from datadog_lambda.trigger import EventTypes, _EventSource


class TestDsmSQSContext(unittest.TestCase):
class TestSetDSMContext(unittest.TestCase):
def setUp(self):
patcher = patch("datadog_lambda.dsm._dsm_set_sqs_context")
self.mock_dsm_set_sqs_context = patcher.start()
self.addCleanup(patcher.stop)

patcher = patch("ddtrace.internal.datastreams.data_streams_processor")
self.mock_data_streams_processor = patcher.start()
self.addCleanup(patcher.stop)

patcher = patch("ddtrace.internal.datastreams.botocore.get_datastreams_context")
self.mock_get_datastreams_context = patcher.start()
self.mock_get_datastreams_context.return_value = {}
self.addCleanup(patcher.stop)

patcher = patch(
"ddtrace.internal.datastreams.botocore.calculate_sqs_payload_size"
)
self.mock_calculate_sqs_payload_size = patcher.start()
self.mock_calculate_sqs_payload_size.return_value = 100
patcher = patch("ddtrace.data_streams.set_consume_checkpoint")
self.mock_set_consume_checkpoint = patcher.start()
self.addCleanup(patcher.stop)

patcher = patch("ddtrace.internal.datastreams.processor.DsmPathwayCodec.decode")
self.mock_dsm_pathway_codec_decode = patcher.start()
patcher = patch("datadog_lambda.dsm._get_dsm_context_from_lambda")
self.mock_get_dsm_context_from_lambda = patcher.start()
self.addCleanup(patcher.stop)

def test_non_sqs_event_source_does_nothing(self):
"""Test that non-SQS event sources don't trigger DSM context setting"""
event = {}
# Use Unknown Event Source
event_source = _EventSource(EventTypes.UNKNOWN)
set_dsm_context(event, event_source)

# DSM context should not be set for non-SQS events
self.mock_dsm_set_sqs_context.assert_not_called()

def test_sqs_event_with_no_records_does_nothing(self):
@@ -51,7 +42,7 @@ def test_sqs_event_with_no_records_does_nothing(self):

for event in events_with_no_records:
_dsm_set_sqs_context(event)
self.mock_data_streams_processor.assert_not_called()
self.mock_set_consume_checkpoint.assert_not_called()

def test_sqs_event_triggers_dsm_sqs_context(self):
"""Test that SQS event sources trigger the SQS-specific DSM context function"""
@@ -77,36 +68,152 @@ def test_sqs_multiple_records_process_each_record(self):
{
"eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:queue1",
"body": "Message 1",
"messageAttributes": {
"_datadog": {
"stringValue": json.dumps(
{"dd-pathway-ctx-base64": "context1"}
),
"dataType": "String",
}
},
},
{
"eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:queue2",
"body": "Message 2",
"messageAttributes": {
"_datadog": {
"stringValue": json.dumps(
{"dd-pathway-ctx-base64": "context2"}
),
"dataType": "String",
}
},
},
{
"eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:queue3",
"body": "Message 3",
"messageAttributes": {
"_datadog": {
"stringValue": json.dumps(
Contributor

We should add another test where the stringValue isn't a dict (see the sketch after the TestGetDSMContext tests below).

{"dd-pathway-ctx-base64": "context3"}
),
"dataType": "String",
}
},
},
]
}

mock_context = MagicMock()
self.mock_dsm_pathway_codec_decode.return_value = mock_context
self.mock_get_dsm_context_from_lambda.side_effect = [
{"dd-pathway-ctx-base64": "context1"},
{"dd-pathway-ctx-base64": "context2"},
{"dd-pathway-ctx-base64": "context3"},
]

_dsm_set_sqs_context(multi_record_event)

self.assertEqual(mock_context.set_checkpoint.call_count, 3)
self.assertEqual(self.mock_set_consume_checkpoint.call_count, 3)

calls = mock_context.set_checkpoint.call_args_list
calls = self.mock_set_consume_checkpoint.call_args_list
expected_arns = [
"arn:aws:sqs:us-east-1:123456789012:queue1",
"arn:aws:sqs:us-east-1:123456789012:queue2",
"arn:aws:sqs:us-east-1:123456789012:queue3",
]
expected_contexts = ["context1", "context2", "context3"]

for i, call in enumerate(calls):
args, kwargs = call
tags = args[0]
self.assertIn("direction:in", tags)
self.assertIn(f"topic:{expected_arns[i]}", tags)
self.assertIn("type:sqs", tags)
self.assertEqual(kwargs["payload_size"], 100)
service_type = args[0]
arn = args[1]
carrier_get_func = args[2]

self.assertEqual(service_type, "sqs")

self.assertEqual(arn, expected_arns[i])

pathway_ctx = carrier_get_func("dd-pathway-ctx-base64")
self.assertEqual(pathway_ctx, expected_contexts[i])


class TestGetDSMContext(unittest.TestCase):
def test_sqs_to_lambda_string_value_format(self):
"""Test format: message.messageAttributes._datadog.stringValue (SQS -> lambda)"""
trace_context = {
"x-datadog-trace-id": "789123456",
"x-datadog-parent-id": "321987654",
"dd-pathway-ctx": "test-pathway-ctx",
}

lambda_record = {
"messageId": "059f36b4-87a3-44ab-83d2-661975830a7d",
"receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a...",
"body": "Test message.",
"attributes": {
"ApproximateReceiveCount": "1",
"SentTimestamp": "1545082649183",
"SenderId": "AIDAIENQZJOLO23YVJ4VO",
"ApproximateFirstReceiveTimestamp": "1545082649185",
},
"messageAttributes": {
"_datadog": {
"stringValue": json.dumps(trace_context),
"stringListValues": [],
"binaryListValues": [],
"dataType": "String",
},
"myAttribute": {
"stringValue": "myValue",
"stringListValues": [],
"binaryListValues": [],
"dataType": "String",
},
},
"md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
"eventSource": "aws:sqs",
"eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue",
"awsRegion": "us-east-2",
}

result = _get_dsm_context_from_lambda(lambda_record)

assert result is not None
assert result == trace_context
assert result["x-datadog-trace-id"] == "789123456"
assert result["x-datadog-parent-id"] == "321987654"
assert result["dd-pathway-ctx"] == "test-pathway-ctx"

def test_no_message_attributes(self):
"""Test message without MessageAttributes returns None."""
message = {
"messageId": "test-message-id",
"body": "Test message without attributes",
}

result = _get_dsm_context_from_lambda(message)

assert result is None

def test_no_datadog_attribute(self):
"""Test message with MessageAttributes but no _datadog attribute returns None."""
message = {
"messageId": "test-message-id",
"body": "Test message",
"messageAttributes": {
"customAttribute": {"stringValue": "custom-value", "dataType": "String"}
},
}

result = _get_dsm_context_from_lambda(message)

[nit] newline here

assert result is None

def test_empty_datadog_attribute(self):
"""Test message with empty _datadog attribute returns None."""
message = {
"messageId": "test-message-id",
"messageAttributes": {"_datadog": {}},
}

result = _get_dsm_context_from_lambda(message)

assert result is None
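
Picking up the earlier review comment about stringValue not decoding to a dict, a sketch of such a test; hypothetical, since it assumes the dict type check is added to _get_dsm_context_from_lambda:

    def test_string_value_not_a_dict(self):
        """Test message whose _datadog stringValue decodes to a non-dict returns None."""
        message = {
            "messageId": "test-message-id",
            "messageAttributes": {
                "_datadog": {
                    "stringValue": json.dumps(["not", "a", "dict"]),
                    "dataType": "String",
                }
            },
        }

        result = _get_dsm_context_from_lambda(message)

        assert result is None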