Skip to content

Commit 47131b0

Browse files
committed
add config variables and sampling rules
1 parent eb4773d commit 47131b0

File tree

6 files changed

+160
-71
lines changed

6 files changed

+160
-71
lines changed

ddtrace/contrib/internal/asgi/middleware.py

Lines changed: 85 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import ipaddress
12
import os
23
import sys
34
from typing import Any
@@ -23,6 +24,7 @@
2324
from ddtrace.internal.schema.span_attribute_schema import SpanDirection
2425
from ddtrace.internal.utils import get_blocked
2526
from ddtrace.internal.utils import set_blocked
27+
from ddtrace.internal.utils.formats import asbool
2628
from ddtrace.trace import Span
2729

2830

@@ -34,7 +36,12 @@
3436
service_name=config._get_service(default="asgi"),
3537
request_span_name="asgi.request",
3638
distributed_tracing=True,
37-
_trace_asgi_websocket=os.getenv("DD_ASGI_TRACE_WEBSOCKET", default=False),
39+
_trace_asgi_websocket=asbool(os.getenv("DD_ASGI_TRACE_WEBSOCKET", default=False)),
40+
# TODO: set as initially false until we gradually release feature
41+
_trace_asgi_websocket_messages=asbool(os.getenv("DD_TRACE_WEBSOCKET_MESSAGES_ENABLED", default=True)),
42+
_asgi_websockets_inherit_sampling=asbool(
43+
os.getenv("DD_TRACE_WEBSOCKET_MESSAGES_INHERIT_SAMPLING", default=True)
44+
),
3845
),
3946
)
4047

@@ -135,7 +142,6 @@ async def __call__(self, scope, receive, send):
135142
return await self.app(scope, receive, send)
136143

137144
method = "WEBSOCKET"
138-
139145
else:
140146
return await self.app(scope, receive, send)
141147
try:
@@ -177,11 +183,12 @@ async def __call__(self, scope, receive, send):
177183
ctx.set_item("req_span", span)
178184

179185
# set span.kind to the type of request being performed
186+
# question: should this be network.client.ip or
187+
180188
span.set_tag_str(SPAN_KIND, SpanKind.SERVER)
181189

182190
if scope["type"] == "websocket":
183191
span.set_tag_str("http.upgraded", "websocket")
184-
# TODO: add websocket session id
185192

