Skip to content

Commit 6beb65d

Browse files
feat: Enable sqs -> lambda support for DSM (#604)
--------- Co-authored-by: Rey Abolofia <purple4reina@gmail.com>
1 parent 254466c commit 6beb65d

File tree

4 files changed

+217
-0
lines changed

4 files changed

+217
-0
lines changed

datadog_lambda/dsm.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
from datadog_lambda import logger
2+
from datadog_lambda.trigger import EventTypes
3+
4+
5+
def set_dsm_context(event, event_source):
6+
7+
if event_source.equals(EventTypes.SQS):
8+
_dsm_set_sqs_context(event)
9+
10+
11+
def _dsm_set_sqs_context(event):
12+
from datadog_lambda.wrapper import format_err_with_traceback
13+
from ddtrace.internal.datastreams import data_streams_processor
14+
from ddtrace.internal.datastreams.processor import DsmPathwayCodec
15+
from ddtrace.internal.datastreams.botocore import (
16+
get_datastreams_context,
17+
calculate_sqs_payload_size,
18+
)
19+
20+
records = event.get("Records")
21+
if records is None:
22+
return
23+
processor = data_streams_processor()
24+
25+
for record in records:
26+
try:
27+
queue_arn = record.get("eventSourceARN", "")
28+
29+
contextjson = get_datastreams_context(record)
30+
payload_size = calculate_sqs_payload_size(record)
31+
32+
ctx = DsmPathwayCodec.decode(contextjson, processor)
33+
ctx.set_checkpoint(
34+
["direction:in", f"topic:{queue_arn}", "type:sqs"],
35+
payload_size=payload_size,
36+
)
37+
except Exception as e:
38+
logger.error(format_err_with_traceback(e))

datadog_lambda/wrapper.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from importlib import import_module
1010
from time import time_ns
1111

12+
from datadog_lambda.dsm import set_dsm_context
1213
from datadog_lambda.extension import should_use_extension, flush_extension
1314
from datadog_lambda.cold_start import (
1415
set_cold_start,
@@ -79,6 +80,7 @@
7980
DD_REQUESTS_SERVICE_NAME = "DD_REQUESTS_SERVICE_NAME"
8081
DD_SERVICE = "DD_SERVICE"
8182
DD_ENV = "DD_ENV"
83+
DD_DATA_STREAMS_ENABLED = "DD_DATA_STREAMS_ENABLED"
8284

8385

8486
def get_env_as_int(env_key, default_value: int) -> int:
@@ -190,6 +192,9 @@ def __init__(self, func):
190192
self.min_cold_start_trace_duration = get_env_as_int(
191193
DD_MIN_COLD_START_DURATION, 3
192194
)
195+
self.data_streams_enabled = (
196+
os.environ.get(DD_DATA_STREAMS_ENABLED, "false").lower() == "true"
197+
)
193198
self.local_testing_mode = os.environ.get(
194199
DD_LOCAL_TEST, "false"
195200
).lower() in ("true", "1")
@@ -322,6 +327,8 @@ def _before(self, event, context):
322327
self.inferred_span = create_inferred_span(
323328
event, context, event_source, self.decode_authorizer_context
324329
)
330+
if self.data_streams_enabled:
331+
set_dsm_context(event, event_source)
325332
self.span = create_function_execution_span(
326333
context=context,
327334
function_name=self.function_name,

tests/test_dsm.py

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
import unittest
2+
from unittest.mock import patch, MagicMock
3+
4+
from datadog_lambda.dsm import set_dsm_context, _dsm_set_sqs_context
5+
from datadog_lambda.trigger import EventTypes, _EventSource
6+
7+
8+
class TestDsmSQSContext(unittest.TestCase):
9+
def setUp(self):
10+
patcher = patch("datadog_lambda.dsm._dsm_set_sqs_context")
11+
self.mock_dsm_set_sqs_context = patcher.start()
12+
self.addCleanup(patcher.stop)
13+
14+
patcher = patch("ddtrace.internal.datastreams.data_streams_processor")
15+
self.mock_data_streams_processor = patcher.start()
16+
self.addCleanup(patcher.stop)
17+
18+
patcher = patch("ddtrace.internal.datastreams.botocore.get_datastreams_context")
19+
self.mock_get_datastreams_context = patcher.start()
20+
self.mock_get_datastreams_context.return_value = {}
21+
self.addCleanup(patcher.stop)
22+
23+
patcher = patch(
24+
"ddtrace.internal.datastreams.botocore.calculate_sqs_payload_size"
25+
)
26+
self.mock_calculate_sqs_payload_size = patcher.start()
27+
self.mock_calculate_sqs_payload_size.return_value = 100
28+
self.addCleanup(patcher.stop)
29+
30+
patcher = patch("ddtrace.internal.datastreams.processor.DsmPathwayCodec.decode")
31+
self.mock_dsm_pathway_codec_decode = patcher.start()
32+
self.addCleanup(patcher.stop)
33+
34+
def test_non_sqs_event_source_does_nothing(self):
35+
"""Test that non-SQS event sources don't trigger DSM context setting"""
36+
event = {}
37+
# Use Unknown Event Source
38+
event_source = _EventSource(EventTypes.UNKNOWN)
39+
set_dsm_context(event, event_source)
40+
41+
# DSM context should not be set for non-SQS events
42+
self.mock_dsm_set_sqs_context.assert_not_called()
43+
44+
def test_sqs_event_with_no_records_does_nothing(self):
45+
"""Test that events where Records is None don't trigger DSM processing"""
46+
events_with_no_records = [
47+
{},
48+
{"Records": None},
49+
{"someOtherField": "value"},
50+
]
51+
52+
for event in events_with_no_records:
53+
_dsm_set_sqs_context(event)
54+
self.mock_data_streams_processor.assert_not_called()
55+
56+
def test_sqs_event_triggers_dsm_sqs_context(self):
57+
"""Test that SQS event sources trigger the SQS-specific DSM context function"""
58+
sqs_event = {
59+
"Records": [
60+
{
61+
"eventSource": "aws:sqs",
62+
"eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:my-queue",
63+
"body": "Hello from SQS!",
64+
}
65+
]
66+
}
67+
68+
event_source = _EventSource(EventTypes.SQS)
69+
set_dsm_context(sqs_event, event_source)
70+
71+
self.mock_dsm_set_sqs_context.assert_called_once_with(sqs_event)
72+
73+
def test_sqs_multiple_records_process_each_record(self):
74+
"""Test that each record in an SQS event gets processed individually"""
75+
multi_record_event = {
76+
"Records": [
77+
{
78+
"eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:queue1",
79+
"body": "Message 1",
80+
},
81+
{
82+
"eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:queue2",
83+
"body": "Message 2",
84+
},
85+
{
86+
"eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:queue3",
87+
"body": "Message 3",
88+
},
89+
]
90+
}
91+
92+
mock_context = MagicMock()
93+
self.mock_dsm_pathway_codec_decode.return_value = mock_context
94+
95+
_dsm_set_sqs_context(multi_record_event)
96+
97+
self.assertEqual(mock_context.set_checkpoint.call_count, 3)
98+
99+
calls = mock_context.set_checkpoint.call_args_list
100+
expected_arns = [
101+
"arn:aws:sqs:us-east-1:123456789012:queue1",
102+
"arn:aws:sqs:us-east-1:123456789012:queue2",
103+
"arn:aws:sqs:us-east-1:123456789012:queue3",
104+
]
105+
106+
for i, call in enumerate(calls):
107+
args, kwargs = call
108+
tags = args[0]
109+
self.assertIn("direction:in", tags)
110+
self.assertIn(f"topic:{expected_arns[i]}", tags)
111+
self.assertIn("type:sqs", tags)
112+
self.assertEqual(kwargs["payload_size"], 100)

tests/test_wrapper.py

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,10 @@ def setUp(self):
7676
self.mock_dd_lambda_layer_tag = patcher.start()
7777
self.addCleanup(patcher.stop)
7878

79+
patcher = patch("datadog_lambda.wrapper.set_dsm_context")
80+
self.mock_set_dsm_context = patcher.start()
81+
self.addCleanup(patcher.stop)
82+
7983
def test_datadog_lambda_wrapper(self):
8084
wrapper.dd_tracing_enabled = False
8185

@@ -563,6 +567,62 @@ def return_type_test(event, context):
563567
self.assertEqual(result, test_result)
564568
self.assertFalse(MockPrintExc.called)
565569

570+
def test_set_dsm_context_called_when_DSM_and_tracing_enabled(self):
571+
os.environ["DD_DATA_STREAMS_ENABLED"] = "true"
572+
wrapper.dd_tracing_enabled = True
573+
574+
@wrapper.datadog_lambda_wrapper
575+
def lambda_handler(event, context):
576+
return "ok"
577+
578+
result = lambda_handler({}, get_mock_context())
579+
self.assertEqual(result, "ok")
580+
self.mock_set_dsm_context.assert_called_once()
581+
582+
del os.environ["DD_DATA_STREAMS_ENABLED"]
583+
584+
def test_set_dsm_context_not_called_when_only_DSM_enabled(self):
585+
os.environ["DD_DATA_STREAMS_ENABLED"] = "true"
586+
wrapper.dd_tracing_enabled = False
587+
588+
@wrapper.datadog_lambda_wrapper
589+
def lambda_handler(event, context):
590+
return "ok"
591+
592+
result = lambda_handler({}, get_mock_context())
593+
self.assertEqual(result, "ok")
594+
self.mock_set_dsm_context.assert_not_called()
595+
596+
del os.environ["DD_DATA_STREAMS_ENABLED"]
597+
598+
def test_set_dsm_context_not_called_when_only_tracing_enabled(self):
599+
os.environ["DD_DATA_STREAMS_ENABLED"] = "false"
600+
wrapper.dd_tracing_enabled = True
601+
602+
@wrapper.datadog_lambda_wrapper
603+
def lambda_handler(event, context):
604+
return "ok"
605+
606+
result = lambda_handler({}, get_mock_context())
607+
self.assertEqual(result, "ok")
608+
self.mock_set_dsm_context.assert_not_called()
609+
610+
del os.environ["DD_DATA_STREAMS_ENABLED"]
611+
612+
def test_set_dsm_context_not_called_when_tracing_and_DSM_disabled(self):
613+
os.environ["DD_DATA_STREAMS_ENABLED"] = "false"
614+
wrapper.dd_tracing_enabled = False
615+
616+
@wrapper.datadog_lambda_wrapper
617+
def lambda_handler(event, context):
618+
return "ok"
619+
620+
result = lambda_handler({}, get_mock_context())
621+
self.assertEqual(result, "ok")
622+
self.mock_set_dsm_context.assert_not_called()
623+
624+
del os.environ["DD_DATA_STREAMS_ENABLED"]
625+
566626

567627
class TestLambdaDecoratorSettings(unittest.TestCase):
568628
def test_some_envs_should_depend_on_dd_tracing_enabled(self):

0 commit comments

Comments
 (0)