Skip to content

Commit 1365866

Browse files
author
Julien Danjou
authored
refactor(profiling): move _ThreadLink in ddtrace.profiling.collector._threading (DataDog#3087)
1 parent e269191 commit 1365866

File tree

2 files changed

+70
-67
lines changed

2 files changed

+70
-67
lines changed

ddtrace/profiling/collector/_threading.pyx

+68
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
from __future__ import absolute_import
22

33
import threading
4+
import typing
5+
import weakref
6+
7+
import attr
48

59
from ddtrace.internal import nogevent
610

@@ -43,3 +47,67 @@ cpdef get_thread_native_id(thread_id):
4347
except AttributeError:
4448
# Python < 3.8
4549
return hash(thread_obj)
50+
51+
52+
# cython does not play well with mypy
53+
if typing.TYPE_CHECKING:
54+
_T = typing.TypeVar("_T")
55+
_thread_link_base = typing.Generic[_T]
56+
_weakref_type = weakref.ReferenceType[_T]
57+
else:
58+
_thread_link_base = object
59+
_weakref_type = typing.Any
60+
61+
62+
@attr.s(slots=True, eq=False)
63+
class _ThreadLink(_thread_link_base):
64+
"""Link a thread with an object.
65+
66+
Object is removed when the thread disappears.
67+
"""
68+
69+
# Key is a thread_id
70+
# Value is a weakref to an object
71+
_thread_id_to_object = attr.ib(factory=dict, repr=False, init=False, type=typing.Dict[int, _weakref_type])
72+
_lock = attr.ib(factory=nogevent.Lock, repr=False, init=False, type=nogevent.Lock)
73+
74+
def link_object(
75+
self,
76+
obj # type: _T
77+
):
78+
# type: (...) -> None
79+
"""Link an object to the current running thread."""
80+
# Since we're going to iterate over the set, make sure it's locked
81+
with self._lock:
82+
self._thread_id_to_object[nogevent.thread_get_ident()] = weakref.ref(obj)
83+
84+
def clear_threads(self,
85+
existing_thread_ids, # type: typing.Set[int]
86+
):
87+
"""Clear the stored list of threads based on the list of existing thread ids.
88+
89+
If any thread that is part of this list was stored, its data will be deleted.
90+
91+
:param existing_thread_ids: A set of thread ids to keep.
92+
"""
93+
with self._lock:
94+
# Iterate over a copy of the list of keys since it's mutated during our iteration.
95+
for thread_id in list(self._thread_id_to_object.keys()):
96+
if thread_id not in existing_thread_ids:
97+
del self._thread_id_to_object[thread_id]
98+
99+
def get_object(
100+
self,
101+
thread_id # type: int
102+
):
103+
# type: (...) -> _T
104+
"""Return the object attached to thread.
105+
106+
:param thread_id: The thread id.
107+
:return: The attached object.
108+
"""
109+
110+
with self._lock:
111+
obj_ref = self._thread_id_to_object.get(thread_id)
112+
if obj_ref is not None:
113+
return obj_ref()

ddtrace/profiling/collector/stack.pyx

+2-67
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ from __future__ import absolute_import
44
import sys
55
import threading
66
import typing
7-
import weakref
87

98
import attr
109
import six
@@ -403,74 +402,10 @@ cdef stack_collect(ignore_profiler, thread_time, max_nframes, interval, wall_tim
403402
return stack_events, exc_events
404403

405404

406-
# cython does not play well with mypy
407405
if typing.TYPE_CHECKING:
408-
_T = typing.TypeVar("_T")
409-
_thread_link_base = typing.Generic[_T]
410-
_weakref_type = weakref.ReferenceType[_T]
406+
_thread_span_links_base = _threading._ThreadLink[ddspan.Span]
411407
else:
412-
_thread_link_base = object
413-
_weakref_type = typing.Any
414-
415-
416-
@attr.s(slots=True, eq=False)
417-
class _ThreadLink(_thread_link_base):
418-
"""Link a thread with an object.
419-
420-
Object is removed when the thread disappears.
421-
"""
422-
423-
# Key is a thread_id
424-
# Value is a weakref to an object
425-
_thread_id_to_object = attr.ib(factory=dict, repr=False, init=False, type=typing.Dict[int, _weakref_type])
426-
_lock = attr.ib(factory=nogevent.Lock, repr=False, init=False, type=nogevent.Lock)
427-
428-
def link_object(
429-
self,
430-
obj # type: _T
431-
):
432-
# type: (...) -> None
433-
"""Link an object to the current running thread."""
434-
# Since we're going to iterate over the set, make sure it's locked
435-
with self._lock:
436-
self._thread_id_to_object[nogevent.thread_get_ident()] = weakref.ref(obj)
437-
438-
def clear_threads(self,
439-
existing_thread_ids, # type: typing.Set[int]
440-
):
441-
"""Clear the stored list of threads based on the list of existing thread ids.
442-
443-
If any thread that is part of this list was stored, its data will be deleted.
444-
445-
:param existing_thread_ids: A set of thread ids to keep.
446-
"""
447-
with self._lock:
448-
# Iterate over a copy of the list of keys since it's mutated during our iteration.
449-
for thread_id in list(self._thread_id_to_object.keys()):
450-
if thread_id not in existing_thread_ids:
451-
del self._thread_id_to_object[thread_id]
452-
453-
def get_object(
454-
self,
455-
thread_id # type: int
456-
):
457-
# type: (...) -> _T
458-
"""Return the object attached to thread.
459-
460-
:param thread_id: The thread id.
461-
:return: The attached object.
462-
"""
463-
464-
with self._lock:
465-
obj_ref = self._thread_id_to_object.get(thread_id)
466-
if obj_ref is not None:
467-
return obj_ref()
468-
469-
470-
if typing.TYPE_CHECKING:
471-
_thread_span_links_base = _ThreadLink[ddspan.Span]
472-
else:
473-
_thread_span_links_base = _ThreadLink
408+
_thread_span_links_base = _threading._ThreadLink
474409

475410

476411
@attr.s(slots=True, eq=False)

0 commit comments

Comments
 (0)