Skip to content

Commit

Permalink
Merge branch 'main' into redis_additional_attributes
Browse files Browse the repository at this point in the history
  • Loading branch information
lzchen authored Sep 16, 2024
2 parents ae5f1a3 + bed13f5 commit 87374e4
Show file tree
Hide file tree
Showing 12 changed files with 243 additions and 76 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,18 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## Unreleased

- `opentelemetry-instrumentation-fastapi` Add autoinstrumentation mechanism tests.
([#2860](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2860))

## Version 1.27.0/0.48b0 ()

### Added

- `opentelemetry-instrumentation-kafka-python` Instrument temporary fork, kafka-python-ng
inside kafka-python's instrumentation
([#2537](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2537))
- `opentelemetry-instrumentation-asgi`, `opentelemetry-instrumentation-fastapi` Add ability to disable internal HTTP send and receive spans
([#2802](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2802))

### Breaking changes

Expand Down Expand Up @@ -48,6 +53,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#2814](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2814))
- `opentelemetry-instrumentation` Fix the description of `http.server.duration` and `http.server.request.duration`
([#2753](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2753))
- `opentelemetry-instrumentation-grpc` Fix grpc supported version
([#2845](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2845))

## Version 1.26.0/0.47b0 (2024-07-23)

Expand Down
1 change: 1 addition & 0 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,7 @@ Below is a checklist of things to be mindful of when implementing a new instrume
- Isolate sync and async test
- For synchronous tests, the typical test case class is inherited from `opentelemetry.test.test_base.TestBase`. However, if you want to write asynchronous tests, the test case class should inherit also from `IsolatedAsyncioTestCase`. Adding asynchronous tests to a common test class can lead to tests passing without actually running, which can be misleading.
- ex. <https://github.com/open-telemetry/opentelemetry-python-contrib/blob/60fb936b7e5371b3e5587074906c49fb873cbd76/instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_server_interceptor.py#L84>
- All instrumentations have the same version. If you are going to develop a new instrumentation it would probably have `X.Y.dev` version and depends on `opentelemetry-instrumentation` and `opentelemetry-semantic-conventions` for the same version. That means that if you want to install your instrumentation you need to install its dependencies from this repo and the core repo also from git.

## Expectations from contributors

Expand Down
2 changes: 1 addition & 1 deletion instrumentation/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
| [opentelemetry-instrumentation-falcon](./opentelemetry-instrumentation-falcon) | falcon >= 1.4.1, < 4.0.0 | Yes | experimental
| [opentelemetry-instrumentation-fastapi](./opentelemetry-instrumentation-fastapi) | fastapi ~= 0.58 | Yes | migration
| [opentelemetry-instrumentation-flask](./opentelemetry-instrumentation-flask) | flask >= 1.0 | Yes | migration
| [opentelemetry-instrumentation-grpc](./opentelemetry-instrumentation-grpc) | grpcio ~= 1.27 | No | experimental
| [opentelemetry-instrumentation-grpc](./opentelemetry-instrumentation-grpc) | grpcio >= 1.42.0 | No | experimental
| [opentelemetry-instrumentation-httpx](./opentelemetry-instrumentation-httpx) | httpx >= 0.18.0 | No | migration
| [opentelemetry-instrumentation-jinja2](./opentelemetry-instrumentation-jinja2) | jinja2 >= 2.7, < 4.0 | No | experimental
| [opentelemetry-instrumentation-kafka-python](./opentelemetry-instrumentation-kafka-python) | kafka-python >= 2.0, < 3.0,kafka-python-ng >= 2.0, < 3.0 | No | experimental
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -483,7 +483,7 @@ def get_default_span_details(scope: dict) -> Tuple[str, dict]:


def _collect_target_attribute(
scope: typing.Dict[str, typing.Any]
scope: typing.Dict[str, typing.Any],
) -> typing.Optional[str]:
"""
Returns the target path as defined by the Semantic Conventions.
Expand Down Expand Up @@ -529,6 +529,7 @@ class OpenTelemetryMiddleware:
the current globally configured one is used.
meter_provider: The optional meter provider to use. If omitted
the current globally configured one is used.
exclude_spans: Optionally exclude HTTP `send` and/or `receive` spans from the trace.
"""

# pylint: disable=too-many-branches
Expand All @@ -547,6 +548,7 @@ def __init__(
http_capture_headers_server_request: list[str] | None = None,
http_capture_headers_server_response: list[str] | None = None,
http_capture_headers_sanitize_fields: list[str] | None = None,
exclude_spans: list[typing.Literal["receive", "send"]] | None = None,
):
# initialize semantic conventions opt-in if needed
_OpenTelemetrySemanticConventionStability._initialize()
Expand Down Expand Up @@ -653,6 +655,12 @@ def __init__(
)
or []
)
self.exclude_receive_span = (
"receive" in exclude_spans if exclude_spans else False
)
self.exclude_send_span = (
"send" in exclude_spans if exclude_spans else False
)

# pylint: disable=too-many-statements
async def __call__(
Expand Down Expand Up @@ -796,8 +804,10 @@ async def __call__(
span.end()

# pylint: enable=too-many-branches

def _get_otel_receive(self, server_span_name, scope, receive):
if self.exclude_receive_span:
return receive

@wraps(receive)
async def otel_receive():
with self.tracer.start_as_current_span(
Expand All @@ -821,6 +831,66 @@ async def otel_receive():

return otel_receive

def _set_send_span(
self,
server_span_name,
scope,
send,
message,
status_code,
expecting_trailers,
):
"""Set send span attributes and status code."""
with self.tracer.start_as_current_span(
" ".join((server_span_name, scope["type"], "send"))
) as send_span:
if callable(self.client_response_hook):
self.client_response_hook(send_span, scope, message)

if send_span.is_recording():
if message["type"] == "http.response.start":
expecting_trailers = message.get("trailers", False)
send_span.set_attribute("asgi.event.type", message["type"])

if status_code:
set_status_code(
send_span,
status_code,
None,
self._sem_conv_opt_in_mode,
)
return expecting_trailers

def _set_server_span(
self, server_span, message, status_code, duration_attrs
):
"""Set server span attributes and status code."""
if (
server_span.is_recording()
and server_span.kind == trace.SpanKind.SERVER
and "headers" in message
):
custom_response_attributes = (
collect_custom_headers_attributes(
message,
self.http_capture_headers_sanitize_fields,
self.http_capture_headers_server_response,
normalise_response_header_name,
)
if self.http_capture_headers_server_response
else {}
)
if len(custom_response_attributes) > 0:
server_span.set_attributes(custom_response_attributes)

if status_code:
set_status_code(
server_span,
status_code,
duration_attrs,
self._sem_conv_opt_in_mode,
)

def _get_otel_send(
self,
server_span,
Expand All @@ -834,74 +904,46 @@ def _get_otel_send(
@wraps(send)
async def otel_send(message: dict[str, Any]):
nonlocal expecting_trailers
with self.tracer.start_as_current_span(
" ".join((server_span_name, scope["type"], "send"))
) as send_span:
if callable(self.client_response_hook):
self.client_response_hook(send_span, scope, message)

status_code = None
if message["type"] == "http.response.start":
status_code = message["status"]
elif message["type"] == "websocket.send":
status_code = 200

if send_span.is_recording():
if message["type"] == "http.response.start":
expecting_trailers = message.get("trailers", False)
send_span.set_attribute("asgi.event.type", message["type"])
if (
server_span.is_recording()
and server_span.kind == trace.SpanKind.SERVER
and "headers" in message
):
custom_response_attributes = (
collect_custom_headers_attributes(
message,
self.http_capture_headers_sanitize_fields,
self.http_capture_headers_server_response,
normalise_response_header_name,
)
if self.http_capture_headers_server_response
else {}
)
if len(custom_response_attributes) > 0:
server_span.set_attributes(
custom_response_attributes
)
if status_code:
# We record metrics only once
set_status_code(
server_span,
status_code,
duration_attrs,
self._sem_conv_opt_in_mode,
)
set_status_code(
send_span,
status_code,
None,
self._sem_conv_opt_in_mode,
)
status_code = None
if message["type"] == "http.response.start":
status_code = message["status"]
elif message["type"] == "websocket.send":
status_code = 200

propagator = get_global_response_propagator()
if propagator:
propagator.inject(
message,
context=set_span_in_context(
server_span, trace.context_api.Context()
),
setter=asgi_setter,
)
if not self.exclude_send_span:
expecting_trailers = self._set_send_span(
server_span_name,
scope,
send,
message,
status_code,
expecting_trailers,
)

content_length = asgi_getter.get(message, "content-length")
if content_length:
try:
self.content_length_header = int(content_length[0])
except ValueError:
pass
self._set_server_span(
server_span, message, status_code, duration_attrs
)

propagator = get_global_response_propagator()
if propagator:
propagator.inject(
message,
context=set_span_in_context(
server_span, trace.context_api.Context()
),
setter=asgi_setter,
)

content_length = asgi_getter.get(message, "content-length")
if content_length:
try:
self.content_length_header = int(content_length[0])
except ValueError:
pass

await send(message)

await send(message)
# pylint: disable=too-many-boolean-expressions
if (
not expecting_trailers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -566,6 +566,30 @@ async def test_background_execution(self):
_SIMULATED_BACKGROUND_TASK_EXECUTION_TIME_S * 10**9,
)

async def test_exclude_internal_spans(self):
"""Test that internal spans are excluded from the emitted spans when
the `exclude_receive_span` or `exclude_send_span` attributes are set.
"""
cases = [
(["receive", "send"], ["GET / http receive", "GET / http send"]),
(["send"], ["GET / http send"]),
(["receive"], ["GET / http receive"]),
([], []),
]
for exclude_spans, excluded_spans in cases:
self.memory_exporter.clear()
app = otel_asgi.OpenTelemetryMiddleware(
simple_asgi, exclude_spans=exclude_spans
)
self.seed_app(app)
await self.send_default_request()
await self.get_all_output()
span_list = self.memory_exporter.get_finished_spans()
self.assertTrue(span_list)
for span in span_list:
for excluded_span in excluded_spans:
self.assertNotEqual(span.name, excluded_span)

async def test_trailers(self):
"""Test that trailers are emitted as expected and that the server span is ended
BEFORE the background task is finished."""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ def client_response_hook(span: Span, scope: dict[str, Any], message: dict[str, A
from __future__ import annotations

import logging
from typing import Collection
from typing import Collection, Literal

import fastapi
from starlette.routing import Match
Expand Down Expand Up @@ -222,7 +222,7 @@ class FastAPIInstrumentor(BaseInstrumentor):

@staticmethod
def instrument_app(
app: fastapi.FastAPI,
app,
server_request_hook: ServerRequestHook = None,
client_request_hook: ClientRequestHook = None,
client_response_hook: ClientResponseHook = None,
Expand All @@ -232,8 +232,28 @@ def instrument_app(
http_capture_headers_server_request: list[str] | None = None,
http_capture_headers_server_response: list[str] | None = None,
http_capture_headers_sanitize_fields: list[str] | None = None,
exclude_spans: list[Literal["receive", "send"]] | None = None,
):
"""Instrument an uninstrumented FastAPI application."""
"""Instrument an uninstrumented FastAPI application.
Args:
app: The fastapi ASGI application callable to forward requests to.
server_request_hook: Optional callback which is called with the server span and ASGI
scope object for every incoming request.
client_request_hook: Optional callback which is called with the internal span, and ASGI
scope and event which are sent as dictionaries for when the method receive is called.
client_response_hook: Optional callback which is called with the internal span, and ASGI
scope and event which are sent as dictionaries for when the method send is called.
tracer_provider: The optional tracer provider to use. If omitted
the current globally configured one is used.
meter_provider: The optional meter provider to use. If omitted
the current globally configured one is used.
excluded_urls: Optional comma delimited string of regexes to match URLs that should not be traced.
http_capture_headers_server_request: Optional list of HTTP headers to capture from the request.
http_capture_headers_server_response: Optional list of HTTP headers to capture from the response.
http_capture_headers_sanitize_fields: Optional list of HTTP headers to sanitize.
exclude_spans: Optionally exclude HTTP `send` and/or `receive` spans from the trace.
"""
if not hasattr(app, "_is_instrumented_by_opentelemetry"):
app._is_instrumented_by_opentelemetry = False

Expand Down Expand Up @@ -273,6 +293,7 @@ def instrument_app(
http_capture_headers_server_request=http_capture_headers_server_request,
http_capture_headers_server_response=http_capture_headers_server_response,
http_capture_headers_sanitize_fields=http_capture_headers_sanitize_fields,
exclude_spans=exclude_spans,
)
app._is_instrumented_by_opentelemetry = True
if app not in _InstrumentedFastAPI._instrumented_fastapi_apps:
Expand Down Expand Up @@ -323,6 +344,7 @@ def _instrument(self, **kwargs):
else parse_excluded_urls(_excluded_urls)
)
_InstrumentedFastAPI._meter_provider = kwargs.get("meter_provider")
_InstrumentedFastAPI._exclude_spans = kwargs.get("exclude_spans")
fastapi.FastAPI = _InstrumentedFastAPI

def _uninstrument(self, **kwargs):
Expand Down Expand Up @@ -373,6 +395,7 @@ def __init__(self, *args, **kwargs):
http_capture_headers_server_request=_InstrumentedFastAPI._http_capture_headers_server_request,
http_capture_headers_server_response=_InstrumentedFastAPI._http_capture_headers_server_response,
http_capture_headers_sanitize_fields=_InstrumentedFastAPI._http_capture_headers_sanitize_fields,
exclude_spans=_InstrumentedFastAPI._exclude_spans,
)
self._is_instrumented_by_opentelemetry = True
_InstrumentedFastAPI._instrumented_fastapi_apps.add(self)
Expand Down
Loading

0 comments on commit 87374e4

Please sign in to comment.