Skip to content

Commit

Permalink
check iscoroutinefunction
Browse files Browse the repository at this point in the history
  • Loading branch information
dimastbk committed Sep 14, 2024
1 parent e7ae511 commit 493ef56
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ async def async_consume_hook(span, record, args, kwargs):
API
___
"""
from asyncio import iscoroutinefunction
from typing import Collection

import aiokafka
Expand Down Expand Up @@ -102,8 +103,14 @@ def _instrument(self, **kwargs):
``async_consume_hook``: a callable to be executed just after consuming a message
"""
tracer_provider = kwargs.get("tracer_provider")

async_produce_hook = kwargs.get("async_produce_hook")
if not iscoroutinefunction(async_produce_hook):
async_produce_hook = None

async_consume_hook = kwargs.get("async_consume_hook")
if not iscoroutinefunction(async_consume_hook):
async_consume_hook = None

tracer = trace.get_tracer(
__name__,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ async def _traced_send(
setter=_aiokafka_setter,
)
try:
if callable(async_produce_hook):
if async_produce_hook is not None:
await async_produce_hook(span, args, kwargs)
except Exception as hook_exception: # pylint: disable=W0703
_LOG.exception(hook_exception)
Expand Down Expand Up @@ -328,7 +328,7 @@ async def _create_consumer_span(
offset=record.offset,
)
try:
if callable(async_consume_hook):
if async_consume_hook is not None:
await async_consume_hook(span, record, args, kwargs)
except Exception as hook_exception: # pylint: disable=W0703
_LOG.exception(hook_exception)
Expand Down

0 comments on commit 493ef56

Please sign in to comment.