Skip to content

Commit 162a670

Browse files
committed
feat(profiling): gevent support
We add support for gevent in stack-v2.
1 parent 5c7228a commit 162a670

File tree

7 files changed

+291
-75
lines changed

7 files changed

+291
-75
lines changed

ddtrace/internal/datadog/profiling/stack_v2/CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ endif()
4141

4242
# Add echion
4343
set(ECHION_COMMIT
44-
"6ebe7dddb604aa97e89f072c0fc65c9785e023a0" # https://github.com/P403n1x87/echion/commit/6ebe7dddb604aa97e89f072c0fc65c9785e023a0
44+
"dcd2a27a219cb2e91e799779e04ba7276ba13a27" # https://github.com/P403n1x87/echion/commit/dcd2a27a219cb2e91e799779e04ba7276ba13a27
4545
CACHE STRING "Commit hash of echion to use")
4646
FetchContent_Declare(
4747
echion

ddtrace/internal/datadog/profiling/stack_v2/include/sampler.hpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,9 @@ class Sampler
5151
PyObject* _asyncio_eager_tasks);
5252
void link_tasks(PyObject* parent, PyObject* child);
5353
void sampling_thread(const uint64_t seq_num);
54+
void track_greenlet(uintptr_t greenlet_id, StringTable::Key name, PyObject* frame_cell);
55+
void untrack_greenlet(uintptr_t greenlet_id);
56+
void link_greenlets(uintptr_t parent, uintptr_t child);
5457

5558
// The Python side dynamically adjusts the sampling rate based on overhead, so we need to be able to update our
5659
// own intervals accordingly. Rather than a preemptive measure, we assume the rate is ~fairly stable and just

ddtrace/internal/datadog/profiling/stack_v2/src/sampler.cpp

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
#include "thread_span_links.hpp"
44

5+
#include "echion/greenlets.h"
56
#include "echion/interp.h"
67
#include "echion/tasks.h"
78
#include "echion/threads.h"
@@ -338,3 +339,38 @@ Sampler::link_tasks(PyObject* parent, PyObject* child)
338339
std::lock_guard<std::mutex> guard(task_link_map_lock);
339340
task_link_map[child] = parent;
340341
}
342+
343+
void
344+
Sampler::track_greenlet(uintptr_t greenlet_id, StringTable::Key name, PyObject* frame_cell)
345+
{
346+
const std::lock_guard<std::mutex> guard(greenlet_info_map_lock);
347+
348+
auto entry = greenlet_info_map.find(greenlet_id);
349+
if (entry != greenlet_info_map.end())
350+
// Greenlet is already tracked so we update its info
351+
entry->second = std::make_unique<GreenletInfo>(greenlet_id, frame_cell, name);
352+
else
353+
greenlet_info_map.emplace(greenlet_id, std::make_unique<GreenletInfo>(greenlet_id, frame_cell, name));
354+
355+
// Update the thread map
356+
auto native_id = PyThread_get_thread_native_id();
357+
greenlet_thread_map[native_id] = greenlet_id;
358+
}
359+
360+
void
361+
Sampler::untrack_greenlet(uintptr_t greenlet_id)
362+
{
363+
const std::lock_guard<std::mutex> guard(greenlet_info_map_lock);
364+
365+
greenlet_info_map.erase(greenlet_id);
366+
greenlet_parent_map.erase(greenlet_id);
367+
greenlet_thread_map.erase(greenlet_id);
368+
}
369+
370+
void
371+
Sampler::link_greenlets(uintptr_t parent, uintptr_t child)
372+
{
373+
std::lock_guard<std::mutex> guard(greenlet_info_map_lock);
374+
375+
greenlet_parent_map[child] = parent;
376+
}

ddtrace/internal/datadog/profiling/stack_v2/src/stack_v2.cpp

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,56 @@ stack_v2_set_adaptive_sampling(PyObject* Py_UNUSED(self), PyObject* args)
191191
Py_RETURN_NONE;
192192
}
193193

