Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ASGI: Capture custom request response headers #1004

3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [1.10.0-0.29b0](https://github.com/open-telemetry/opentelemetry-python/releases/tag/v1.10.0-0.29b0) - 2022-03-10

- `opentelemetry-instrumentation-wsgi` Capture custom request/response headers in span attributes
([#1004])(https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1004)

- `opentelemetry-instrumentation-wsgi` Capture custom request/response headers in span attributes
([#925])(https://github.com/open-telemetry/opentelemetry-python-contrib/pull/925)
- `opentelemetry-instrumentation-flask` Flask: Capture custom request/response headers in span attributes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,14 @@ def client_response_hook(span: Span, message: dict):
from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.trace import Span, set_span_in_context
from opentelemetry.trace.status import Status, StatusCode
from opentelemetry.util.http import remove_url_credentials
from opentelemetry.util.http import (
OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SERVER_REQUEST,
OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SERVER_RESPONSE,
get_custom_headers,
normalise_request_header_name,
normalise_response_header_name,
remove_url_credentials,
)

_ServerRequestHookT = typing.Optional[typing.Callable[[Span, dict], None]]
_ClientRequestHookT = typing.Optional[typing.Callable[[Span, dict], None]]
Expand Down Expand Up @@ -223,6 +230,41 @@ def collect_request_attributes(scope):
return result


def collect_custom_request_headers_attributes(scope):
"""returns custom HTTP request headers to be added into SERVER span as span attributes
Refer specification https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/http.md#http-request-and-response-headers"""

attributes = {}
custom_request_headers = get_custom_headers(
OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SERVER_REQUEST
)

for header in custom_request_headers:
values = asgi_getter.get(scope, header)
if values:
key = normalise_request_header_name(header)
attributes.setdefault(key, []).extend(values)

return attributes


def collect_custom_response_headers_attributes(message):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be ok, but please make sure any new public (non-underscore prefixed) function or symbol is really necessary and meant to be used directly by the user. If not, please prefix them with underscores.

Copy link
Member Author

@sanketmehta28 sanketmehta28 Mar 24, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand. This functions will be used by other frameworks like django or FastAPI to fetch the custom headers and add them to spans as attributes. (just like collect_request_attributes/add_response_attributes)

"""returns custom HTTP response headers to be added into SERVER span as span attributes
Refer specification https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/http.md#http-request-and-response-headers"""
attributes = {}
custom_response_headers = get_custom_headers(
OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SERVER_RESPONSE
)

for header in custom_response_headers:
values = asgi_getter.get(message, header)
if values:
key = normalise_response_header_name(header)
attributes.setdefault(key, []).extend(values)

return attributes


def get_host_port_url_tuple(scope):
"""Returns (host, port, full_url) tuple."""
server = scope.get("server") or ["0.0.0.0", 80]
Expand Down Expand Up @@ -342,6 +384,13 @@ async def __call__(self, scope, receive, send):
for key, value in attributes.items():
current_span.set_attribute(key, value)

if current_span.kind == trace.SpanKind.SERVER:
custom_attributes = (
collect_custom_request_headers_attributes(scope)
)
if len(custom_attributes) > 0:
current_span.set_attributes(custom_attributes)

if callable(self.server_request_hook):
self.server_request_hook(current_span, scope)

Expand Down Expand Up @@ -395,6 +444,18 @@ async def otel_send(message):
set_status_code(server_span, 200)
set_status_code(send_span, 200)
send_span.set_attribute("type", message["type"])
if (
server_span.is_recording()
and server_span.kind == trace.SpanKind.SERVER
and "headers" in message
):
custom_response_attributes = (
collect_custom_response_headers_attributes(message)
)
if len(custom_response_attributes) > 0:
server_span.set_attributes(
custom_response_attributes
)

propagator = get_global_response_propagator()
if propagator:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@
)
from opentelemetry.test.test_base import TestBase
from opentelemetry.trace import SpanKind, format_span_id, format_trace_id
from opentelemetry.util.http import (
OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SERVER_REQUEST,
OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SERVER_RESPONSE,
)


async def http_app(scope, receive, send):
Expand Down Expand Up @@ -62,6 +66,47 @@ async def websocket_app(scope, receive, send):
break


async def http_app_with_custom_headers(scope, receive, send):
message = await receive()
assert scope["type"] == "http"
if message.get("type") == "http.request":
await send(
{
"type": "http.response.start",
"status": 200,
"headers": [
(b"Content-Type", b"text/plain"),
(b"custom-test-header-1", b"test-header-value-1"),
(b"custom-test-header-2", b"test-header-value-2"),
],
}
)
await send({"type": "http.response.body", "body": b"*"})


async def websocket_app_with_custom_headers(scope, receive, send):
assert scope["type"] == "websocket"
while True:
message = await receive()
if message.get("type") == "websocket.connect":
await send(
{
"type": "websocket.accept",
"headers": [
(b"custom-test-header-1", b"test-header-value-1"),
(b"custom-test-header-2", b"test-header-value-2"),
],
}
)

if message.get("type") == "websocket.receive":
if message.get("text") == "ping":
await send({"type": "websocket.send", "text": "pong"})

if message.get("type") == "websocket.disconnect":
break


async def simple_asgi(scope, receive, send):
assert isinstance(scope, dict)
if scope["type"] == "http":
Expand Down Expand Up @@ -583,5 +628,237 @@ async def wrapped_app(scope, receive, send):
)


@mock.patch.dict(
"os.environ",
{
OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SERVER_REQUEST: "Custom-Test-Header-1,Custom-Test-Header-2,Custom-Test-Header-3",
OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SERVER_RESPONSE: "Custom-Test-Header-1,Custom-Test-Header-2,Custom-Test-Header-3",
},
)
class TestCustomHeaders(AsgiTestBase, TestBase):
def setUp(self):
super().setUp()
self.tracer_provider, self.exporter = TestBase.create_tracer_provider()
self.tracer = self.tracer_provider.get_tracer(__name__)
self.app = otel_asgi.OpenTelemetryMiddleware(
simple_asgi, tracer_provider=self.tracer_provider
)

def test_http_custom_request_headers_in_span_attributes(self):
self.scope["headers"].extend(
[
(b"custom-test-header-1", b"test-header-value-1"),
(b"custom-test-header-2", b"test-header-value-2"),
]
)
self.seed_app(self.app)
self.send_default_request()
self.get_all_output()
span_list = self.exporter.get_finished_spans()
expected = {
"http.request.header.custom_test_header_1": (
"test-header-value-1",
),
"http.request.header.custom_test_header_2": (
"test-header-value-2",
),
}
for span in span_list:
if span.kind == SpanKind.SERVER:
self.assertSpanHasAttributes(span, expected)

def test_http_custom_request_headers_not_in_span_attributes(self):
self.scope["headers"].extend(
[
(b"custom-test-header-1", b"test-header-value-1"),
]
)
self.seed_app(self.app)
self.send_default_request()
self.get_all_output()
span_list = self.exporter.get_finished_spans()
expected = {
"http.request.header.custom_test_header_1": (
"test-header-value-1",
),
}
not_expected = {
"http.request.header.custom_test_header_2": (
"test-header-value-2",
),
}
for span in span_list:
if span.kind == SpanKind.SERVER:
self.assertSpanHasAttributes(span, expected)
for key, _ in not_expected.items():
self.assertNotIn(key, span.attributes)

def test_http_custom_response_headers_in_span_attributes(self):
self.app = otel_asgi.OpenTelemetryMiddleware(
http_app_with_custom_headers, tracer_provider=self.tracer_provider
)
self.seed_app(self.app)
self.send_default_request()
self.get_all_output()
span_list = self.exporter.get_finished_spans()
expected = {
"http.response.header.custom_test_header_1": (
"test-header-value-1",
),
"http.response.header.custom_test_header_2": (
"test-header-value-2",
),
}
for span in span_list:
if span.kind == SpanKind.SERVER:
self.assertSpanHasAttributes(span, expected)

def test_http_custom_response_headers_not_in_span_attributes(self):
self.app = otel_asgi.OpenTelemetryMiddleware(
http_app_with_custom_headers, tracer_provider=self.tracer_provider
)
self.seed_app(self.app)
self.send_default_request()
self.get_all_output()
span_list = self.exporter.get_finished_spans()
not_expected = {
"http.response.header.custom_test_header_3": (
"test-header-value-3",
),
}
for span in span_list:
if span.kind == SpanKind.SERVER:
for key, _ in not_expected.items():
self.assertNotIn(key, span.attributes)

def test_websocket_custom_request_headers_in_span_attributes(self):
self.scope = {
"type": "websocket",
"http_version": "1.1",
"scheme": "ws",
"path": "/",
"query_string": b"",
"headers": [
(b"custom-test-header-1", b"test-header-value-1"),
(b"custom-test-header-2", b"test-header-value-2"),
],
"client": ("127.0.0.1", 32767),
"server": ("127.0.0.1", 80),
}
self.seed_app(self.app)
self.send_input({"type": "websocket.connect"})
self.send_input({"type": "websocket.receive", "text": "ping"})
self.send_input({"type": "websocket.disconnect"})

self.get_all_output()
span_list = self.exporter.get_finished_spans()
expected = {
"http.request.header.custom_test_header_1": (
"test-header-value-1",
),
"http.request.header.custom_test_header_2": (
"test-header-value-2",
),
}
for span in span_list:
if span.kind == SpanKind.SERVER:
self.assertSpanHasAttributes(span, expected)

def test_websocket_custom_request_headers_not_in_span_attributes(self):
self.scope = {
"type": "websocket",
"http_version": "1.1",
"scheme": "ws",
"path": "/",
"query_string": b"",
"headers": [
(b"Custom-Test-Header-1", b"test-header-value-1"),
(b"Custom-Test-Header-2", b"test-header-value-2"),
],
"client": ("127.0.0.1", 32767),
"server": ("127.0.0.1", 80),
}
self.seed_app(self.app)
self.send_input({"type": "websocket.connect"})
self.send_input({"type": "websocket.receive", "text": "ping"})
self.send_input({"type": "websocket.disconnect"})

self.get_all_output()
span_list = self.exporter.get_finished_spans()
not_expected = {
"http.request.header.custom_test_header_3": (
"test-header-value-3",
),
}
for span in span_list:
if span.kind == SpanKind.SERVER:
for key, _ in not_expected.items():
self.assertNotIn(key, span.attributes)

def test_websocket_custom_response_headers_in_span_attributes(self):
self.scope = {
"type": "websocket",
"http_version": "1.1",
"scheme": "ws",
"path": "/",
"query_string": b"",
"headers": [],
"client": ("127.0.0.1", 32767),
"server": ("127.0.0.1", 80),
}
self.app = otel_asgi.OpenTelemetryMiddleware(
websocket_app_with_custom_headers,
tracer_provider=self.tracer_provider,
)
self.seed_app(self.app)
self.send_input({"type": "websocket.connect"})
self.send_input({"type": "websocket.receive", "text": "ping"})
self.send_input({"type": "websocket.disconnect"})
self.get_all_output()
span_list = self.exporter.get_finished_spans()
expected = {
"http.response.header.custom_test_header_1": (
"test-header-value-1",
),
"http.response.header.custom_test_header_2": (
"test-header-value-2",
),
}
for span in span_list:
if span.kind == SpanKind.SERVER:
self.assertSpanHasAttributes(span, expected)

def test_websocket_custom_response_headers_not_in_span_attributes(self):
self.scope = {
"type": "websocket",
"http_version": "1.1",
"scheme": "ws",
"path": "/",
"query_string": b"",
"headers": [],
"client": ("127.0.0.1", 32767),
"server": ("127.0.0.1", 80),
}
self.app = otel_asgi.OpenTelemetryMiddleware(
websocket_app_with_custom_headers,
tracer_provider=self.tracer_provider,
)
self.seed_app(self.app)
self.send_input({"type": "websocket.connect"})
self.send_input({"type": "websocket.receive", "text": "ping"})
self.send_input({"type": "websocket.disconnect"})
self.get_all_output()
span_list = self.exporter.get_finished_spans()
not_expected = {
"http.response.header.custom_test_header_3": (
"test-header-value-3",
),
}
for span in span_list:
if span.kind == SpanKind.SERVER:
for key, _ in not_expected.items():
self.assertNotIn(key, span.attributes)


if __name__ == "__main__":
unittest.main()