From 493ef566b0153b5515d6a16ddda6dd4236c82132 Mon Sep 17 00:00:00 2001 From: Dmitriy Date: Sat, 14 Sep 2024 16:50:16 +0500 Subject: [PATCH] check iscoroutinefunction --- .../src/opentelemetry/instrumentation/aiokafka/__init__.py | 7 +++++++ .../src/opentelemetry/instrumentation/aiokafka/utils.py | 4 ++-- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-aiokafka/src/opentelemetry/instrumentation/aiokafka/__init__.py b/instrumentation/opentelemetry-instrumentation-aiokafka/src/opentelemetry/instrumentation/aiokafka/__init__.py index 627003aea5..5b2b0cd0e8 100644 --- a/instrumentation/opentelemetry-instrumentation-aiokafka/src/opentelemetry/instrumentation/aiokafka/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-aiokafka/src/opentelemetry/instrumentation/aiokafka/__init__.py @@ -67,6 +67,7 @@ async def async_consume_hook(span, record, args, kwargs): API ___ """ +from asyncio import iscoroutinefunction from typing import Collection import aiokafka @@ -102,8 +103,14 @@ def _instrument(self, **kwargs): ``async_consume_hook``: a callable to be executed just after consuming a message """ tracer_provider = kwargs.get("tracer_provider") + async_produce_hook = kwargs.get("async_produce_hook") + if not iscoroutinefunction(async_produce_hook): + async_produce_hook = None + async_consume_hook = kwargs.get("async_consume_hook") + if not iscoroutinefunction(async_consume_hook): + async_consume_hook = None tracer = trace.get_tracer( __name__, diff --git a/instrumentation/opentelemetry-instrumentation-aiokafka/src/opentelemetry/instrumentation/aiokafka/utils.py b/instrumentation/opentelemetry-instrumentation-aiokafka/src/opentelemetry/instrumentation/aiokafka/utils.py index 0c8438b740..3c54ce1500 100644 --- a/instrumentation/opentelemetry-instrumentation-aiokafka/src/opentelemetry/instrumentation/aiokafka/utils.py +++ b/instrumentation/opentelemetry-instrumentation-aiokafka/src/opentelemetry/instrumentation/aiokafka/utils.py @@ -288,7 +288,7 @@ async def _traced_send( setter=_aiokafka_setter, ) try: - if callable(async_produce_hook): + if async_produce_hook is not None: await async_produce_hook(span, args, kwargs) except Exception as hook_exception: # pylint: disable=W0703 _LOG.exception(hook_exception) @@ -328,7 +328,7 @@ async def _create_consumer_span( offset=record.offset, ) try: - if callable(async_consume_hook): + if async_consume_hook is not None: await async_consume_hook(span, record, args, kwargs) except Exception as hook_exception: # pylint: disable=W0703 _LOG.exception(hook_exception)