Skip to content

Commit 20b0ef4

Browse files
fix: reinitialize locks after fork to prevent deadlocks in child processes
Summary: This commit adds post-fork reinitialization of threading locks across multiple components in the OpenTelemetry Python SDK and API. It ensures that threading.Lock() instances are safely reinitialized in child processes after a fork(), preventing potential deadlocks and undefined behavior. Details: Introduced usage of register_at_fork(after_in_child=...) from the os module to reinitialize thread locks. Used weakref.WeakMethod() to safely refer to bound instance methods in register_at_fork. Added _at_fork_reinit() methods to classes using threading locks and registered them to run in child processes post-fork. Applied this to all usages of Lock and RLock. Rationale: Forked child processes inherit thread state from the parent, including the internal state of locks. This can cause deadlocks or runtime errors if a lock was held at the time of the fork. By reinitializing locks using the register_at_fork mechanism, we ensure child processes start with clean lock states. This is especially relevant for WSGI servers and environments that use pre-fork models (e.g., gunicorn, uWSGI), where instrumentation and telemetry components may misbehave without this precaution.
1 parent 36ac612 commit 20b0ef4

File tree

17 files changed

+165
-6
lines changed

17 files changed

+165
-6
lines changed

exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,11 @@
1515
"""OTLP Exporter"""
1616

