
Commit 77ab2c8

chore: limit symdb uploaders under spawn
We use file-based IPC to ensure that Symbol DB has at most 2 active uploader processes under more general circumstances than fork, such as spawn.
1 parent aeb5df4 commit 77ab2c8
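The commit message only sketches the mechanism, so here is a minimal, hypothetical illustration of the general file-based IPC pattern it refers to (this is not the ddtrace implementation, which follows in ddtrace/internal/ipc.py): each process takes an exclusive advisory lock on a shared file, counts the PIDs already registered, and registers itself as an uploader only if the limit has not been reached. The names try_register_uploader, PID_FILE and MAX_UPLOADERS are illustrative, and the sketch assumes a POSIX system since it uses fcntl.

import fcntl
import os
import tempfile

MAX_UPLOADERS = 2  # e.g. parent + 1 child
PID_FILE = os.path.join(tempfile.gettempdir(), "symdb-pids-sketch")


def try_register_uploader() -> bool:
    """Register this PID as an uploader unless MAX_UPLOADERS are already registered."""
    with open(PID_FILE, "a+b") as f:
        fcntl.flock(f, fcntl.LOCK_EX)  # exclusive advisory lock: check + append atomically
        try:
            f.seek(0)
            pids = {p for p in f.read().decode().split("\x00") if p}
            if len(pids) >= MAX_UPLOADERS:
                return False  # enough uploaders already registered; stay disabled
            f.write((str(os.getpid()) + "\x00").encode())  # append mode writes at EOF
            return True
        finally:
            fcntl.flock(f, fcntl.LOCK_UN)


if __name__ == "__main__":
    print("uploader enabled:", try_register_uploader())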

File tree: 3 files changed (+113 −27 lines)

ddtrace/internal/ipc.py

Lines changed: 43 additions & 15 deletions
@@ -19,11 +19,9 @@ class BaseLock:
     def __init__(self, file: typing.IO[typing.Any]):
         self.file = file
 
-    def acquire(self):
-        ...
+    def acquire(self): ...
 
-    def release(self):
-        ...
+    def release(self): ...
 
     def __enter__(self):
         self.acquire()
@@ -99,32 +97,38 @@ def open_file(path, mode):
 class SharedStringFile:
     """A simple shared-file implementation for multiprocess communication."""
 
-    def __init__(self) -> None:
-        self.filename: typing.Optional[str] = str(TMPDIR / secrets.token_hex(8))
+    def __init__(self, name: typing.Optional[str] = None) -> None:
+        self.filename: typing.Optional[str] = str(TMPDIR / (name or secrets.token_hex(8)))
+
+    def put_unlocked(self, f: typing.BinaryIO, data: str) -> None:
+        f.seek(0, os.SEEK_END)
+        dt = (data + "\x00").encode()
+        if f.tell() + len(dt) <= MAX_FILE_SIZE:
+            f.write(dt)
 
     def put(self, data: str) -> None:
         """Put a string into the file."""
         if self.filename is None:
             return
 
         try:
-            with open_file(self.filename, "ab") as f, WriteLock(f):
-                f.seek(0, os.SEEK_END)
-                dt = (data + "\x00").encode()
-                if f.tell() + len(dt) <= MAX_FILE_SIZE:
-                    f.write(dt)
+            with self.lock_exclusive() as f:
+                self.put_unlocked(f, data)
         except Exception:  # nosec
            pass
 
+    def peekall_unlocked(self, f: typing.BinaryIO) -> typing.List[str]:
+        f.seek(0)
+        return f.read().strip(b"\x00").decode().split("\x00")
+
     def peekall(self) -> typing.List[str]:
         """Peek at all strings from the file."""
         if self.filename is None:
             return []
 
         try:
-            with open_file(self.filename, "r+b") as f, ReadLock(f):
-                f.seek(0)
-                return f.read().strip(b"\x00").decode().split("\x00")
+            with self.lock_shared() as f:
+                return self.peekall_unlocked(f)
         except Exception:  # nosec
             return []
 
@@ -134,7 +138,7 @@ def snatchall(self) -> typing.List[str]:
             return []
 
         try:
-            with open_file(self.filename, "r+b") as f, WriteLock(f):
+            with self.lock_exclusive() as f:
                 f.seek(0)
                 strings = f.read().strip(b"\x00").decode().split("\x00")
 
@@ -144,3 +148,27 @@ def snatchall(self) -> typing.List[str]:
                 return strings
         except Exception:  # nosec
             return []
+
+    def clear(self) -> None:
+        """Clear all strings from the file."""
+        if self.filename is None:
+            return
+
+        try:
+            with self.lock_exclusive() as f:
+                f.seek(0)
+                f.truncate()
+        except Exception:  # nosec
+            pass
+
+    @contextmanager
+    def lock_shared(self):
+        """Context manager to acquire a shared/read lock on the file."""
+        with open_file(self.filename, "r+b") as f, ReadLock(f):
+            yield f
+
+    @contextmanager
+    def lock_exclusive(self):
+        """Context manager to acquire an exclusive/write lock on the file."""
+        with open_file(self.filename, "r+b") as f, WriteLock(f):
+            yield f
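For orientation, a hedged usage sketch of the SharedStringFile API as refactored above. The "demo-pids" name is illustrative, and the sketch assumes the internal open_file helper creates the backing file on first use, as the refactored put() already relies on.

import os

from ddtrace.internal.ipc import SharedStringFile

shared = SharedStringFile("demo-pids")  # a fixed name lets unrelated processes share one file

shared.put(str(os.getpid()))  # append one entry under an exclusive lock
print(shared.peekall())       # read all entries under a shared lock

# Combine a read with a conditional write atomically, as the remote-config
# callback in the next file does:
with shared.lock_exclusive() as f:
    pids = {p for p in shared.peekall_unlocked(f) if p}
    if len(pids) < 2:
        shared.put_unlocked(f, str(os.getpid()))

shared.clear()  # truncate the file under an exclusive lock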

ddtrace/internal/symbol_db/remoteconfig.py

Lines changed: 28 additions & 12 deletions
@@ -1,7 +1,9 @@
 import os
+import sys
 import typing as t
 
 from ddtrace.internal.forksafe import has_forked
+from ddtrace.internal.ipc import SharedStringFile
 from ddtrace.internal.logger import get_logger
 from ddtrace.internal.products import manager as product_manager
 from ddtrace.internal.remoteconfig import Payload
@@ -18,20 +20,34 @@
 
 log = get_logger(__name__)
 
+# Use a shared file to keep track of which PIDs have Symbol DB enabled. This way
+# we can ensure that at most two processes are emitting symbols under a large
+# range of scenarios.
+shared_pid_file = SharedStringFile(f"{str(hash(tuple(sys.argv)))[:8]}-symdb-pids")
+
+MAX_SYMDB_UPLOADERS = 2  # parent + 1 child
+
 
 def _rc_callback(data: t.Sequence[Payload]):
-    if get_ancestor_runtime_id() is not None and has_forked():
-        log.debug("[PID %d] SymDB: Disabling Symbol DB in forked process", os.getpid())
-        # We assume that forking is being used for spawning child worker
-        # processes. Therefore, we avoid uploading the same symbols from each
-        # child process. We restrict the enablement of Symbol DB to just the
-        # parent process and the first fork child.
-        remoteconfig_poller.unregister("LIVE_DEBUGGING_SYMBOL_DB")
-
-        if SymbolDatabaseUploader.is_installed():
-            SymbolDatabaseUploader.uninstall()
-
-        return
+    with shared_pid_file.lock_exclusive() as f:
+        if (get_ancestor_runtime_id() is not None and has_forked()) or len(
+            set(shared_pid_file.peekall_unlocked(f))
+        ) >= MAX_SYMDB_UPLOADERS:
+            log.debug("[PID %d] SymDB: Disabling Symbol DB in child process", os.getpid())
+            # We assume that forking is being used for spawning child worker
+            # processes. Therefore, we avoid uploading the same symbols from each
+            # child process. We restrict the enablement of Symbol DB to just the
+            # parent process and the first fork child.
+            remoteconfig_poller.unregister("LIVE_DEBUGGING_SYMBOL_DB")
+
+            if SymbolDatabaseUploader.is_installed():
+                SymbolDatabaseUploader.uninstall()
+
+            return
+
+        # Store the PID of the current process so that we know which processes
+        # have Symbol DB enabled.
+        shared_pid_file.put_unlocked(f, str(os.getpid()))
 
     for payload in data:
         if payload.metadata is None:
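The admission rule the callback applies can be read off in isolation: a process keeps Symbol DB enabled only if it is not a fork of another instrumented process and fewer than MAX_SYMDB_UPLOADERS distinct PIDs are already recorded in the shared file. A standalone sketch of that rule, with illustrative names that are not ddtrace APIs:

MAX_SYMDB_UPLOADERS = 2  # parent + 1 child


def may_enable_symdb(registered_pids, pid, is_forked_child):
    """Return True and register `pid` if this process may keep Symbol DB enabled."""
    if is_forked_child or len(set(registered_pids)) >= MAX_SYMDB_UPLOADERS:
        return False
    registered_pids.append(str(pid))
    return True


pids = []
assert may_enable_symdb(pids, 1000, False)       # parent registers
assert may_enable_symdb(pids, 1001, False)       # first spawned child registers
assert not may_enable_symdb(pids, 1002, False)   # further children are rejected
assert not may_enable_symdb(pids, 1003, True)    # fork children are always rejected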

tests/internal/symbol_db/test_symbols.py

Lines changed: 42 additions & 0 deletions
@@ -15,6 +15,15 @@
 from ddtrace.internal.symbol_db.symbols import SymbolType
 
 
+@pytest.fixture(autouse=True, scope="function")
+def pid_file_teardown():
+    from ddtrace.internal.symbol_db.remoteconfig import shared_pid_file
+
+    yield
+
+    shared_pid_file.clear()
+
+
 def test_symbol_from_code():
     def foo(a, b, c=None):
         loc = 42
@@ -320,3 +329,36 @@ def test_symbols_fork_uploads():
 
     for pid in pids:
         os.waitpid(pid, 0)
+
+
+def spawn_target(results):
+    from ddtrace.internal.remoteconfig import ConfigMetadata
+    from ddtrace.internal.remoteconfig import Payload
+    from ddtrace.internal.symbol_db.remoteconfig import _rc_callback
+    from ddtrace.internal.symbol_db.symbols import SymbolDatabaseUploader
+
+    SymbolDatabaseUploader.install()
+
+    rc_data = [Payload(ConfigMetadata("test", "symdb", "hash", 0, 0), "test", None)]
+    _rc_callback(rc_data)
+    results.append(SymbolDatabaseUploader.is_installed())
+
+
+def test_symbols_spawn_uploads():
+    import multiprocessing
+
+    multiprocessing.set_start_method("spawn", force=True)
+    mc_context = multiprocessing.get_context("spawn")
+    manager = multiprocessing.Manager()
+    returns = manager.list()
+    jobs = []
+
+    for _ in range(10):
+        p = mc_context.Process(target=spawn_target, args=(returns,))
+        p.start()
+        jobs.append(p)
+
+    for p in jobs:
+        p.join()
+
+    assert sum(returns) == 2
