feat: Enable sqs -> lambda support for DSM #604

Merged

Changes from 1 commit
add context prop test
michael-zhao459 committed Jun 4, 2025
commit c12e2dc9612bafe3b23b0c228e9ece151c9ddef4
165 changes: 165 additions & 0 deletions tests/test_wrapper.py
@@ -563,6 +563,171 @@ def return_type_test(event, context):
        self.assertEqual(result, test_result)
        self.assertFalse(MockPrintExc.called)

    @patch.dict(os.environ, {"DD_DATA_STREAMS_ENABLED": "true"})
    def test_datadog_lambda_wrapper_dsm_sqs_context_pathway_verification(self):
Contributor

I think we can vastly simplify these tests. This might warrant another pairing session, but I'll let you take a stab at it on your own first. Feel free to schedule something with me if you wanna go over it together.

There are two different files which you changed, and therefore two different test files that will need updating: tests/test_wrapper.py and a new file tests/test_dsm.py.

test_wrapper.py

For the test_wrapper.py file, we simply need to test that, based on the env vars DD_DATA_STREAMS_ENABLED and DD_TRACE_ENABLED, we either do or do not call set_dsm_context with the proper args. We'll push all of the verification of what happens inside of set_dsm_context to the test_dsm.py file.

I'm a big fan of pytest, which isn't yet imported or used in this file. It lets you reuse the same code over and over again to create "parametrized" tests. You can accomplish the same thing with unittest (which this file already uses), though it would mean creating 4 different test methods.

# test_wrapper.py

import pytest

_test_set_dsm_context = (
    ("true", "true", True),
    ("true", "false", False),
    ("false", "true", False),
    ("false", "false", False),
)


@pytest.mark.parametrize("trace_enabled,dsm_enabled,should_call", _test_set_dsm_context)
def test_set_dsm_context(trace_enabled, dsm_enabled, should_call, monkeypatch):
    # use monkeypatch to set env vars DD_TRACE_ENABLED and DD_DATA_STREAMS_ENABLED
    monkeypatch.setenv("DD_TRACE_ENABLED", trace_enabled)
    monkeypatch.setenv("DD_DATA_STREAMS_ENABLED", dsm_enabled)

    # use monkeypatch to create a mock for `set_dsm_context`; you can also use mock.patch

    @wrapper.datadog_lambda_wrapper
    def lambda_handler(event, context):
        return "ok"

    result = lambda_handler(sqs_event, get_mock_context())
    assert result == "ok"

    if should_call:
        # not sure of the api here, so this is just made up
        assert set_dsm_context_patch.called_with == (sqs_event, EventSource(EventSourceType.SQS))
    else:
        # again, not sure about api
        assert set_dsm_context_patch.not_called

test_dsm.py

In the test_dsm.py file, this is where you'll assert that set_dsm_context works as expected. You'll want to include several tests (a rough sketch follows the list below):

  1. Sending an event source of anything other than SQS will do nothing
  2. Sending an event with no Records will do nothing
  3. For each Record in the event, DSM sets the context as expected
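
Here's a rough sketch of what that test_dsm.py could look like, reusing the pytest parametrize approach from above. The import path (datadog_lambda.dsm.set_dsm_context), the event-source stand-in, and the patch target on ddtrace's DataStreamsProcessor are assumptions on my part rather than anything settled in this PR; adjust them to whatever the implementation actually calls.

# tests/test_dsm.py: a rough sketch only, not the PR's implementation.
# Assumed here: set_dsm_context is importable from datadog_lambda.dsm, takes
# (event, event_source), and records one consume checkpoint per SQS record
# through ddtrace's DataStreamsProcessor. Swap in the wrapper's real types
# and call paths where these guesses are wrong.
from unittest.mock import MagicMock, patch

import pytest

from datadog_lambda.dsm import set_dsm_context

_SQS_EVENT = {
    "Records": [
        {"eventSource": "aws:sqs", "body": "a", "messageAttributes": {}},
        {"eventSource": "aws:sqs", "body": "b", "messageAttributes": {}},
    ]
}


def _event_source(kind):
    # Hypothetical stand-in for the wrapper's parsed event-source object.
    source = MagicMock()
    source.event_type = kind
    return source


@pytest.mark.parametrize(
    "event,source,expected_checkpoints",
    (
        (_SQS_EVENT, _event_source("dynamodb"), 0),  # 1. non-SQS source: do nothing
        ({}, _event_source("sqs"), 0),               # 2. no Records: do nothing
        (_SQS_EVENT, _event_source("sqs"), 2),       # 3. one checkpoint per Record
    ),
)
def test_set_dsm_context(event, source, expected_checkpoints):
    # Guessing the processor is reached via DataStreamsProcessor.set_checkpoint;
    # move the patch to wherever set_dsm_context actually sets its checkpoints.
    with patch(
        "ddtrace.internal.datastreams.processor.DataStreamsProcessor.set_checkpoint"
    ) as set_checkpoint:
        set_dsm_context(event, source)
    assert set_checkpoint.call_count == expected_checkpoints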

Collaborator Author

Lemme take a stab at these today; I will definitely schedule a meeting if I get stuck on any of these parts.

        with patch(
            "ddtrace.internal.datastreams.processor.get_connection"
        ) as mock_get_connection:

            mock_conn = unittest.mock.MagicMock()
            mock_response = unittest.mock.MagicMock()
            mock_response.status = 200
            mock_conn.getresponse.return_value = mock_response
            mock_get_connection.return_value = mock_conn

            def updated_get_datastreams_context(message):
                """
                Updated version that handles the correct message formats
                """
                import base64
                import json

                context_json = None
                message_body = message
                try:
                    body = message.get("Body")
                    if body:
                        message_body = json.loads(body)
                except (ValueError, TypeError):
                    pass

                message_attributes = message_body.get(
                    "MessageAttributes"
                ) or message_body.get("messageAttributes")
                if not message_attributes:
                    return None

                if "_datadog" not in message_attributes:
                    return None

                datadog_attr = message_attributes["_datadog"]

                if message_body.get("Type") == "Notification":
                    if datadog_attr.get("Type") == "Binary":
                        context_json = json.loads(
                            base64.b64decode(datadog_attr["Value"]).decode()
                        )
                elif "StringValue" in datadog_attr:
                    context_json = json.loads(datadog_attr["StringValue"])
                elif "stringValue" in datadog_attr:
                    context_json = json.loads(datadog_attr["stringValue"])
                elif "BinaryValue" in datadog_attr:
                    context_json = json.loads(datadog_attr["BinaryValue"].decode())
                else:
                    print(f"DEBUG: Unhandled datadog_attr format: {datadog_attr}")

                return context_json

            with patch(
                "ddtrace.internal.datastreams.botocore.get_datastreams_context",
                updated_get_datastreams_context,
            ):

                # Step 1: Create a message with some context in the message attributes

                from ddtrace.internal.datastreams.processor import DataStreamsProcessor

                processor_instance = DataStreamsProcessor()

                with patch(
                    "ddtrace.internal.datastreams.processor.DataStreamsProcessor",
                    return_value=processor_instance,
                ):

                    parent_ctx = processor_instance.new_pathway()

                    parent_ctx.set_checkpoint(
                        ["direction:out", "topic:upstream-topic", "type:sqs"],
                        now_sec=1640995200.0,
                        payload_size=512,
                    )
                    parent_hash = parent_ctx.hash
                    encoded_parent_context = parent_ctx.encode_b64()

                    sqs_event = {
                        "Records": [
                            {
                                "eventSource": "aws:sqs",
                                "eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:test",
                                "Body": "test message body",
                                "messageAttributes": {
                                    "_datadog": {
                                        "stringValue": json.dumps(
                                            {
                                                "dd-pathway-ctx-base64": encoded_parent_context
                                            }
                                        )
                                    }
                                },
                            }
                        ]
                    }

                    # Step 2: Call the handler
                    @wrapper.datadog_lambda_wrapper
                    def lambda_handler(event, context):
                        return {"statusCode": 200, "body": "processed"}

                    result = lambda_handler(sqs_event, get_mock_context())
                    self.assertEqual(result["statusCode"], 200)

                    # New context set after handler call
                    current_ctx = processor_instance._current_context.value
                    self.assertIsNotNone(
                        current_ctx,
                        "Data streams context should be set after processing SQS message",
                    )

                    # Step 3: Check that hash in this context is the child of the hash you passed
                    # Step 4: Check that the right checkpoint was produced during call to handler

                    found_sqs_checkpoint = False
                    for bucket_time, bucket in processor_instance._buckets.items():
                        for aggr_key, stats in bucket.pathway_stats.items():
                            edge_tags_str, hash_value, parent_hash_recorded = aggr_key
                            edge_tags = edge_tags_str.split(",")

                            if (
                                "direction:in" in edge_tags
                                and "topic:test" in edge_tags
                                and "type:sqs" in edge_tags
                            ):
                                found_sqs_checkpoint = True

                                # EXPLICIT PARENT-CHILD HASH RELATIONSHIP TEST
                                self.assertEqual(
                                    parent_hash_recorded,
                                    parent_hash,
                                    f"Parent hash must be preserved: "
                                    f"expected {parent_hash}, got {parent_hash_recorded}",
                                )
                                self.assertEqual(
                                    hash_value,
                                    current_ctx.hash,
                                    f"Child hash must match current context: "
                                    f"expected {current_ctx.hash}, got {hash_value}",
                                )
                                self.assertNotEqual(
                                    hash_value,
                                    parent_hash_recorded,
                                    f"Child hash ({hash_value}) must be different from "
                                    f"parent hash ({parent_hash_recorded}) - proves parent-child",
                                )
                                self.assertGreaterEqual(
                                    stats.payload_size.count,
                                    1,
                                    "Should have one payload size measurement",
                                )

                                break

                    self.assertTrue(
                        found_sqs_checkpoint,
                        "Should have found SQS consumption checkpoint in processor stats",
                    )

                    processor_instance.shutdown(timeout=0.1)


Contributor

These look great! Super easy to read and follow.


class TestLambdaDecoratorSettings(unittest.TestCase):
    def test_some_envs_should_depend_on_dd_tracing_enabled(self):