Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 21 additions & 1 deletion ddtrace/internal/datadog/profiling/stack_v2/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,39 @@ import asyncio
from types import FrameType
from typing import Optional, Sequence, Union

def register_thread(id: int, native_id: int, name: str) -> None: ... # noqa: A002
from ddtrace._trace import context
from ddtrace._trace import span as ddspan

# Core stack v2 functions
def start(min_interval: float = ...) -> bool: ...
def stop() -> None: ...

# Sampling configuration
def set_adaptive_sampling(do_adaptive_sampling: bool = False) -> None: ...
def set_interval(new_interval: float) -> None: ...

# span <-> profile association
def link_span(span: Optional[Union[context.Context, ddspan.Span]]) -> None: ...

# Thread management
def register_thread(python_thread_id: int, native_id: int, name: str) -> None: ...
def unregister_thread(name: str) -> None: ...

# Asyncio support
def track_asyncio_loop(thread_id: int, loop: Optional[asyncio.AbstractEventLoop]) -> None: ...
def link_tasks(parent: asyncio.AbstractEventLoop, child: asyncio.Task) -> None: ...
def init_asyncio(
current_tasks: Sequence[asyncio.Task],
scheduled_tasks: Sequence[asyncio.Task],
eager_tasks: Optional[Sequence[asyncio.Task]],
) -> None: ...

# Greenlet support
def track_greenlet(greenlet_id: int, name: str, frame: Union[FrameType, bool, None]) -> None: ...
def untrack_greenlet(greenlet_id: int) -> None: ...
def link_greenlets(greenlet_id: int, parent_id: int) -> None: ...
def update_greenlet_frame(greenlet_id: int, frame: Union[FrameType, bool, None]) -> None: ...

# Module attributes
is_available: bool
failure_msg: str
15 changes: 0 additions & 15 deletions ddtrace/profiling/collector/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
# -*- encoding: utf-8 -*-
import typing

from ddtrace.internal import periodic
from ddtrace.internal import service
from ddtrace.internal.settings.profiling import config

Expand All @@ -25,20 +24,6 @@ def snapshot() -> None:
"""Take a snapshot of collected data, to be exported."""


class PeriodicCollector(Collector, periodic.PeriodicService):
"""A collector that needs to run periodically."""

__slots__ = ()

def periodic(self) -> None:
# This is to simply override periodic.PeriodicService.periodic()
self.collect()

def collect(self) -> None:
"""Collect the actual data."""
raise NotImplementedError


class CaptureSampler(object):
"""Determine the events that should be captured based on a sampling percentage."""

Expand Down
1 change: 0 additions & 1 deletion ddtrace/profiling/collector/_task.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,3 @@ import typing
def get_task(
thread_id: int,
) -> typing.Tuple[typing.Optional[int], typing.Optional[str], typing.Optional[types.FrameType]]: ...
def list_tasks() -> typing.List[typing.Tuple[int, str, types.FrameType]]: ...
40 changes: 0 additions & 40 deletions ddtrace/profiling/collector/_task.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -108,43 +108,3 @@ cpdef get_task(thread_id):
frame = _gevent_tracer.active_greenlet.gr_frame

return task_id, task_name, frame


cpdef list_tasks(thread_id):
# type: (...) -> typing.List[typing.Tuple[int, str, types.FrameType]]
"""Return the list of running tasks.

This is computed for gevent by taking the list of existing threading.Thread object and removing if any real OS
thread that might be running.

:return: [(task_id, task_name, task_frame), ...]"""

tasks = []

if not is_stack_v2 and _gevent_tracer is not None:
if type(_threading.get_thread_by_id(thread_id)).__name__.endswith("_MainThread"):
# Under normal circumstances, the Hub is running in the main thread.
# Python will only ever have a single instance of a _MainThread
# class, so if we find it we attribute all the greenlets to it.
tasks.extend(
[
(
greenlet_id,
_threading.get_thread_name(greenlet_id),
greenlet.gr_frame
)
for greenlet_id, greenlet in dict(_gevent_tracer.greenlets).items()
if not greenlet.dead
]
)

loop = _asyncio.get_event_loop_for_thread(thread_id)
if loop is not None:
tasks.extend([
(id(task),
_asyncio._task_get_name(task),
_asyncio_task_get_frame(task))
for task in _asyncio.all_tasks(loop)
])

return tasks
65 changes: 65 additions & 0 deletions ddtrace/profiling/collector/stack.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
"""Simple wrapper around stack_v2 native extension module."""

import logging
import typing

from ddtrace.internal import core
from ddtrace.internal.datadog.profiling import stack_v2
from ddtrace.internal.settings.profiling import config
from ddtrace.profiling import collector
from ddtrace.profiling.collector import threading
from ddtrace.trace import Tracer


LOG = logging.getLogger(__name__)


class StackCollector(collector.Collector):
"""Execution stacks collector."""

__slots__ = (
"nframes",
"tracer",
)

def __init__(self, nframes: typing.Optional[int] = None, tracer: typing.Optional[Tracer] = None):
super().__init__()

self.nframes = nframes if nframes is not None else config.max_frames
self.tracer = tracer

def __repr__(self) -> str:
class_name = self.__class__.__name__
attrs = {k: v for k, v in self.__dict__.items() if not k.startswith("_")}
attrs_str = ", ".join(f"{k}={v!r}" for k, v in attrs.items())

slot_attrs = {slot: getattr(self, slot) for slot in self.__slots__ if not slot.startswith("_")}
slot_attrs_str = ", ".join(f"{k}={v!r}" for k, v in slot_attrs.items())

return f"{class_name}({attrs_str}, {slot_attrs_str})"

def _init(self) -> None:
if self.tracer is not None:
core.on("ddtrace.context_provider.activate", stack_v2.link_span)

# stack v2 requires us to patch the Threading module. It's possible to do this from the stack v2 code
# itself, but it's a little bit fiddly and it's easier to make it correct here.
# TODO take the `threading` import out of here and just handle it in v2 startup
threading.init_stack_v2()
stack_v2.set_adaptive_sampling(config.stack.v2_adaptive_sampling)
stack_v2.start()

def _start_service(self) -> None:
# This is split in its own function to ease testing
LOG.debug("Profiling StackCollector starting")
self._init()
LOG.debug("Profiling StackCollector started")

def _stop_service(self) -> None:
LOG.debug("Profiling StackCollector stopping")
if self.tracer is not None:
core.reset_listeners("ddtrace.context_provider.activate", stack_v2.link_span)
LOG.debug("Profiling StackCollector stopped")

# Tell the native thread running the v2 sampler to stop
stack_v2.stop()
11 changes: 0 additions & 11 deletions ddtrace/profiling/collector/stack.pyi

This file was deleted.

Loading
Loading