194+
static PyObject*
195+
track_greenlet(PyObject* Py_UNUSED(m), PyObject* args)
196+
{
197+
uintptr_t greenlet_id; // map key
198+
PyObject* name;
199+
PyObject* frame_cell;
200+
201+
if (!PyArg_ParseTuple(args, "lOO", &greenlet_id, &name, &frame_cell))
202+
return NULL;
203+
204+
StringTable::Key greenlet_name;
205+
206+
try {
207+
greenlet_name = string_table.key(name);
208+
} catch (StringTable::Error&) {
209+
// We failed to get this task but we keep going
210+
PyErr_SetString(PyExc_RuntimeError, "Failed to get greenlet name from the string table");
211+
return NULL;
212+
}
213+
214+
Sampler::get().track_greenlet(greenlet_id, greenlet_name, frame_cell);
215+
216+
Py_RETURN_NONE;
217+
}
218+
219+
static PyObject*
220+
untrack_greenlet(PyObject* Py_UNUSED(m), PyObject* args)
221+
{
222+
uintptr_t greenlet_id;
223+
if (!PyArg_ParseTuple(args, "l", &greenlet_id))
224+
return NULL;
225+
226+
Sampler::get().untrack_greenlet(greenlet_id);
227+
228+
Py_RETURN_NONE;
229+
}
230+
231+
static PyObject*
232+
link_greenlets(PyObject* Py_UNUSED(m), PyObject* args)
233+
{
234+
uintptr_t parent, child;
235+
236+
if (!PyArg_ParseTuple(args, "ll", &child, &parent))
237+
return NULL;
238+
239+
Sampler::get().link_greenlets(parent, child);
240+
241+
Py_RETURN_NONE;
242+
}
243+
194244
static PyMethodDef _stack_v2_methods[] = {
195245
{ "start", reinterpret_cast<PyCFunction>(stack_v2_start), METH_VARARGS | METH_KEYWORDS, "Start the sampler" },
196246
{ "stop", stack_v2_stop, METH_VARARGS, "Stop the sampler" },
@@ -205,6 +255,11 @@ static PyMethodDef _stack_v2_methods[] = {
205255
{ "track_asyncio_loop", stack_v2_track_asyncio_loop, METH_VARARGS, "Map the name of a task with its identifier" },
206256
{ "init_asyncio", stack_v2_init_asyncio, METH_VARARGS, "Initialise asyncio tracking" },
207257
{ "link_tasks", stack_v2_link_tasks, METH_VARARGS, "Link two tasks" },
258+
// greenlet support
259+
{ "track_greenlet", track_greenlet, METH_VARARGS, "Map a greenlet with its identifier" },
260+
{ "untrack_greenlet", untrack_greenlet, METH_VARARGS, "Untrack a terminated greenlet" },
261+
{ "link_greenlets", link_greenlets, METH_VARARGS, "Link two greenlets" },
262+
208263
{ "set_adaptive_sampling", stack_v2_set_adaptive_sampling, METH_VARARGS, "Set adaptive sampling" },
209264
{ NULL, NULL, 0, NULL }
210265
};

ddtrace/profiling/_gevent.py

Lines changed: 191 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,191 @@
1+
from types import FrameType
2+
import typing as t
3+
4+
import gevent
5+
from gevent import thread
6+
import gevent.greenlet
7+
from gevent.greenlet import Greenlet as _Greenlet
8+
import gevent.hub
9+
from greenlet import greenlet
10+
from greenlet import settrace
11+
12+
from ddtrace.internal.datadog.profiling import stack_v2
13+
14+
15+
# Original objects
16+
_gevent_hub_spawn_raw = gevent.hub.spawn_raw
17+
_gevent_joinall = gevent.joinall
18+
_gevent_wait = gevent.wait
19+
_gevent_iwait = gevent.iwait
20+
21+
# Global package state
22+
_greenlet_frame_cells: t.Dict[int, list] = {}
23+
_original_greenlet_tracer: t.Optional[t.Callable[[str, t.Any], None]] = None
24+
_greenlet_parent_map: t.Dict[int, int] = {}
25+
_parent_greenlet_count: t.Dict[int, int] = {}
26+
27+
FRAME_NOT_SET = False # Sentinel for when the frame is not set
28+
29+
30+
def track_gevent_greenlet(greenlet: _Greenlet) -> _Greenlet:
31+
greenlet_id = thread.get_ident(greenlet)
32+
frame_cell: t.List[t.Union[FrameType, bool, None]] = [FRAME_NOT_SET]
33+
34+
stack_v2.track_greenlet( # type: ignore[attr-defined]
35+
greenlet_id,
36+
greenlet.name or type(greenlet).__qualname__,
37+
frame_cell,
38+
)
39+
40+
# Untrack on completion
41+
try:
42+
greenlet.rawlink(untrack_greenlet)
43+
except AttributeError:
44+
# This greenlet cannot be linked (e.g. the Hub)
45+
pass
46+
47+
_greenlet_frame_cells[greenlet_id] = frame_cell
48+
49+
return greenlet
50+
51+
52+
def greenlet_tracer(event: str, args: t.Any) -> None:
53+
if event in {"switch", "throw"}:
54+
# This tracer function runs in the context of the target
55+
origin, target = t.cast(tuple, args)
56+
57+
if (origin_id := thread.get_ident(origin)) not in _greenlet_frame_cells:
58+
try:
59+
track_gevent_greenlet(origin)
60+
except Exception:
61+
# Not something that we can track
62+
pass
63+
64+
if (target_id := thread.get_ident(target)) not in _greenlet_frame_cells:
65+
# This is likely the hub. We take this chance to track it.
66+
try:
67+
track_gevent_greenlet(target)
68+
except Exception:
69+
# Not something that we can track
70+
pass
71+
72+
try:
73+
# If this is being set to None, it means the greenlet is likely
74+
# finished. We use the sentinel again to signal this.
75+
_greenlet_frame_cells[origin_id][0] = origin.gr_frame or FRAME_NOT_SET
76+
if target_id not in _parent_greenlet_count:
77+
# We don't want to wipe the frame of a parent greenlet because
78+
# we need to unwind it. We definitely know it is still running
79+
# so if we allow the tracer to set its tracked frame to None,
80+
# we won't be able to unwind the full stack.
81+
_greenlet_frame_cells[target_id][0] = target.gr_frame # this *is* None
82+
except KeyError:
83+
# TODO: Log missing greenlet
84+
pass
85+
86+
if _original_greenlet_tracer is not None:
87+
_original_greenlet_tracer(event, args)
88+
89+
90+
def untrack_greenlet(greenlet: _Greenlet) -> None:
91+
greenlet_id = thread.get_ident(greenlet)
92+
stack_v2.untrack_greenlet(greenlet_id) # type: ignore[attr-defined]
93+
_greenlet_frame_cells.pop(greenlet_id, None)
94+
_parent_greenlet_count.pop(greenlet_id, None)
95+
if (parent_id := _greenlet_parent_map.pop(greenlet_id, None)) is not None:
96+
_parent_greenlet_count[parent_id] -= 1
97+
if _parent_greenlet_count[parent_id] <= 0:
98+
del _parent_greenlet_count[parent_id]
99+
100+
101+
def link_greenlets(greenlet_id: int, parent_id: int) -> None:
102+
stack_v2.link_greenlets(greenlet_id, parent_id) # type: ignore[attr-defined]
103+
_parent_greenlet_count[parent_id] = _parent_greenlet_count.get(parent_id, 0) + 1
104+
_greenlet_parent_map[greenlet_id] = parent_id
105+
106+
107+
class Greenlet(_Greenlet):
108+
@classmethod
109+
def spawn(cls, *args: t.Any, **kwargs: t.Any) -> _Greenlet:
110+
return track_gevent_greenlet(super().spawn(*args, **kwargs))
111+
112+
@classmethod
113+
def spawn_later(cls, *args: t.Any, **kwargs: t.Any) -> _Greenlet:
114+
return track_gevent_greenlet(super().spawn_later(*args, **kwargs))
115+
116+
def join(self, *args: t.Any, **kwargs: t.Any) -> None:
117+
target_id = thread.get_ident(self)
118+
origin_id = thread.get_ident(gevent.getcurrent())
119+
120+
link_greenlets(target_id, origin_id)
121+
122+
super().join(*args, **kwargs)
123+
124+
125+
def wrap_spawn(original: t.Callable[..., _Greenlet]) -> t.Callable[..., _Greenlet]:
126+
def _(*args: t.Any, **kwargs: t.Any) -> _Greenlet:
127+
return track_gevent_greenlet(original(*args, **kwargs))
128+
129+
return _
130+
131+
132+
def joinall(greenlets: t.Iterable[_Greenlet], *args: t.Any, **kwargs: t.Any) -> None:
133+
# This is a wrapper around gevent.joinall to track the greenlets
134+
# that are being joined.
135+
current_greenlet = gevent.getcurrent()
136+
if isinstance(current_greenlet, greenlet):
137+
current_greenlet = gevent.hub.get_hub()
138+
current_greenlet_id = thread.get_ident(current_greenlet)
139+
for g in greenlets:
140+
link_greenlets(thread.get_ident(g), current_greenlet_id)
141+
_gevent_joinall(greenlets, *args, **kwargs)
142+
143+
144+
def wait_wrapper(original: t.Callable[..., t.Any]) -> t.Callable[..., t.Any]:
145+
def _(*args: t.Any, **kwargs: t.Any) -> t.Any:
146+
try:
147+
objects = args[0]
148+
except IndexError:
149+
objects = kwargs.get("args", [])
150+
151+
if greenlets := [_ for _ in objects if isinstance(_, (greenlet, gevent.Greenlet))]:
152+
current_greenlet = gevent.getcurrent()
153+
if isinstance(current_greenlet, greenlet):
154+
current_greenlet = gevent.hub.get_hub()
155+
current_greenlet_id = thread.get_ident(current_greenlet)
156+
for g in greenlets:
157+
link_greenlets(thread.get_ident(g), current_greenlet_id)
158+
159+
return original(*args, **kwargs)
160+
161+
return _
162+
163+
164+
def patch() -> None:
165+
global _original_greenlet_tracer
166+
167+
# Patch the spawn method to track greenlets.
168+
gevent.Greenlet = gevent.greenlet.Greenlet = Greenlet
169+
gevent.spawn = Greenlet.spawn
170+
gevent.spawn_later = Greenlet.spawn_later
171+
gevent.joinall = joinall
172+
gevent.wait = wait_wrapper(_gevent_wait)
173+
gevent.iwait = wait_wrapper(_gevent_iwait)
174+
175+
gevent.hub.spawn_raw = wrap_spawn(_gevent_hub_spawn_raw)
176+
177+
_original_greenlet_tracer = settrace(greenlet_tracer)
178+
179+
180+
def unpatch() -> None:
181+
# Unpatch the spawn method to stop tracking greenlets.
182+
gevent.Greenlet = gevent.greenlet.Greenlet = _Greenlet
183+
gevent.spawn = _Greenlet.spawn
184+
gevent.spawn_later = _Greenlet.spawn_later
185+
gevent.joinall = _gevent_joinall
186+
gevent.wait = _gevent_wait
187+
gevent.iwait = _gevent_iwait
188+
189+
gevent.hub.spawn_raw = _gevent_hub_spawn_raw
190+
191+
settrace(_original_greenlet_tracer)

0 commit comments

Comments
 (0)