1717
import threading
18+
import weakref
1819
from abc import ABC, abstractmethod
1920
from collections.abc import Sequence # noqa: F401
2021
from logging import getLogger
21-
from os import environ
22+
from os import environ, register_at_fork
2223
from time import sleep
2324
from typing import ( # noqa: F401
2425
Any,
@@ -261,8 +262,13 @@ def __init__(
261262
self._client = self._stub(self._channel)
262263

263264
self._export_lock = threading.Lock()
265+
weak_reinit = weakref.WeakMethod(self._at_fork_reinit)
266+
register_at_fork(after_in_child=lambda: weak_reinit()())
264267
self._shutdown = False
265268

269+
def _at_fork_reinit(self):
270+
self._export_lock._at_fork_reinit()
271+
266272
@abstractmethod
267273
def _translate_data(
268274
self, data: TypingSequence[SDKDataT]

opentelemetry-api/src/opentelemetry/attributes/__init__.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,11 @@
1414

1515
import logging
1616
import threading
17+
import weakref
1718
from collections import OrderedDict
1819
from collections.abc import MutableMapping
1920
from typing import Mapping, Optional, Sequence, Tuple, Union
21+
from os import register_at_fork
2022

2123
from opentelemetry.util import types
2224

@@ -263,6 +265,11 @@ def __init__(
263265
for key, value in attributes.items():
264266
self[key] = value
265267
self._immutable = immutable
268+
weak_reinit = weakref.WeakMethod(self._at_fork_reinit)
269+
register_at_fork(after_in_child=lambda: weak_reinit()())
270+
271+
def _at_fork_reinit(self):
272+
self._lock._at_fork_reinit()
266273

267274
def __repr__(self) -> str:
268275
return f"{dict(self._dict)}"

opentelemetry-api/src/opentelemetry/metrics/_internal/__init__.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,10 +41,11 @@
4141
"""
4242

4343
import warnings
44+
import weakref
4445
from abc import ABC, abstractmethod
4546
from dataclasses import dataclass
4647
from logging import getLogger
47-
from os import environ
48+
from os import environ, register_at_fork
4849
from threading import Lock
4950
from typing import Dict, List, Optional, Sequence, Union, cast
5051

@@ -156,6 +157,11 @@ def __init__(self) -> None:
156157
self._lock = Lock()
157158
self._meters: List[_ProxyMeter] = []
158159
self._real_meter_provider: Optional[MeterProvider] = None
160+
weak_reinit = weakref.WeakMethod(self._at_fork_reinit)
161+
register_at_fork(after_in_child=lambda: weak_reinit()())
162+
163+
def _at_fork_reinit(self):
164+
self._lock._at_fork_reinit()
159165

160166
def get_meter(
161167
self,
@@ -510,6 +516,11 @@ def __init__(
510516
self._lock = Lock()
511517
self._instruments: List[_ProxyInstrumentT] = []
512518
self._real_meter: Optional[Meter] = None
519+
weak_reinit = weakref.WeakMethod(self._at_fork_reinit)
520+
register_at_fork(after_in_child=lambda: weak_reinit()())
521+
522+
def _at_fork_reinit(self):
523+
self._lock._at_fork_reinit()
513524

514525
def on_set_meter_provider(self, meter_provider: MeterProvider) -> None:
515526
"""Called when a real meter provider is set on the creating _ProxyMeterProvider

opentelemetry-api/src/opentelemetry/util/_once.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15+
import weakref
16+
from os import register_at_fork
1517
from threading import Lock
1618
from typing import Callable
1719

@@ -25,6 +27,11 @@ class Once:
2527
def __init__(self) -> None:
2628
self._lock = Lock()
2729
self._done = False
30+
weak_reinit = weakref.WeakMethod(self._at_fork_reinit)
31+
register_at_fork(after_in_child=lambda: weak_reinit()())
32+
33+
def _at_fork_reinit(self):
34+
self._lock._at_fork_reinit()
2835

2936
def do_once(self, func: Callable[[], None]) -> bool:
3037
"""Execute ``func`` if it hasn't been executed or return.

opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/__init__.py

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@
2121
import threading
2222
import traceback
2323
import warnings
24-
from os import environ
24+
import weakref
25+
from os import environ, register_at_fork
2526
from threading import Lock
2627
from time import time_ns
2728
from typing import Any, Callable, Tuple, Union, cast # noqa
@@ -317,6 +318,11 @@ def __init__(self):
317318
# iterating through it on "emit".
318319
self._log_record_processors = () # type: Tuple[LogRecordProcessor, ...]
319320
self._lock = threading.Lock()
321+
weak_reinit = weakref.WeakMethod(self._at_fork_reinit)
322+
register_at_fork(after_in_child=lambda: weak_reinit()())
323+
324+
def _at_fork_reinit(self):
325+
self._lock._at_fork_reinit()
320326

321327
def add_log_record_processor(
322328
self, log_record_processor: LogRecordProcessor
@@ -379,6 +385,11 @@ def __init__(self, max_workers: int = 2):
379385
self._executor = concurrent.futures.ThreadPoolExecutor(
380386
max_workers=max_workers
381387
)
388+
weak_reinit = weakref.WeakMethod(self._at_fork_reinit)
389+
register_at_fork(after_in_child=lambda: weak_reinit()())
390+
391+
def _at_fork_reinit(self):
392+
self._lock._at_fork_reinit()
382393

383394
def add_log_record_processor(
384395
self, log_record_processor: LogRecordProcessor
@@ -633,6 +644,11 @@ def __init__(
633644
self._at_exit_handler = atexit.register(self.shutdown)
634645
self._logger_cache = {}
635646
self._logger_cache_lock = Lock()
647+
weak_reinit = weakref.WeakMethod(self._at_fork_reinit)
648+
register_at_fork(after_in_child=lambda: weak_reinit()())
649+
650+
def _at_fork_reinit(self):
651+
self._logger_cache_lock._at_fork_reinit()
636652

637653
@property
638654
def resource(self):

opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/export/in_memory_log_exporter.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414

1515
import threading
1616
import typing
17+
import weakref
18+
from os import register_at_fork
1719

1820
from opentelemetry.sdk._logs import LogData
1921
from opentelemetry.sdk._logs.export import LogExporter, LogExportResult
@@ -30,8 +32,13 @@ class InMemoryLogExporter(LogExporter):
3032
def __init__(self):
3133
self._logs = []
3234
self._lock = threading.Lock()
35+
weak_reinit = weakref.WeakMethod(self._at_fork_reinit)
36+
register_at_fork(after_in_child=lambda: weak_reinit()())
3337
self._stopped = False
3438

39+
def _at_fork_reinit(self):
40+
self._lock._at_fork_reinit()
41+
3542
def clear(self) -> None:
3643
with self._lock:
3744
self._logs.clear()

opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/__init__.py

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
import weakref
1616
from atexit import register, unregister
1717
from logging import getLogger
18-
from os import environ
18+
from os import environ, register_at_fork
1919
from threading import Lock
2020
from time import time_ns
2121
from typing import Optional, Sequence
@@ -88,6 +88,11 @@ def __init__(
8888
self._measurement_consumer = measurement_consumer
8989
self._instrument_id_instrument = {}
9090
self._instrument_id_instrument_lock = Lock()
91+
weak_reinit = weakref.WeakMethod(self._at_fork_reinit)
92+
register_at_fork(after_in_child=lambda: weak_reinit()())
93+
94+
def _at_fork_reinit(self):
95+
self._instrument_id_instrument_lock._at_fork_reinit()
9196

9297
def create_counter(self, name, unit="", description="") -> APICounter:
9398
status = self._register_instrument(name, _Counter, unit, description)
@@ -421,6 +426,8 @@ def __init__(
421426
):
422427
self._lock = Lock()
423428
self._meter_lock = Lock()
429+
weak_reinit = weakref.WeakMethod(self._at_fork_reinit)
430+
register_at_fork(after_in_child=lambda: weak_reinit()())
424431
self._atexit_handler = None
425432
if resource is None:
426433
resource = Resource.create({})
@@ -463,6 +470,14 @@ def __init__(
463470
self._measurement_consumer.collect
464471
)
465472

473+
def _at_fork_reinit(self):
474+
self._lock._at_fork_reinit()
475+
self._meter_lock._at_fork_reinit()
476+
477+
@classmethod
478+
def _register_fork_handlers(cls):
479+
register_at_fork(after_in_child=cls._all_metric_readers_lock._at_fork_reinit)
480+
466481
def force_flush(self, timeout_millis: float = 10_000) -> bool:
467482
deadline_ns = time_ns() + timeout_millis * 10**6
468483

@@ -580,3 +595,6 @@ def get_meter(
580595
self._measurement_consumer,
581596
)
582597
return self._meters[info]
598+
599+
# Call the method after the class is fully defined
600+
MeterProvider._register_fork_handlers()

opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/_view_instrument_match.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,9 @@
1313
# limitations under the License.
1414

1515

16+
import weakref
1617
from logging import getLogger
18+
from os import register_at_fork
1719
from threading import Lock
1820
from time import time_ns
1921
from typing import Dict, List, Optional, Sequence
@@ -44,6 +46,8 @@ def __init__(
4446
self._instrument = instrument
4547
self._attributes_aggregation: Dict[frozenset, _Aggregation] = {}
4648
self._lock = Lock()
49+
weak_reinit = weakref.WeakMethod(self._at_fork_reinit)
50+
register_at_fork(after_in_child=lambda: weak_reinit()())
4751
self._instrument_class_aggregation = instrument_class_aggregation
4852
self._name = self._view._name or self._instrument.name
4953
self._description = (
@@ -66,6 +70,9 @@ def __init__(
6670
0,
6771
)
6872

73+
def _at_fork_reinit(self):
74+
self._lock._at_fork_reinit()
75+
6976
def conflicts(self, other: "_ViewInstrumentMatch") -> bool:
7077
# pylint: disable=protected-access
7178

opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/aggregation.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,14 @@
1414

1515
# pylint: disable=too-many-lines
1616

17+
import weakref
1718
from abc import ABC, abstractmethod
1819
from bisect import bisect_left
1920
from enum import IntEnum
2021
from functools import partial
2122
from logging import getLogger
2223
from math import inf
24+
from os import register_at_fork
2325
from threading import Lock
2426
from typing import (
2527
Callable,
@@ -98,9 +100,14 @@ def __init__(
98100
):
99101
self._lock = Lock()
100102
self._attributes = attributes
103+
weak_reinit = weakref.WeakMethod(self._at_fork_reinit)
104+
register_at_fork(after_in_child=lambda: weak_reinit()())
101105
self._reservoir = reservoir_builder()
102106
self._previous_point = None
103107

108+
def _at_fork_reinit(self):
109+
self._lock._at_fork_reinit()
110+
104111
@abstractmethod
105112
def aggregate(
106113
self, measurement: Measurement, should_sample_exemplar: bool = True

opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/exponential_histogram/mapping/exponent_mapping.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
# limitations under the License.
1414

1515
from math import ldexp
16+
from os import register_at_fork
1617
from threading import Lock
1718

1819
from opentelemetry.sdk.metrics._internal.exponential_histogram.mapping import (
@@ -42,6 +43,11 @@ class ExponentMapping(Mapping):
4243
_min_scale = -10
4344
_max_scale = 0
4445

46+
# Register a fork handler that reinitializes the class-level _mappings_lock in child processes
47+
@classmethod
48+
def _register_fork_handlers(cls):
49+
register_at_fork(after_in_child=cls._mappings_lock._at_fork_reinit)
50+
4551
def _get_min_scale(self):
4652
# _min_scale defines the point at which the exponential mapping
4753
# function becomes useless for 64-bit floats. With scale -10, ignoring
@@ -139,3 +145,6 @@ def get_lower_boundary(self, index: int) -> float:
139145
@property
140146
def scale(self) -> int:
141147
return self._scale
148+
149+
# Call the method after the class is fully defined
150+
ExponentMapping._register_fork_handlers()

0 commit comments

Comments
 (0)