Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,18 @@

import threading
from concurrent import futures
from concurrent.futures import Future
from typing import TYPE_CHECKING, Any, Callable, Collection

from opentelemetry.metrics import get_meter
from wrapt import (
wrap_function_wrapper, # type: ignore[reportUnknownVariableType]
)

from opentelemetry import context
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.instrumentation.threading.package import _instruments
from opentelemetry.instrumentation.threading.version import __version__
from opentelemetry.instrumentation.utils import unwrap

if TYPE_CHECKING:
Expand All @@ -63,11 +66,40 @@ class ThreadingInstrumentor(BaseInstrumentor):
__WRAPPER_START_METHOD = "start"
__WRAPPER_RUN_METHOD = "run"
__WRAPPER_SUBMIT_METHOD = "submit"
__WRAPPER_INIT_METHOD = "__init__"

def instrumentation_dependencies(self) -> Collection[str]:
return _instruments

def _instrument(self, **kwargs: Any):
meter_provider = kwargs.get("meter_provider")
meter = get_meter(
__name__,
__version__,
meter_provider,
schema_url="https://opentelemetry.io/schemas/1.38.0",
)

self.working_items_count = meter.create_up_down_counter(
name="python.threadpool.working_items.count",
unit="threads",
description="The number of jobs currently being processed by the thread pool",
)
self.queue_count = meter.create_up_down_counter(
name="python.threadpool.queue.length",
unit="threads",
description="The number of jobs currently queued in the thread pool",
)
self.thread_count = meter.create_gauge(
name="python.threadpool.thread.count",
unit="threads",
description="The maximum number of concurrent jobs allowed in the thread pool",
)
self.max_thread_count = meter.create_gauge(
name="python.threadpool.thread.max_count",
unit="threads",
description="The maximum number of concurrent jobs allowed in the thread pool",
)
self._instrument_thread()
self._instrument_timer()
self._instrument_thread_pool()
Expand Down Expand Up @@ -103,12 +135,16 @@ def _instrument_timer():
ThreadingInstrumentor.__wrap_threading_run,
)

@staticmethod
def _instrument_thread_pool():
def _instrument_thread_pool(self):
wrap_function_wrapper(
futures.ThreadPoolExecutor,
ThreadingInstrumentor.__WRAPPER_INIT_METHOD,
self.__build_wrap_thread_pool_init(),
)
wrap_function_wrapper(
futures.ThreadPoolExecutor,
ThreadingInstrumentor.__WRAPPER_SUBMIT_METHOD,
ThreadingInstrumentor.__wrap_thread_pool_submit,
self.__build_wrap_thread_pool_submit(),
)

@staticmethod
Expand All @@ -123,6 +159,10 @@ def _uninstrument_timer():

@staticmethod
def _uninstrument_thread_pool():
unwrap(
futures.ThreadPoolExecutor,
ThreadingInstrumentor.__WRAPPER_INIT_METHOD,
)
unwrap(
futures.ThreadPoolExecutor,
ThreadingInstrumentor.__WRAPPER_SUBMIT_METHOD,
Expand Down Expand Up @@ -153,26 +193,62 @@ def __wrap_threading_run(
if token is not None:
context.detach(token)

@staticmethod
def __wrap_thread_pool_submit(
call_wrapped: Callable[..., R],
instance: futures.ThreadPoolExecutor,
args: tuple[Callable[..., Any], ...],
kwargs: dict[str, Any],
) -> R:
# obtain the original function and wrapped kwargs
original_func = args[0]
otel_context = context.get_current()
def __build_wrap_thread_pool_submit(self) -> Callable[..., Future[R]]:
def __wrap_thread_pool_submit(
call_wrapped: Callable[..., Future[R]],
instance: futures.ThreadPoolExecutor,
args: tuple[Callable[..., Any], ...],
kwargs: dict[str, Any],
) -> Future[R]:
# obtain the original function and wrapped kwargs
original_func = args[0]
otel_context = context.get_current()
attributes = {
"threadpool.executor": instance._thread_name_prefix,
}

def wrapped_func(*func_args: Any, **func_kwargs: Any) -> R:
token = None
try:
token = context.attach(otel_context)
self.queue_count.add(-1, attributes)
self.working_items_count.add(1, attributes)
return original_func(*func_args, **func_kwargs)
finally:
if token is not None:
context.detach(token)

# replace the original function with the wrapped function
new_args: tuple[Callable[..., Any], ...] = (wrapped_func,) + args[
1:
]
self.queue_count.add(1, attributes)

def wrapped_func(*func_args: Any, **func_kwargs: Any) -> R:
token = None
try:
token = context.attach(otel_context)
return original_func(*func_args, **func_kwargs)
finally:
if token is not None:
context.detach(token)

# replace the original function with the wrapped function
new_args: tuple[Callable[..., Any], ...] = (wrapped_func,) + args[1:]
return call_wrapped(*new_args, **kwargs)
future = call_wrapped(*new_args, **kwargs)
except RuntimeError:
self.queue_count.add(-1, attributes)
raise

self.thread_count.set(len(instance._threads), attributes)
future.add_done_callback(
lambda _: self.working_items_count.add(-1, attributes)
)
return future

return __wrap_thread_pool_submit

def __build_wrap_thread_pool_init(self) -> Callable[..., None]:
def __wrap_thread_pool_init(
call_wrapped: Callable[..., None],
instance: futures.ThreadPoolExecutor,
args: tuple[Callable[..., Any], ...],
kwargs: dict[str, Any],
) -> None:
call_wrapped(*args, **kwargs)
attributes = {
"threadpool.executor": instance._thread_name_prefix,
}
self.max_thread_count.set(instance._max_workers, attributes)

return __wrap_thread_pool_init
Loading