Skip to content

Commit

Permalink
fix(profiling): stack v2 tracks threads in gunicorn workers [backport…
Browse files Browse the repository at this point in the history
… 2.16] (#11314)

Backport 0923b51 from #11300 to 2.16.

See the following screenshot for the CPU profile before and after this
change for a service using gunicorn.

<img width="1754" alt="Screenshot 2024-11-06 at 1 55 56 PM"
src="https://github.com/user-attachments/assets/e29d8b56-c789-416e-907e-4726aab33b2b">

## Checklist
- [x] PR author has checked that all the criteria below are met
- The PR description includes an overview of the change
- The PR description articulates the motivation for the change
- The change includes tests OR the PR description describes a testing
strategy
- The PR description notes risks associated with the change, if any
- Newly-added code is easy to change
- The change follows the [library release note
guidelines](https://ddtrace.readthedocs.io/en/stable/releasenotes.html)
- The change includes or references documentation updates if necessary
- Backport labels are set (if
[applicable](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting))

## Reviewer Checklist
- [x] Reviewer has checked that all the criteria below are met 
- Title is accurate
- All changes are related to the pull request's stated goal
- Avoids breaking
[API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces)
changes
- Testing strategy adequately addresses listed risks
- Newly-added code is easy to change
- Release note makes sense to a user of the library
- If necessary, author has acknowledged and discussed the performance
implications of this PR as reported in the benchmarks PR comment
- Backport labels are set in a manner that is consistent with the
[release branch maintenance
policy](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)

Co-authored-by: Taegyun Kim <taegyun.kim@datadoghq.com>
  • Loading branch information
github-actions[bot] and taegyunkim authored Nov 6, 2024
1 parent 055d352 commit c3899f7
Show file tree
Hide file tree
Showing 6 changed files with 125 additions and 45 deletions.
8 changes: 7 additions & 1 deletion ddtrace/profiling/collector/threading.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,4 +62,10 @@ def thread_bootstrap_inner(self, *args, **kwargs):

# Instrument any living threads
for thread_id, thread in threading._active.items():
stack_v2.register_thread(thread.ident, thread.native_id, thread.name)
# DEV: calling _set_native_id will register the thread with stack_v2
# as we've already patched it.
# Calling _set_native_id was necessary to ensure that the native_id
# was set on the thread running in gunicorn workers. They need to be
# updated with correct native_id so that the thread can be tracked
# correctly in the echion stack_v2.
thread._set_native_id()
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
fixes:
- |
profiling: fixes an issue where cpu-time was not profiled for services using
gunicorn, when ``DD_PROFILING_STACK_V2_ENABLED`` was set.
66 changes: 37 additions & 29 deletions tests/profiling/collector/pprof_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ def assert_lock_events(
assert_lock_events_of_type(profile, expected_release_events, LockEventType.RELEASE)


def assert_str_label(string_table, sample, key: str, expected_value: Optional[str]):
def assert_str_label(string_table: Dict[int, str], sample, key: str, expected_value: Optional[str]):
if expected_value:
label = get_label_with_key(string_table, sample, key)
# We use fullmatch to ensure that the whole string matches the expected value
Expand All @@ -203,25 +203,25 @@ def assert_str_label(string_table, sample, key: str, expected_value: Optional[st
)


def assert_num_label(string_table, sample, key: str, expected_value: Optional[int]):
def assert_num_label(string_table: Dict[int, str], sample, key: str, expected_value: Optional[int]):
    """Assert that the numeric label ``key`` on ``sample`` equals ``expected_value``.

    The check is skipped entirely when ``expected_value`` is falsy, i.e. both
    ``None`` and ``0`` mean "do not verify this label".
    """
    if not expected_value:
        return
    label = get_label_with_key(string_table, sample, key)
    actual = label.num
    assert actual == expected_value, "Expected {} got {} for label {}".format(expected_value, actual, key)


def assert_base_event(profile, sample: pprof_pb2.Sample, expected_event: EventBaseClass):
assert_num_label(profile.string_table, sample, "span id", expected_event.span_id)
assert_num_label(profile.string_table, sample, "local root span id", expected_event.local_root_span_id)
assert_str_label(profile.string_table, sample, "trace type", expected_event.trace_type)
assert_str_label(profile.string_table, sample, "trace endpoint", expected_event.trace_endpoint)
assert_num_label(profile.string_table, sample, "thread id", expected_event.thread_id)
assert_str_label(profile.string_table, sample, "thread name", expected_event.thread_name)
assert_str_label(profile.string_table, sample, "class name", expected_event.class_name)
assert_num_label(profile.string_table, sample, "task id", expected_event.task_id)
assert_str_label(profile.string_table, sample, "task name", expected_event.task_name)
def assert_base_event(string_table: Dict[int, str], sample: pprof_pb2.Sample, expected_event: EventBaseClass):
    """Verify every label shared by all event types on ``sample``.

    Each entry pairs a label key with the checker to use and the name of the
    ``expected_event`` attribute holding the expected value. Entries are
    processed in order, and each attribute is read only when its turn comes,
    so the first mismatching label is the one reported.
    """
    label_checks = (
        (assert_num_label, "span id", "span_id"),
        (assert_num_label, "local root span id", "local_root_span_id"),
        (assert_str_label, "trace type", "trace_type"),
        (assert_str_label, "trace endpoint", "trace_endpoint"),
        (assert_num_label, "thread id", "thread_id"),
        (assert_str_label, "thread name", "thread_name"),
        (assert_str_label, "class name", "class_name"),
        (assert_num_label, "task id", "task_id"),
        (assert_str_label, "task name", "task_name"),
    )
    for check_label, label_key, attr_name in label_checks:
        check_label(string_table, sample, label_key, getattr(expected_event, attr_name))


def assert_lock_event(profile, sample: pprof_pb2.Sample, expected_event: LockEvent):
def assert_lock_event(profile: pprof_pb2.Profile, sample: pprof_pb2.Sample, expected_event: LockEvent):
# Check that the sample has label "lock name" with value
# filename:self.lock_linenos.create:lock_name
lock_name_label = get_label_with_key(profile.string_table, sample, "lock name")
Expand Down Expand Up @@ -254,7 +254,7 @@ def assert_lock_event(profile, sample: pprof_pb2.Sample, expected_event: LockEve
expected_event.linenos.release, line.line
)

assert_base_event(profile, sample, expected_event)
assert_base_event(profile.string_table, sample, expected_event)


def assert_sample_has_locations(profile, sample, expected_locations: Optional[List[StackLocation]]):
Expand All @@ -277,34 +277,42 @@ def assert_sample_has_locations(profile, sample, expected_locations: Optional[Li
filename = os.path.basename(profile.string_table[function.filename])
line_no = line.line
sample_loc_strs.append(f"{filename}:{function_name}:{line_no}")
if (
function_name.endswith(expected_locations[expected_locations_idx].function_name)
and filename == expected_locations[expected_locations_idx].filename
and line_no == expected_locations[expected_locations_idx].line_no
):
expected_locations_idx += 1
if expected_locations_idx == len(expected_locations):
checked = True
break

if expected_locations_idx < len(expected_locations):
if (
function_name.endswith(expected_locations[expected_locations_idx].function_name)
and filename == expected_locations[expected_locations_idx].filename
and line_no == expected_locations[expected_locations_idx].line_no
):
expected_locations_idx += 1
if expected_locations_idx == len(expected_locations):
checked = True

for loc in sample_loc_strs:
if DEBUG_TEST:
print(loc)

assert checked, "Expected locations {} not found in sample locations: {}".format(
expected_locations, sample_loc_strs
)


def assert_stack_event(profile, sample: pprof_pb2.Sample, expected_event: StackEvent):
def assert_stack_event(profile: pprof_pb2.Profile, sample: pprof_pb2.Sample, expected_event: StackEvent):
# Check that the sample has label "exception type" with value

assert_str_label(profile.string_table, sample, "exception type", expected_event.exception_type)
assert_sample_has_locations(profile, sample, expected_event.locations)
assert_base_event(profile, sample, expected_event)
assert_base_event(profile.string_table, sample, expected_event)


def assert_has_samples(profile, expected_samples):
def assert_profile_has_sample(
profile: pprof_pb2.Profile,
samples: List[pprof_pb2.Sample],
expected_sample: StackEvent,
):
found = False
for sample in profile.sample:
for sample in samples:
try:
assert_stack_event(profile, sample, expected_samples)
assert_stack_event(profile, sample, expected_sample)
found = True
break
except AssertionError as e:
Expand Down
21 changes: 19 additions & 2 deletions tests/profiling/gunicorn-app.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,19 @@
def app():
pass
import os
import threading


def fib(n):  # naive doubly-recursive Fibonacci; returns n unchanged when n <= 1
    if n <= 1:
        return n
    return fib(n - 1) + fib(n - 2)  # NOTE: tests/profiling_v2/test_gunicorn.py expects fib frames at line_no=8 of this file; do not shift this line


def app(environ, start_response):
    """WSGI application that reports fib(35) together with the pid and thread id.

    Calls ``start_response`` with the status line and a plain-text content
    type header, then returns the UTF-8 encoded body as a one-element list.
    """
    text = "fib(35) is %d at pid %d tid %d" % (fib(35), os.getpid(), threading.get_ident())
    payload = text.encode("utf-8")

    if payload:
        status = "200 OK"
    else:
        status = "404 Not Found"

    start_response(status, [("Content-type", "text/plain")])
    return [payload]
12 changes: 7 additions & 5 deletions tests/profiling_v2/collector/test_stack.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ def foo():
],
)

pprof_utils.assert_has_samples(profile, expected_sample)
pprof_utils.assert_profile_has_sample(profile, samples=samples, expected_sample=expected_sample)


@pytest.mark.parametrize("stack_v2_enabled", [True, False])
Expand Down Expand Up @@ -471,9 +471,10 @@ def sleep_instance(self):
samples = pprof_utils.get_samples_with_value_type(profile, "wall-time")
assert len(samples) > 0

pprof_utils.assert_has_samples(
pprof_utils.assert_profile_has_sample(
profile,
pprof_utils.StackEvent(
samples=samples,
expected_sample=pprof_utils.StackEvent(
thread_id=_thread.get_ident(),
thread_name="MainThread",
class_name="SomeClass" if not stack_v2_enabled else None,
Expand Down Expand Up @@ -529,9 +530,10 @@ def sleep_instance(foobar, self):
samples = pprof_utils.get_samples_with_value_type(profile, "wall-time")
assert len(samples) > 0

pprof_utils.assert_has_samples(
pprof_utils.assert_profile_has_sample(
profile,
pprof_utils.StackEvent(
samples=samples,
expected_sample=pprof_utils.StackEvent(
thread_id=_thread.get_ident(),
thread_name="MainThread",
# stack v1 relied on using cls and self to figure out class name
Expand Down
58 changes: 50 additions & 8 deletions tests/profiling_v2/test_gunicorn.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,23 @@
import subprocess
import sys
import time
import urllib.request

import pytest

from tests.profiling.collector import pprof_utils


# DEV: gunicorn tests are hard to debug, so keeping these print statements for
# future debugging
DEBUG_PRINT = False


def debug_print(*args):
    """Forward ``args`` to ``print`` only when ``DEBUG_PRINT`` is truthy."""
    if not DEBUG_PRINT:
        return
    print(*args)


# gunicorn is not available on Windows
if sys.platform == "win32":
pytestmark = pytest.mark.skip
Expand All @@ -28,8 +39,7 @@ def _run_gunicorn(*args):

@pytest.fixture
def gunicorn(monkeypatch):
# Do not ignore profiler so we have samples in the output pprof
monkeypatch.setenv("DD_PROFILING_IGNORE_PROFILER", "0")
monkeypatch.setenv("DD_PROFILING_IGNORE_PROFILER", "1")
monkeypatch.setenv("DD_PROFILING_ENABLED", "1")
# This was needed for the gunicorn process to start and print worker startup
# messages. Without this, the test can't find the worker PIDs.
Expand All @@ -48,19 +58,51 @@ def _test_gunicorn(gunicorn, tmp_path, monkeypatch, *args):
filename = str(tmp_path / "gunicorn.pprof")
monkeypatch.setenv("DD_PROFILING_OUTPUT_PPROF", filename)

proc = gunicorn("-w", "3", *args)
# DEV: We only start 1 worker to simplify the test
proc = gunicorn("-w", "1", *args)
# Wait for the workers to start
time.sleep(3)
proc.terminate()

try:
with urllib.request.urlopen("http://127.0.0.1:7643") as f:
status_code = f.getcode()
assert status_code == 200, status_code
response = f.read().decode()
debug_print(response)

except Exception as e:
pytest.fail("Failed to make request to gunicorn server %s" % e)
finally:
# Need to terminate the process to get the output and release the port
proc.terminate()

output = proc.stdout.read().decode()
worker_pids = _get_worker_pids(output)

assert len(worker_pids) == 3, output
for line in output.splitlines():
debug_print(line)

assert len(worker_pids) == 1, output
assert proc.wait() == 0, output
assert "module 'threading' has no attribute '_active'" not in output, output

profile = pprof_utils.parse_profile("%s.%d" % (filename, proc.pid))
samples = pprof_utils.get_samples_with_value_type(profile, "cpu-time")
assert len(samples) > 0
for pid in worker_pids:
debug_print("Reading pprof file with prefix %s.%d" % (filename, pid))
profile = pprof_utils.parse_profile("%s.%d" % (filename, pid))
# This returns a list of samples that have non-zero cpu-time
samples = pprof_utils.get_samples_with_value_type(profile, "cpu-time")
assert len(samples) > 0

pprof_utils.assert_profile_has_sample(
profile,
samples=samples,
expected_sample=pprof_utils.StackEvent(
locations=[
pprof_utils.StackLocation(function_name="fib", filename="gunicorn-app.py", line_no=8),
pprof_utils.StackLocation(function_name="fib", filename="gunicorn-app.py", line_no=8),
]
),
)


def test_gunicorn(gunicorn, tmp_path, monkeypatch):
Expand Down

0 comments on commit c3899f7

Please sign in to comment.