Skip to content

Commit 019294c

Browse files
authored
fix(tracing/botocore): fix incorrect context propagation type for SNS (backport #3404) (#3462)
This is an automatic backport of pull request #3404 done by [Mergify](https://mergify.com). --- <details> <summary>Mergify commands and options</summary> <br /> More conditions and actions can be found in the [documentation](https://docs.mergify.com/). You can also trigger Mergify actions by commenting on this pull request: - `@Mergifyio refresh` will re-evaluate the rules - `@Mergifyio rebase` will rebase this PR on its base branch - `@Mergifyio update` will merge the base branch into this PR - `@Mergifyio backport <destination>` will backport this PR on `<destination>` branch Additionally, on Mergify [dashboard](https://dashboard.mergify.com/) you can: - look at your merge queues - generate the Mergify configuration with the config editor. Finally, you can contact us on https://mergify.com </details>
1 parent 7630d05 commit 019294c

File tree

4 files changed

+39
-17
lines changed

4 files changed

+39
-17
lines changed

ddtrace/contrib/botocore/patch.py

Lines changed: 28 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -61,28 +61,41 @@ class TraceInjectionDecodingError(Exception):
6161
pass
6262

6363

64-
def inject_trace_data_to_message_attributes(trace_data, entry):
65-
# type: (Dict[str, str], Dict[str, Any]) -> None
64+
def inject_trace_data_to_message_attributes(trace_data, entry, endpoint=None):
65+
# type: (Dict[str, str], Dict[str, Any], Optional[str]) -> None
6666
"""
6767
:trace_data: trace headers to be stored in the entry's MessageAttributes
6868
:entry: an SQS or SNS record
69+
:endpoint: endpoint of message, "sqs" or "sns"
6970
7071
Inject trace headers into the an SQS or SNS record's MessageAttributes
7172
"""
7273
if "MessageAttributes" not in entry:
7374
entry["MessageAttributes"] = {}
74-
# An Amazon SQS message can contain up to 10 metadata attributes.
75+
# Max of 10 message attributes.
7576
if len(entry["MessageAttributes"]) < 10:
76-
entry["MessageAttributes"]["_datadog"] = {"DataType": "String", "StringValue": json.dumps(trace_data)}
77+
if endpoint == "sqs":
78+
# Use String since changing this to Binary would be a breaking
79+
# change as other tracers expect this to be a String.
80+
entry["MessageAttributes"]["_datadog"] = {"DataType": "String", "StringValue": json.dumps(trace_data)}
81+
elif endpoint == "sns":
82+
# Use Binary since SNS subscription filter policies fail silently
83+
# with JSON strings https://github.com/DataDog/datadog-lambda-js/pull/269
84+
# AWS will encode our value if it sees "Binary"
85+
entry["MessageAttributes"]["_datadog"] = {"DataType": "Binary", "BinaryValue": json.dumps(trace_data)}
86+
else:
87+
log.warning("skipping trace injection, endpoint is not SNS or SQS")
7788
else:
89+
# In the event a record has 10 or more msg attributes we cannot add our _datadog msg attribute
7890
log.warning("skipping trace injection, max number (10) of MessageAttributes exceeded")
7991

8092

81-
def inject_trace_to_sqs_or_sns_batch_message(params, span):
82-
# type: (Any, Span) -> None
93+
def inject_trace_to_sqs_or_sns_batch_message(params, span, endpoint=None):
94+
# type: (Any, Span, Optional[str]) -> None
8395
"""
8496
:params: contains the params for the current botocore action
8597
:span: the span which provides the trace context to be propagated
98+
:endpoint: endpoint of message, "sqs" or "sns"
8699
87100
Inject trace headers into MessageAttributes for all SQS or SNS records inside a batch
88101
"""
@@ -94,21 +107,22 @@ def inject_trace_to_sqs_or_sns_batch_message(params, span):
94107
# or PublishBatchRequestEntries (in case of PublishBatch).
95108
entries = params.get("Entries", params.get("PublishBatchRequestEntries", []))
96109
for entry in entries:
97-
inject_trace_data_to_message_attributes(trace_data, entry)
110+
inject_trace_data_to_message_attributes(trace_data, entry, endpoint)
98111

99112

100-
def inject_trace_to_sqs_or_sns_message(params, span):
101-
# type: (Any, Span) -> None
113+
def inject_trace_to_sqs_or_sns_message(params, span, endpoint=None):
114+
# type: (Any, Span, Optional[str]) -> None
102115
"""
103116
:params: contains the params for the current botocore action
104117
:span: the span which provides the trace context to be propagated
118+
:endpoint: endpoint of message, "sqs" or "sns"
105119
106120
Inject trace headers into MessageAttributes for the SQS or SNS record
107121
"""
108122
trace_data = {}
109123
HTTPPropagator.inject(span.context, trace_data)
110124

111-
inject_trace_data_to_message_attributes(trace_data, params)
125+
inject_trace_data_to_message_attributes(trace_data, params, endpoint)
112126

113127

114128
def inject_trace_to_eventbridge_detail(params, span):
@@ -293,17 +307,17 @@ def patched_api_call(original_func, instance, args, kwargs):
293307
if endpoint_name == "lambda" and operation == "Invoke":
294308
inject_trace_to_client_context(params, span)
295309
if endpoint_name == "sqs" and operation == "SendMessage":
296-
inject_trace_to_sqs_or_sns_message(params, span)
310+
inject_trace_to_sqs_or_sns_message(params, span, endpoint_name)
297311
if endpoint_name == "sqs" and operation == "SendMessageBatch":
298-
inject_trace_to_sqs_or_sns_batch_message(params, span)
312+
inject_trace_to_sqs_or_sns_batch_message(params, span, endpoint_name)
299313
if endpoint_name == "events" and operation == "PutEvents":
300314
inject_trace_to_eventbridge_detail(params, span)
301315
if endpoint_name == "kinesis" and (operation == "PutRecord" or operation == "PutRecords"):
302316
inject_trace_to_kinesis_stream(params, span)
303317
if endpoint_name == "sns" and operation == "Publish":
304-
inject_trace_to_sqs_or_sns_message(params, span)
318+
inject_trace_to_sqs_or_sns_message(params, span, endpoint_name)
305319
if endpoint_name == "sns" and operation == "PublishBatch":
306-
inject_trace_to_sqs_or_sns_batch_message(params, span)
320+
inject_trace_to_sqs_or_sns_batch_message(params, span, endpoint_name)
307321
except Exception:
308322
log.warning("Unable to inject trace context", exc_info=True)
309323

docker-compose.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ services:
132132
- ./.ddriot:/root/project/.riot
133133

134134
localstack:
135-
image: localstack/localstack:0.13.1
135+
image: localstack/localstack:0.14.1
136136
network_mode: bridge
137137
ports:
138138
- "127.0.0.1:4566:4566"
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
---
2+
fixes:
3+
- |
4+
botocore: fix incorrect context propagation message attribute types for SNS. This addresses `Datadog/serverless-plugin-datadog#232 <https://github.com/DataDog/serverless-plugin-datadog/issues/232>`_

tests/contrib/botocore/test.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1003,7 +1003,9 @@ def test_sns_send_message_trace_injection_with_no_message_attributes(self):
10031003
assert msg_str == "test"
10041004
msg_attr = msg_body["MessageAttributes"]
10051005
assert msg_attr.get("_datadog") is not None
1006-
headers = json.loads(msg_attr["_datadog"]["Value"])
1006+
assert msg_attr["_datadog"]["Type"] == "Binary"
1007+
datadog_value_decoded = base64.b64decode(msg_attr["_datadog"]["Value"])
1008+
headers = json.loads(datadog_value_decoded.decode())
10071009
assert headers is not None
10081010
assert headers[HTTP_HEADER_TRACE_ID] == str(span.trace_id)
10091011
assert headers[HTTP_HEADER_PARENT_ID] == str(span.span_id)
@@ -1068,7 +1070,9 @@ def test_sns_send_message_trace_injection_with_message_attributes(self):
10681070
assert msg_str == "test"
10691071
msg_attr = msg_body["MessageAttributes"]
10701072
assert msg_attr.get("_datadog") is not None
1071-
headers = json.loads(msg_attr["_datadog"]["Value"])
1073+
assert msg_attr["_datadog"]["Type"] == "Binary"
1074+
datadog_value_decoded = base64.b64decode(msg_attr["_datadog"]["Value"])
1075+
headers = json.loads(datadog_value_decoded.decode())
10721076
assert headers is not None
10731077
assert headers[HTTP_HEADER_TRACE_ID] == str(span.trace_id)
10741078
assert headers[HTTP_HEADER_PARENT_ID] == str(span.span_id)

0 commit comments

Comments
 (0)