-
Notifications
You must be signed in to change notification settings - Fork 46
refactor: remove tracer dependencies to support dsm sqs -> lambda #612
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 12 commits
d7ce695
623f49e
96e3d88
b5a711d
7810ced
e94eb22
54bedbf
823a07f
5356cc6
45ed35f
f729649
24f6ed9
5bc4b8f
3fa3014
c066b8f
235659b
34b9f45
ff6e3c4
1a82f68
8bb2053
5f006de
e28bac4
2509fcc
ab8c871
2089fa9
24851f3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,38 +1,61 @@ | ||
from datadog_lambda import logger | ||
import logging | ||
import json | ||
from datadog_lambda.trigger import EventTypes | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
def set_dsm_context(event, event_source): | ||
|
||
def set_dsm_context(event, event_source): | ||
if event_source.equals(EventTypes.SQS): | ||
_dsm_set_sqs_context(event) | ||
|
||
|
||
def _dsm_set_sqs_context(event): | ||
from datadog_lambda.wrapper import format_err_with_traceback | ||
from ddtrace.internal.datastreams import data_streams_processor | ||
from ddtrace.internal.datastreams.processor import DsmPathwayCodec | ||
from ddtrace.internal.datastreams.botocore import ( | ||
get_datastreams_context, | ||
calculate_sqs_payload_size, | ||
) | ||
from ddtrace.data_streams import set_consume_checkpoint | ||
|
||
records = event.get("Records") | ||
if records is None: | ||
return | ||
processor = data_streams_processor() | ||
|
||
for record in records: | ||
try: | ||
queue_arn = record.get("eventSourceARN", "") | ||
|
||
contextjson = get_datastreams_context(record) | ||
payload_size = calculate_sqs_payload_size(record) | ||
|
||
ctx = DsmPathwayCodec.decode(contextjson, processor) | ||
ctx.set_checkpoint( | ||
["direction:in", f"topic:{queue_arn}", "type:sqs"], | ||
payload_size=payload_size, | ||
) | ||
except Exception as e: | ||
logger.error(format_err_with_traceback(e)) | ||
arn = record.get("eventSourceARN", "") | ||
context_json = _get_dsm_context_from_lambda(record) | ||
if not context_json: | ||
logger.debug("DataStreams skipped lambda message: %r", record) | ||
return None | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. just return is enough |
||
carrier_get = _create_carrier_get(context_json) | ||
set_consume_checkpoint("sqs", arn, carrier_get) | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. (Let's make sure we update the set manual_checkpoint=False here once the other PR is merged & the tracer released. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Plan on keeping this PR in draft until tracer released, and will make sure to update |
||
|
||
def _get_dsm_context_from_lambda(message): | ||
""" | ||
Lambda-specific message formats: | ||
- message.messageAttributes._datadog.stringValue (SQS -> lambda) | ||
""" | ||
context_json = None | ||
message_attributes = message.get("messageAttributes") | ||
if not message_attributes: | ||
michael-zhao459 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
logger.debug("DataStreams skipped lambda message: %r", message) | ||
return None | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It looks like we're logging debug messages multiple times for the same record. |
||
|
||
if "_datadog" not in message_attributes: | ||
logger.debug("DataStreams skipped lambda message: %r", message) | ||
return None | ||
|
||
datadog_attr = message_attributes["_datadog"] | ||
|
||
if "stringValue" in datadog_attr: | ||
# SQS -> lambda | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should do a type check here to ensure this is a dict. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just want to clarify but we are checking that context_json is a dict right? I think context_json is the only one we need to make sure is a dict, the test you asked me to write also signaled that to me There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we should also make sure |
||
context_json = json.loads(datadog_attr["stringValue"]) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We can use the I would recommend creating a separate |
||
else: | ||
logger.debug("DataStreams did not handle lambda message: %r", message) | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would recommend making each of these log lines slightly different. That way when one is encountered, it is easy to find the exact line of code where it was produced. Otherwise, we don't know what the actual issue was. |
||
return context_json | ||
|
||
|
||
def _create_carrier_get(context_json): | ||
def carrier_get(key): | ||
return context_json.get(key) | ||
|
||
return carrier_get |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,44 +1,35 @@ | ||
import unittest | ||
from unittest.mock import patch, MagicMock | ||
|
||
from datadog_lambda.dsm import set_dsm_context, _dsm_set_sqs_context | ||
import json | ||
from unittest.mock import patch | ||
|
||
from datadog_lambda.dsm import ( | ||
set_dsm_context, | ||
_dsm_set_sqs_context, | ||
_get_dsm_context_from_lambda, | ||
) | ||
from datadog_lambda.trigger import EventTypes, _EventSource | ||
|
||
|
||
class TestDsmSQSContext(unittest.TestCase): | ||
class TestSetDSMContext(unittest.TestCase): | ||
def setUp(self): | ||
patcher = patch("datadog_lambda.dsm._dsm_set_sqs_context") | ||
self.mock_dsm_set_sqs_context = patcher.start() | ||
self.addCleanup(patcher.stop) | ||
|
||
patcher = patch("ddtrace.internal.datastreams.data_streams_processor") | ||
self.mock_data_streams_processor = patcher.start() | ||
self.addCleanup(patcher.stop) | ||
|
||
patcher = patch("ddtrace.internal.datastreams.botocore.get_datastreams_context") | ||
self.mock_get_datastreams_context = patcher.start() | ||
self.mock_get_datastreams_context.return_value = {} | ||
self.addCleanup(patcher.stop) | ||
|
||
patcher = patch( | ||
"ddtrace.internal.datastreams.botocore.calculate_sqs_payload_size" | ||
) | ||
self.mock_calculate_sqs_payload_size = patcher.start() | ||
self.mock_calculate_sqs_payload_size.return_value = 100 | ||
patcher = patch("ddtrace.data_streams.set_consume_checkpoint") | ||
self.mock_set_consume_checkpoint = patcher.start() | ||
self.addCleanup(patcher.stop) | ||
|
||
patcher = patch("ddtrace.internal.datastreams.processor.DsmPathwayCodec.decode") | ||
self.mock_dsm_pathway_codec_decode = patcher.start() | ||
patcher = patch("datadog_lambda.dsm._get_dsm_context_from_lambda") | ||
self.mock_get_dsm_context_from_lambda = patcher.start() | ||
self.addCleanup(patcher.stop) | ||
|
||
def test_non_sqs_event_source_does_nothing(self): | ||
"""Test that non-SQS event sources don't trigger DSM context setting""" | ||
event = {} | ||
# Use Unknown Event Source | ||
event_source = _EventSource(EventTypes.UNKNOWN) | ||
set_dsm_context(event, event_source) | ||
|
||
# DSM context should not be set for non-SQS events | ||
self.mock_dsm_set_sqs_context.assert_not_called() | ||
|
||
def test_sqs_event_with_no_records_does_nothing(self): | ||
|
@@ -51,7 +42,7 @@ def test_sqs_event_with_no_records_does_nothing(self): | |
|
||
for event in events_with_no_records: | ||
_dsm_set_sqs_context(event) | ||
self.mock_data_streams_processor.assert_not_called() | ||
self.mock_set_consume_checkpoint.assert_not_called() | ||
|
||
def test_sqs_event_triggers_dsm_sqs_context(self): | ||
"""Test that SQS event sources trigger the SQS-specific DSM context function""" | ||
|
@@ -77,36 +68,152 @@ def test_sqs_multiple_records_process_each_record(self): | |
{ | ||
"eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:queue1", | ||
"body": "Message 1", | ||
"messageAttributes": { | ||
"_datadog": { | ||
"stringValue": json.dumps( | ||
{"dd-pathway-ctx-base64": "context1"} | ||
), | ||
"dataType": "String", | ||
} | ||
}, | ||
}, | ||
{ | ||
"eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:queue2", | ||
"body": "Message 2", | ||
"messageAttributes": { | ||
"_datadog": { | ||
"stringValue": json.dumps( | ||
{"dd-pathway-ctx-base64": "context2"} | ||
), | ||
"dataType": "String", | ||
} | ||
}, | ||
}, | ||
{ | ||
"eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:queue3", | ||
"body": "Message 3", | ||
"messageAttributes": { | ||
"_datadog": { | ||
"stringValue": json.dumps( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should add another test where the |
||
{"dd-pathway-ctx-base64": "context3"} | ||
), | ||
"dataType": "String", | ||
} | ||
}, | ||
}, | ||
] | ||
} | ||
|
||
mock_context = MagicMock() | ||
self.mock_dsm_pathway_codec_decode.return_value = mock_context | ||
self.mock_get_dsm_context_from_lambda.side_effect = [ | ||
{"dd-pathway-ctx-base64": "context1"}, | ||
{"dd-pathway-ctx-base64": "context2"}, | ||
{"dd-pathway-ctx-base64": "context3"}, | ||
] | ||
|
||
_dsm_set_sqs_context(multi_record_event) | ||
|
||
self.assertEqual(mock_context.set_checkpoint.call_count, 3) | ||
self.assertEqual(self.mock_set_consume_checkpoint.call_count, 3) | ||
|
||
calls = mock_context.set_checkpoint.call_args_list | ||
calls = self.mock_set_consume_checkpoint.call_args_list | ||
expected_arns = [ | ||
"arn:aws:sqs:us-east-1:123456789012:queue1", | ||
"arn:aws:sqs:us-east-1:123456789012:queue2", | ||
"arn:aws:sqs:us-east-1:123456789012:queue3", | ||
] | ||
expected_contexts = ["context1", "context2", "context3"] | ||
|
||
for i, call in enumerate(calls): | ||
args, kwargs = call | ||
tags = args[0] | ||
self.assertIn("direction:in", tags) | ||
self.assertIn(f"topic:{expected_arns[i]}", tags) | ||
self.assertIn("type:sqs", tags) | ||
self.assertEqual(kwargs["payload_size"], 100) | ||
service_type = args[0] | ||
arn = args[1] | ||
carrier_get_func = args[2] | ||
|
||
self.assertEqual(service_type, "sqs") | ||
|
||
self.assertEqual(arn, expected_arns[i]) | ||
|
||
pathway_ctx = carrier_get_func("dd-pathway-ctx-base64") | ||
self.assertEqual(pathway_ctx, expected_contexts[i]) | ||
|
||
|
||
class TestGetDSMContext(unittest.TestCase): | ||
def test_sqs_to_lambda_string_value_format(self): | ||
"""Test format: message.messageAttributes._datadog.stringValue (SQS -> lambda)""" | ||
trace_context = { | ||
"x-datadog-trace-id": "789123456", | ||
"x-datadog-parent-id": "321987654", | ||
"dd-pathway-ctx": "test-pathway-ctx", | ||
} | ||
|
||
lambda_record = { | ||
"messageId": "059f36b4-87a3-44ab-83d2-661975830a7d", | ||
"receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a...", | ||
"body": "Test message.", | ||
"attributes": { | ||
"ApproximateReceiveCount": "1", | ||
"SentTimestamp": "1545082649183", | ||
"SenderId": "AIDAIENQZJOLO23YVJ4VO", | ||
"ApproximateFirstReceiveTimestamp": "1545082649185", | ||
}, | ||
"messageAttributes": { | ||
"_datadog": { | ||
"stringValue": json.dumps(trace_context), | ||
"stringListValues": [], | ||
"binaryListValues": [], | ||
"dataType": "String", | ||
}, | ||
"myAttribute": { | ||
"stringValue": "myValue", | ||
"stringListValues": [], | ||
"binaryListValues": [], | ||
"dataType": "String", | ||
}, | ||
}, | ||
"md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3", | ||
"eventSource": "aws:sqs", | ||
"eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue", | ||
"awsRegion": "us-east-2", | ||
} | ||
|
||
result = _get_dsm_context_from_lambda(lambda_record) | ||
|
||
assert result is not None | ||
assert result == trace_context | ||
assert result["x-datadog-trace-id"] == "789123456" | ||
assert result["x-datadog-parent-id"] == "321987654" | ||
assert result["dd-pathway-ctx"] == "test-pathway-ctx" | ||
|
||
def test_no_message_attributes(self): | ||
"""Test message without MessageAttributes returns None.""" | ||
message = { | ||
"messageId": "test-message-id", | ||
"body": "Test message without attributes", | ||
} | ||
|
||
result = _get_dsm_context_from_lambda(message) | ||
|
||
assert result is None | ||
|
||
def test_no_datadog_attribute(self): | ||
"""Test message with MessageAttributes but no _datadog attribute returns None.""" | ||
message = { | ||
"messageId": "test-message-id", | ||
"body": "Test message", | ||
"messageAttributes": { | ||
"customAttribute": {"stringValue": "custom-value", "dataType": "String"} | ||
}, | ||
} | ||
|
||
result = _get_dsm_context_from_lambda(message) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [nit] newline here |
||
assert result is None | ||
|
||
def test_empty_datadog_attribute(self): | ||
"""Test message with empty _datadog attribute returns None.""" | ||
message = { | ||
"messageId": "test-message-id", | ||
"messageAttributes": {"_datadog": {}}, | ||
} | ||
|
||
result = _get_dsm_context_from_lambda(message) | ||
|
||
assert result is None |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we remove the try / except here. Is there a reason for that (maybe there is and I don't see it).
But we want to make sure our instrumentation never prevents the lambda from being executed, even if there is an issue with the instrumentation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed, we should move this inside the try/except.