186193
if "datadog" not in scope:
187194
scope["datadog"] = {"request_spans": [span]}
@@ -255,13 +262,14 @@ async def __call__(self, scope, receive, send):
255262
async def wrapped_send(message):
256263
"""
257264
websocket.message.type
258-
websocket.session.id*
259265
websocket.message.length
260-
websocket.message.frames*
261-
websocket.message.send_time*
266+
websocket.message.frames
262267
"""
263268
try:
264-
if message.get("type") == "websocket.send":
269+
if (
270+
self.integration_config._trace_asgi_websocket_messages
271+
and message.get("type") == "websocket.send"
272+
):
265273
with self.tracer.trace(
266274
"websocket.send",
267275
service=span.service,
@@ -270,15 +278,38 @@ async def wrapped_send(message):
270278
) as send_span:
271279
send_span.set_tag_str(COMPONENT, self.integration_config.integration_name)
272280
send_span.set_tag_str(SPAN_KIND, SpanKind.PRODUCER)
281+
282+
# set tags related to peer.hostname
283+
client = scope.get("client")
284+
if len(client) >= 1:
285+
client_ip = client[0]
286+
span.set_tag_str("out.host", client_ip)
287+
try:
288+
ipaddress.ip_address(client_ip) # validate ip address
289+
span.set_tag_str("network.client.ip", client_ip)
290+
except ValueError:
291+
pass
273292
# set link to http handshake span
274293
# The link should have the attribute dd.kind set to resuming.
275294
# If the instrumented library supports multicast or broadcast sending,
276295
# there must be a link to the handshake span of every affected connection;
277-
# any websocket.session.id tag then should be added to the span link instead.
278-
elif message.get("type") == "websocket.close":
296+
send_span.set_link(
297+
trace_id=span.trace_id, span_id=span.span_id, attributes={"dd.kind": "resuming"}
298+
)
299+
send_span.set_metric("websocket.message.frames", 1)
300+
if "text" in message:
301+
send_span.set_tag_str("websocket.message.type", "text")
302+
send_span.set_metric("websocket.message.length", len(message["text"].encode("utf-8")))
303+
elif "binary" in message:
304+
send_span.set_tag_str("websocket.message.type", "binary")
305+
send_span.set_metric("websocket.message.length", len(message["bytes"]))
306+
307+
elif (
308+
self.integration_config._trace_asgi_websocket_messages
309+
and message.get("type") == "websocket.close"
310+
):
279311
"""
280312
tags:
281-
websocket.session.id*
282313
websocket.close.code
283314
websocket.close.reason
284315
@@ -291,7 +322,15 @@ async def wrapped_send(message):
291322
) as close_span:
292323
close_span.set_tag_str(COMPONENT, self.integration_config.integration_name)
293324
close_span.set_tag_str(SPAN_KIND, SpanKind.PRODUCER)
294-
# TODO: add span links?
325+
client = scope.get("client")
326+
if len(client) >= 1:
327+
client_ip = client[0]
328+
span.set_tag_str("out.host", client_ip)
329+
try:
330+
ipaddress.ip_address(client_ip) # validate ip address
331+
span.set_tag_str("network.client.ip", client_ip)
332+
except ValueError:
333+
pass
295334
if "text" in message:
296335
close_span.set_tag_str("websocket.message.type", "text")
297336
close_span.set_metric("websocket.message.length", len(message["text"].encode("utf-8")))
@@ -343,27 +382,35 @@ async def wrapped_receive():
343382
"""
344383
tags:
345384
websocket.message.type
346-
websocket.session.id*
347385
websocket.message.length
348-
websocket.message.frames*
349-
websocket.message.receive_time*
386+
websocket.message.frames
387+
350388
_dd.dm.inherited
351389
_dd.dm.service
352390
_dd.dm.resource
353391
"""
354392

355393
try:
356394
message = await receive()
357-
if message["type"] == "websocket.receive":
358-
with self.tracer.trace(
359-
"websocket.receive",
395+
if (
396+
self.integration_config._trace_asgi_websocket_messages
397+
and message["type"] == "websocket.receive"
398+
):
399+
with self.tracer.start_span(
400+
name="websocket.receive",
360401
service=span.service,
361402
resource=f"websocket {scope.get('path', '')}",
362403
span_type="websocket",
404+
child_of=None,
363405
) as ws_span:
364406
ws_span.set_tag_str(COMPONENT, self.integration_config.integration_name)
365407
ws_span.set_tag_str(SPAN_KIND, SpanKind.CONSUMER)
366408
core.dispatch("asgi.websocket.receive", (message,))
409+
# the span should have link to http handshake span
410+
# the link should have attribute dd.kind set to executed_by
411+
ws_span.set_link(
412+
trace_id=span.trace_id, span_id=span.span_id, attributes={"dd.kind": "executed_by"}
413+
)
367414

368415
if "text" in message:
369416
ws_span.set_tag_str("websocket.message.type", "text")
@@ -372,27 +419,34 @@ async def wrapped_receive():
372419
ws_span.set_tag_str("websocket.message.type", "binary")
373420
ws_span.set_metric("websocket.message.length", len(message["bytes"]))
374421

375-
ws_span.set_metric("_dd.dm.inherited", 1)
376-
parent_span = self.tracer.current_root_span()
377-
if parent_span is not None:
378-
ws_span.set_tag_str("_dd.dm.service", parent_span.service)
379-
ws_span.set_tag_str("_dd.dm.resource", parent_span.resource)
422+
# since asgi is a high level framework, frames is always 1
423+
ws_span.set_metric("websocket.message.frames", 1)
380424

381-
elif message["type"] == "websocket.disconnect":
425+
if self.integration_config._asgi_websockets_inherit_sampling:
426+
ws_span.set_metric("_dd.dm.inherited", 1)
427+
ws_span.set_tag_str("_dd.dm.service", span.service)
428+
ws_span.set_tag_str("_dd.dm.resource", span.resource)
429+
430+
elif (
431+
self.integration_config._trace_asgi_websocket_messages
432+
and message["type"] == "websocket.disconnect"
433+
):
382434
# peer closes the connection
383-
with self.tracer.trace(
384-
"websocket.close",
435+
# in this case the span will be trace root (will behave like the websocket.receive use case)
436+
with self.tracer.start_span(
437+
name="websocket.close",
385438
service=span.service,
386439
resource=f"websocket {scope.get('path', '')}",
387440
span_type="websocket",
441+
child_of=None,
388442
) as close_span:
389443
close_span.set_tag_str(COMPONENT, self.integration_config.integration_name)
390444
close_span.set_tag_str(SPAN_KIND, SpanKind.CONSUMER)
391-
close_span.set_metric("_dd.dm.inherited", 1)
392-
parent_span = self.tracer.current_root_span()
393-
if parent_span is not None:
394-
close_span.set_tag_str("_dd.dm.service", parent_span.service)
395-
close_span.set_tag_str("_dd.dm.resource", parent_span.resource)
445+
446+
if self.integration_config._asgi_websockets_inherit_sampling:
447+
close_span.set_metric("_dd.dm.inherited", 1)
448+
close_span.set_tag_str("_dd.dm.service", span.service)
449+
close_span.set_tag_str("_dd.dm.resource", span.resource)
396450

397451
code = message.get("code")
398452
reason = message.get("reason")
@@ -427,7 +481,7 @@ async def wrapped_blocked_send(message):
427481
message["more_body"] = False
428482
core.dispatch("asgi.finalize_response", (content, None))
429483
try:
430-
return await send(message)
484+
result = await send(message)
431485
finally:
432486
trace_utils.set_http_meta(
433487
span, self.integration_config, status_code=status, response_headers=headers
@@ -440,7 +494,6 @@ async def wrapped_blocked_send(message):
440494
try:
441495
core.dispatch("asgi.start_request", ("asgi",))
442496
# Do not block right here. Wait for route to be resolved in starlette/patch.py
443-
# TODO: do I need to change to wrapped_receive?
444497
return await self.app(scope, wrapped_recv, wrapped_send)
445498
except BlockingException as e:
446499
set_blocked(e.args[0])

ddtrace/contrib/internal/django/patch.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@
8080
),
8181
use_handler_resource_format=asbool(os.getenv("DD_DJANGO_USE_HANDLER_RESOURCE_FORMAT", default=False)),
8282
use_legacy_resource_format=asbool(os.getenv("DD_DJANGO_USE_LEGACY_RESOURCE_FORMAT", default=False)),
83-
_trace_asgi_websocket=os.getenv("DD_ASGI_TRACE_WEBSOCKET", default=False),
83+
_trace_asgi_websocket=asbool(os.getenv("DD_ASGI_TRACE_WEBSOCKET", default=False)),
8484
),
8585
)
8686

ddtrace/contrib/internal/fastapi/patch.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
from ddtrace.contrib.internal.starlette.patch import traced_route_init
1313
from ddtrace.internal.logger import get_logger
1414
from ddtrace.internal.schema import schematize_service_name
15+
from ddtrace.internal.utils.formats import asbool
1516
from ddtrace.internal.utils.wrappers import unwrap as _u
1617
from ddtrace.settings.asm import config as asm_config
1718
from ddtrace.trace import Pin
@@ -26,7 +27,11 @@
2627
request_span_name="fastapi.request",
2728
distributed_tracing=True,
2829
trace_query_string=None, # Default to global config
29-
_trace_asgi_websocket=os.getenv("DD_ASGI_TRACE_WEBSOCKET", default=False),
30+
_trace_asgi_websocket=asbool(os.getenv("DD_ASGI_TRACE_WEBSOCKET", default=False)),
31+
_trace_asgi_websocket_messages=asbool(os.getenv("DD_TRACE_WEBSOCKET_MESSAGES_ENABLED", default=True)),
32+
_asgi_websockets_inherit_sampling=asbool(
33+
os.getenv("DD_TRACE_WEBSOCKET_MESSAGES_INHERIT_SAMPLING", default=True)
34+
),
3035
),
3136
)
3237

ddtrace/contrib/internal/starlette/patch.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
from ddtrace.internal.utils import get_argument_value
2525
from ddtrace.internal.utils import get_blocked
2626
from ddtrace.internal.utils import set_argument_value
27+
from ddtrace.internal.utils.formats import asbool
2728
from ddtrace.internal.utils.wrappers import unwrap as _u
2829
from ddtrace.settings.asm import config as asm_config
2930
from ddtrace.trace import Pin
@@ -39,7 +40,8 @@
3940
_default_service=schematize_service_name("starlette"),
4041
request_span_name="starlette.request",
4142
distributed_tracing=True,
42-
_trace_asgi_websocket=os.getenv("DD_ASGI_TRACE_WEBSOCKET", default=False),
43+
_trace_asgi_websocket=asbool(os.getenv("DD_ASGI_TRACE_WEBSOCKET", default=False)),
44+
_trace_asgi_websocket_messages=asbool(os.getenv("DD_TRACE_WEBSOCKET_MESSAGES_ENABLED", default=True)),
4345
),
4446
)
4547

tests/contrib/fastapi/test_fastapi.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -540,7 +540,8 @@ def test_table_query_snapshot(snapshot_client):
540540
}
541541

542542

543-
@snapshot()
543+
# @pytest.mark.subprocess(env=dict(DD_TRACE_WEBSOCKET_MESSAGES_ENABLED="true"))
544+
@snapshot(ignores=["meta._dd.span_links", "metrics.websocket.message.length"])
544545
def test_traced_websocket(test_spans, snapshot_app):
545546
client = TestClient(snapshot_app)
546547
with override_config("fastapi", dict(_trace_asgi_websocket=True)):
@@ -550,6 +551,16 @@ def test_traced_websocket(test_spans, snapshot_app):
550551
websocket.send_text("ping")
551552

552553

554+
# @snapshot()
555+
# def test_websocket_trace_sampling_priority(test_spans, snapshot_app):
556+
# from ddtrace import tracer
557+
# with tracer.trace("test-root", service="mytest", resource="/ws", _sampling_priority=0):
558+
# client
559+
# with TestClient(app).websocket_connect("/ws") as ws:
560+
# ws.send_text("ping")
561+
# ws.receive_text()
562+
563+
553564
def test_dont_trace_websocket_by_default(client, test_spans):
554565
initial_event_count = len(test_spans.pop_traces())
555566
with client.websocket_connect("/ws") as websocket:

0 commit comments

Comments
 (0)