Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clean up use of suppress_instrumentation in context and fix httpx bug #2061

Merged
merged 12 commits into from
Dec 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#2002](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2002))
- `opentelemetry-instrument-grpc` Fix arity of context.abort for AIO RPCs
([#2066](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2066))
- Consolidate instrumentation suppression mechanisms and fix bug in httpx instrumentation
([#2061](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2061))

### Fixed

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ classifiers = [
]
dependencies = [
"opentelemetry-api ~= 1.5",
"opentelemetry-instrumentation == 0.44b0.dev",
"wrapt >= 1.0.0, < 2.0.0",
]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@

from opentelemetry import context, propagate, trace
from opentelemetry.instrumentation.aio_pika.span_builder import SpanBuilder
from opentelemetry.instrumentation.aio_pika.utils import (
is_instrumentation_enabled,
)
from opentelemetry.instrumentation.utils import is_instrumentation_enabled
from opentelemetry.semconv.trace import MessagingOperationValues
from opentelemetry.trace import Span, Tracer

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,7 @@

from aio_pika.abc import AbstractChannel, AbstractMessage

from opentelemetry.instrumentation.aio_pika.utils import (
is_instrumentation_enabled,
)
from opentelemetry.instrumentation.utils import is_instrumentation_enabled
from opentelemetry.semconv.trace import (
MessagingOperationValues,
SpanAttributes,
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,8 @@ def response_hook(span: Span, params: typing.Union[
from opentelemetry.instrumentation.aiohttp_client.version import __version__
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.instrumentation.utils import (
_SUPPRESS_INSTRUMENTATION_KEY,
http_status_to_status_code,
is_instrumentation_enabled,
unwrap,
)
from opentelemetry.propagate import inject
Expand Down Expand Up @@ -179,7 +179,7 @@ async def on_request_start(
trace_config_ctx: types.SimpleNamespace,
params: aiohttp.TraceRequestStartParams,
):
if context_api.get_value(_SUPPRESS_INSTRUMENTATION_KEY):
if not is_instrumentation_enabled():
trace_config_ctx.span = None
return

Expand Down Expand Up @@ -282,7 +282,7 @@ def _instrument(

# pylint:disable=unused-argument
def instrumented_init(wrapped, instance, args, kwargs):
if context_api.get_value(_SUPPRESS_INSTRUMENTATION_KEY):
if not is_instrumentation_enabled():
return wrapped(*args, **kwargs)

client_trace_configs = list(kwargs.get("trace_configs") or [])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,12 @@
from http_server_mock import HttpServerMock
from pkg_resources import iter_entry_points

from opentelemetry import context
from opentelemetry import trace as trace_api
from opentelemetry.instrumentation import aiohttp_client
from opentelemetry.instrumentation.aiohttp_client import (
AioHttpClientInstrumentor,
)
from opentelemetry.instrumentation.utils import _SUPPRESS_INSTRUMENTATION_KEY
from opentelemetry.instrumentation.utils import suppress_instrumentation
from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.test.test_base import TestBase
from opentelemetry.trace import Span, StatusCode
Expand Down Expand Up @@ -512,25 +511,17 @@ async def uninstrument_request(server: aiohttp.test_utils.TestServer):
self.assert_spans(1)

def test_suppress_instrumentation(self):
token = context.attach(
context.set_value(_SUPPRESS_INSTRUMENTATION_KEY, True)
)
try:
with suppress_instrumentation():
run_with_test_server(
self.get_default_request(), self.URL, self.default_handler
)
finally:
context.detach(token)
self.assert_spans(0)

@staticmethod
async def suppressed_request(server: aiohttp.test_utils.TestServer):
async with aiohttp.test_utils.TestClient(server) as client:
token = context.attach(
context.set_value(_SUPPRESS_INSTRUMENTATION_KEY, True)
)
await client.get(TestAioHttpClientInstrumentor.URL)
context.detach(token)
with suppress_instrumentation():
await client.get(TestAioHttpClientInstrumentor.URL)

def test_suppress_instrumentation_after_creation(self):
run_with_test_server(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
from opentelemetry import context, propagate, trace
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.instrumentation.utils import (
_SUPPRESS_INSTRUMENTATION_KEY,
is_instrumentation_enabled,
unwrap,
)
from opentelemetry.propagators.textmap import CarrierT, Getter, Setter
Expand Down Expand Up @@ -218,7 +218,7 @@ def _create_processing_span(

def _wrap_send_message(self, sqs_class: type) -> None:
def send_wrapper(wrapped, instance, args, kwargs):
if context.get_value(_SUPPRESS_INSTRUMENTATION_KEY):
if not is_instrumentation_enabled():
return wrapped(*args, **kwargs)
queue_url = kwargs.get("QueueUrl")
# The method expect QueueUrl and Entries params, so if they are None, we call wrapped to receive the
Expand Down Expand Up @@ -252,7 +252,7 @@ def send_batch_wrapper(wrapped, instance, args, kwargs):
# The method expect QueueUrl and Entries params, so if they are None, we call wrapped to receive the
# original exception
if (
context.get_value(_SUPPRESS_INSTRUMENTATION_KEY)
not is_instrumentation_enabled()
or not queue_url
or not entries
):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,6 @@ def response_hook(span, service_name, operation_name, result):
from botocore.exceptions import ClientError
from wrapt import wrap_function_wrapper

from opentelemetry import context as context_api

# FIXME: fix the importing of this private attribute when the location of the _SUPPRESS_HTTP_INSTRUMENTATION_KEY is defined.
from opentelemetry.context import _SUPPRESS_HTTP_INSTRUMENTATION_KEY
from opentelemetry.instrumentation.botocore.extensions import _find_extension
from opentelemetry.instrumentation.botocore.extensions.types import (
_AwsSdkCallContext,
Expand All @@ -98,7 +94,8 @@ def response_hook(span, service_name, operation_name, result):
from opentelemetry.instrumentation.botocore.version import __version__
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.instrumentation.utils import (
_SUPPRESS_INSTRUMENTATION_KEY,
is_instrumentation_enabled,
suppress_http_instrumentation,
unwrap,
)
from opentelemetry.propagators.aws.aws_xray_propagator import AwsXRayPropagator
Expand Down Expand Up @@ -171,7 +168,7 @@ def _patched_endpoint_prepare_request(

# pylint: disable=too-many-branches
def _patched_api_call(self, original_func, instance, args, kwargs):
if context_api.get_value(_SUPPRESS_INSTRUMENTATION_KEY):
if not is_instrumentation_enabled():
return original_func(*args, **kwargs)

call_context = _determine_call_context(instance, args)
Expand Down Expand Up @@ -200,25 +197,20 @@ def _patched_api_call(self, original_func, instance, args, kwargs):
_safe_invoke(extension.before_service_call, span)
self._call_request_hook(span, call_context)

token = context_api.attach(
context_api.set_value(_SUPPRESS_HTTP_INSTRUMENTATION_KEY, True)
)

result = None
try:
result = original_func(*args, **kwargs)
except ClientError as error:
result = getattr(error, "response", None)
_apply_response_attributes(span, result)
_safe_invoke(extension.on_error, span, error)
raise
else:
_apply_response_attributes(span, result)
_safe_invoke(extension.on_success, span, result)
with suppress_http_instrumentation():
result = None
srikanthccv marked this conversation as resolved.
Show resolved Hide resolved
try:
result = original_func(*args, **kwargs)
except ClientError as error:
result = getattr(error, "response", None)
_apply_response_attributes(span, result)
_safe_invoke(extension.on_error, span, error)
raise
_apply_response_attributes(span, result)
_safe_invoke(extension.on_success, span, result)
finally:
context_api.detach(token)
_safe_invoke(extension.after_service_call)

self._call_response_hook(span, call_context, result)

return result
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,11 @@
)

from opentelemetry import trace as trace_api
from opentelemetry.context import (
_SUPPRESS_HTTP_INSTRUMENTATION_KEY,
attach,
detach,
set_value,
)
from opentelemetry.instrumentation.botocore import BotocoreInstrumentor
from opentelemetry.instrumentation.utils import _SUPPRESS_INSTRUMENTATION_KEY
from opentelemetry.instrumentation.utils import (
suppress_http_instrumentation,
suppress_instrumentation,
)
from opentelemetry.propagate import get_global_textmap, set_global_textmap
from opentelemetry.propagators.aws.aws_xray_propagator import TRACE_HEADER_KEY
from opentelemetry.semconv.trace import SpanAttributes
Expand Down Expand Up @@ -341,23 +338,17 @@ def check_headers(**kwargs):
@mock_xray
def test_suppress_instrumentation_xray_client(self):
xray_client = self._make_client("xray")
token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True))
try:
with suppress_instrumentation():
xray_client.put_trace_segments(TraceSegmentDocuments=["str1"])
xray_client.put_trace_segments(TraceSegmentDocuments=["str2"])
finally:
detach(token)
self.assertEqual(0, len(self.get_finished_spans()))

@mock_xray
def test_suppress_http_instrumentation_xray_client(self):
xray_client = self._make_client("xray")
token = attach(set_value(_SUPPRESS_HTTP_INSTRUMENTATION_KEY, True))
try:
with suppress_http_instrumentation():
xray_client.put_trace_segments(TraceSegmentDocuments=["str1"])
xray_client.put_trace_segments(TraceSegmentDocuments=["str2"])
finally:
detach(token)
self.assertEqual(2, len(self.get_finished_spans()))

@mock_s3
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,11 @@
import grpc
from grpc.aio import ClientCallDetails

from opentelemetry import context
from opentelemetry.instrumentation.grpc._client import (
OpenTelemetryClientInterceptor,
_carrier_setter,
)
from opentelemetry.instrumentation.utils import _SUPPRESS_INSTRUMENTATION_KEY
from opentelemetry.instrumentation.utils import is_instrumentation_enabled
from opentelemetry.propagate import inject
from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.trace.status import Status, StatusCode
Expand Down Expand Up @@ -139,9 +138,10 @@ async def _wrap_stream_response(self, span, call):
span.end()

def tracing_skipped(self, client_call_details):
return context.get_value(
_SUPPRESS_INSTRUMENTATION_KEY
) or not self.rpc_matches_filters(client_call_details)
return (
not is_instrumentation_enabled()
or not self.rpc_matches_filters(client_call_details)
)

def rpc_matches_filters(self, client_call_details):
return self._filter is None or self._filter(client_call_details)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@

import grpc

from opentelemetry import context, trace
from opentelemetry import trace
from opentelemetry.instrumentation.grpc import grpcext
from opentelemetry.instrumentation.grpc._utilities import RpcInfo
from opentelemetry.instrumentation.utils import _SUPPRESS_INSTRUMENTATION_KEY
from opentelemetry.instrumentation.utils import is_instrumentation_enabled
from opentelemetry.propagate import inject
from opentelemetry.propagators.textmap import Setter
from opentelemetry.semconv.trace import SpanAttributes
Expand Down Expand Up @@ -123,7 +123,7 @@ def _trace_result(self, span, rpc_info, result):
return result

def _intercept(self, request, metadata, client_info, invoker):
if context.get_value(_SUPPRESS_INSTRUMENTATION_KEY):
if not is_instrumentation_enabled():
return invoker(request, metadata)

if not metadata:
Expand Down Expand Up @@ -219,7 +219,7 @@ def _intercept_server_stream(
def intercept_stream(
self, request_or_iterator, metadata, client_info, invoker
):
if context.get_value(_SUPPRESS_INSTRUMENTATION_KEY):
if not is_instrumentation_enabled():
return invoker(request_or_iterator, metadata)

if self._filter is not None and not self._filter(client_info):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,15 @@ def run(self, result=None):
import pytest

import opentelemetry.instrumentation.grpc
from opentelemetry import context, trace
from opentelemetry import trace
from opentelemetry.instrumentation.grpc import (
GrpcAioInstrumentorClient,
aio_client_interceptors,
)
from opentelemetry.instrumentation.grpc._aio_client import (
UnaryUnaryAioClientInterceptor,
)
from opentelemetry.instrumentation.utils import _SUPPRESS_INSTRUMENTATION_KEY
from opentelemetry.instrumentation.utils import suppress_instrumentation
from opentelemetry.propagate import get_global_textmap, set_global_textmap
from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.test.mock_textmap import MockTextMapPropagator
Expand Down Expand Up @@ -314,53 +314,33 @@ async def test_client_interceptor_trace_context_propagation(self):
set_global_textmap(previous_propagator)

async def test_unary_unary_with_suppress_key(self):
token = context.attach(
context.set_value(_SUPPRESS_INSTRUMENTATION_KEY, True)
)
try:
with suppress_instrumentation():
response = await simple_method(self._stub)
assert response.response_data == "data"

spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 0)
finally:
context.detach(token)

async def test_unary_stream_with_suppress_key(self):
token = context.attach(
context.set_value(_SUPPRESS_INSTRUMENTATION_KEY, True)
)
try:
with suppress_instrumentation():
async for response in server_streaming_method(self._stub):
self.assertEqual(response.response_data, "data")

spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 0)
finally:
context.detach(token)

async def test_stream_unary_with_suppress_key(self):
token = context.attach(
context.set_value(_SUPPRESS_INSTRUMENTATION_KEY, True)
)
try:
with suppress_instrumentation():
response = await client_streaming_method(self._stub)
assert response.response_data == "data"

spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 0)
finally:
context.detach(token)

async def test_stream_stream_with_suppress_key(self):
token = context.attach(
context.set_value(_SUPPRESS_INSTRUMENTATION_KEY, True)
)
try:
with suppress_instrumentation():
async for response in bidirectional_streaming_method(self._stub):
self.assertEqual(response.response_data, "data")

spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 0)
finally:
context.detach(token)
Loading
Loading