Skip to content

Commit ac57ec7

Browse files
Authored by: dd-octo-sts[bot], juanjux, and taegyunkim
authored
fix(internal): fix for potential race condition [backport 3.17] (#15049)
Backport ae5562f from #15018 to 3.17. ## Description We had a potential (and reported to happen to a client) race condition where the `self.endpoints` list in `internal/endpoints.py` could change size during iteration (modified by a thread), causing an exception. This fixes it by using a tuple copy of the iterated list and adds regression tests. Thanks Adria Ardilla for reporting and suggesting the fix. Signed-off-by: Juanjo Alvarez <juanjo.alvarezmartinez@datadoghq.com> Co-authored-by: Juanjo Alvarez Martinez <juanjo.alvarezmartinez@datadoghq.com> Co-authored-by: Taegyun Kim <taegyun.kim@datadoghq.com>
1 parent 36f58bc commit ac57ec7

File tree

3 files changed

+186
-2
lines changed

3 files changed

+186
-2
lines changed

ddtrace/internal/endpoints.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -106,15 +106,16 @@ def flush(self, max_length: int) -> dict:
106106
"""
107107
Flush the endpoints to a payload, returning the first `max` endpoints.
108108
"""
109+
endpoints_snapshot = tuple(self.endpoints)
109110
if max_length >= len(self.endpoints):
110111
res = {
111112
"is_first": self.is_first,
112-
"endpoints": [dataclasses.asdict(ep, dict_factory=_dict_factory) for ep in self.endpoints],
113+
"endpoints": [dataclasses.asdict(ep, dict_factory=_dict_factory) for ep in endpoints_snapshot],
113114
}
114115
self.reset()
115116
return res
116117
else:
117-
batch = [self.endpoints.pop() for _ in range(max_length)]
118+
batch = tuple(self.endpoints.pop() for _ in range(max_length))
118119
res = {
119120
"is_first": self.is_first,
120121
"endpoints": [dataclasses.asdict(ep, dict_factory=_dict_factory) for ep in batch],
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
---
2+
fixes:
3+
- |
4+
Fix a potential race condition in the tracer where the collected HTTP endpoints could be modified by another thread while being flushed, causing the flush to raise an exception.

tests/internal/test_endpoints.py

Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
import threading
2+
from time import sleep
3+
4+
import pytest
5+
6+
from ddtrace.internal.endpoints import HttpEndPointsCollection
7+
8+
9+
@pytest.fixture
def collection():
    """Yield a freshly reset HttpEndPointsCollection, resetting it again on teardown."""
    instance = HttpEndPointsCollection()
    instance.reset()
    yield instance
    instance.reset()
15+
16+
17+
def test_flush_uses_tuple_snapshot(collection):
18+
"""Test that flush() operates on a tuple snapshot, not the original set."""
19+
collection.add_endpoint("GET", "/api/users")
20+
collection.add_endpoint("POST", "/api/users")
21+
collection.add_endpoint("DELETE", "/api/users/123")
22+
23+
assert len(collection.endpoints) == 3
24+
result = collection.flush(max_length=10)
25+
assert result["is_first"] is True
26+
assert len(result["endpoints"]) == 3
27+
assert len(collection.endpoints) == 0
28+
29+
30+
def test_flush_snapshot_prevents_modification_during_iteration(collection):
31+
"""Test that modifying self.endpoints during flush iteration doesn't cause RuntimeError."""
32+
collection.add_endpoint("GET", "/api/v1")
33+
collection.add_endpoint("POST", "/api/v2")
34+
collection.add_endpoint("PUT", "/api/v3")
35+
36+
initial_count = len(collection.endpoints)
37+
assert initial_count == 3
38+
result = collection.flush(max_length=10)
39+
40+
assert len(result["endpoints"]) == initial_count
41+
assert result["is_first"] is True
42+
43+
44+
def test_concurrent_add_during_flush_does_not_break_iteration(collection):
45+
"""Test that adding endpoints from another thread during flush doesn't cause RuntimeError."""
46+
for i in range(5):
47+
collection.add_endpoint("GET", f"/api/endpoint{i}")
48+
49+
assert len(collection.endpoints) == 5
50+
51+
flush_completed = threading.Event()
52+
flush_result = {}
53+
exception_caught = []
54+
55+
def flush_thread():
56+
try:
57+
result = collection.flush(max_length=10)
58+
flush_result["data"] = result
59+
flush_completed.set()
60+
except Exception as e:
61+
exception_caught.append(e)
62+
flush_completed.set()
63+
64+
def add_thread():
65+
sleep(0.001)
66+
67+
# Try to modify the set while flush might be iterating
68+
for i in range(5, 10):
69+
collection.add_endpoint("POST", f"/api/new{i}")
70+
sleep(0.001)
71+
72+
t1 = threading.Thread(target=flush_thread)
73+
t2 = threading.Thread(target=add_thread)
74+
75+
t1.start()
76+
t2.start()
77+
78+
t1.join(timeout=2.0)
79+
t2.join(timeout=2.0)
80+
81+
assert flush_completed.is_set(), "Flush did not complete"
82+
assert len(exception_caught) == 0, f"Exception occurred during flush: {exception_caught}"
83+
assert "data" in flush_result, "Flush did not return a result"
84+
85+
result = flush_result["data"]
86+
assert "endpoints" in result
87+
assert "is_first" in result
88+
89+
90+
def test_flush_with_partial_batch(collection):
91+
"""Test that flush creates a tuple snapshot even when using pop() for partial batches."""
92+
for i in range(10):
93+
collection.add_endpoint("GET", f"/api/endpoint{i}")
94+
95+
assert len(collection.endpoints) == 10
96+
97+
result = collection.flush(max_length=5)
98+
99+
assert len(result["endpoints"]) == 5
100+
assert result["is_first"] is True
101+
assert len(collection.endpoints) == 5
102+
103+
result2 = collection.flush(max_length=10)
104+
assert len(result2["endpoints"]) == 5
105+
assert result2["is_first"] is False # Not first anymore
106+
107+
assert len(collection.endpoints) == 0
108+
109+
110+
def test_partial_flush_with_concurrent_modification(collection):
111+
"""Test that partial flush (max_length < size) is safe from race conditions."""
112+
for i in range(10):
113+
collection.add_endpoint("GET", f"/api/endpoint{i}")
114+
115+
assert len(collection.endpoints) == 10
116+
117+
flush_completed = threading.Event()
118+
flush_result = {}
119+
exception_caught = []
120+
121+
def flush_thread():
122+
try:
123+
# Partial flush - this should trigger the else branch at line 118
124+
result = collection.flush(max_length=5)
125+
flush_result["data"] = result
126+
flush_completed.set()
127+
except Exception as e:
128+
exception_caught.append(e)
129+
flush_completed.set()
130+
131+
def add_thread():
132+
sleep(0.001)
133+
# Try to modify the set while flush might be iterating
134+
for i in range(10, 15):
135+
collection.add_endpoint("POST", f"/api/new{i}")
136+
sleep(0.001)
137+
138+
t1 = threading.Thread(target=flush_thread)
139+
t2 = threading.Thread(target=add_thread)
140+
141+
t1.start()
142+
t2.start()
143+
144+
t1.join(timeout=2.0)
145+
t2.join(timeout=2.0)
146+
147+
assert flush_completed.is_set(), "Flush did not complete"
148+
assert len(exception_caught) == 0, f"Exception occurred during flush: {exception_caught}"
149+
assert "data" in flush_result, "Flush did not return a result"
150+
151+
result = flush_result["data"]
152+
assert len(result["endpoints"]) == 5
153+
assert "is_first" in result
154+
155+
156+
def test_http_endpoint_hash_consistency(collection):
157+
"""Test that HttpEndPoint hashing works correctly for set operations."""
158+
collection.add_endpoint("GET", "/api/test")
159+
collection.add_endpoint("GET", "/api/test")
160+
assert len(collection.endpoints) == 1
161+
162+
collection.add_endpoint("POST", "/api/test")
163+
collection.add_endpoint("GET", "/api/other")
164+
assert len(collection.endpoints) == 3
165+
166+
167+
def test_snapshot_is_tuple_type(collection):
168+
"""Verify that the snapshot created in flush is actually a tuple."""
169+
collection.add_endpoint("GET", "/test")
170+
collection.add_endpoint("POST", "/test")
171+
assert isinstance(collection.endpoints, set)
172+
173+
result = collection.flush(max_length=10)
174+
assert len(result["endpoints"]) == 2
175+
176+
for ep in result["endpoints"]:
177+
assert isinstance(ep, dict)
178+
assert "method" in ep
179+
assert "path" in ep

0 commit comments

Comments
 (0)