Skip to content

Commit 1008351

Browse files
authored
Dynamically reconfigure sdkstats on OneSettings configuration change (#42877)
1 parent b928db1 commit 1008351

File tree

23 files changed

+2655
-1026
lines changed

23 files changed

+2655
-1026
lines changed

sdk/monitor/azure-monitor-opentelemetry-exporter/CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929
([#42508] https://github.com/Azure/azure-sdk-for-python/pull/42508)
3030
- Refactoring of statsbeat to use `StatsbeatManager`
3131
([#42716] https://github.com/Azure/azure-sdk-for-python/pull/42716)
32+
- Support sdk stats dynamic change on OneSettings config change
33+
([#42877] https://github.com/Azure/azure-sdk-for-python/pull/42877)
3234

3335
## 1.0.0b41 (2025-07-31)
3436

sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_configuration/__init__.py

Lines changed: 57 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,11 @@
77

88
from azure.monitor.opentelemetry.exporter._constants import (
99
_ONE_SETTINGS_DEFAULT_REFRESH_INTERVAL_SECONDS,
10-
_ONE_SETTINGS_PYTHON_KEY,
1110
_ONE_SETTINGS_CHANGE_URL,
1211
_ONE_SETTINGS_CONFIG_URL,
1312
)
1413
from azure.monitor.opentelemetry.exporter._configuration._utils import make_onesettings_request
14+
from azure.monitor.opentelemetry.exporter._utils import Singleton
1515

1616
# Set up logger
1717
logger = logging.getLogger(__name__)
@@ -35,34 +35,42 @@ def with_updates(self, **kwargs) -> '_ConfigurationState': # pylint: disable=C4
3535
)
3636

3737

38-
class _ConfigurationManager:
38+
class _ConfigurationManager(metaclass=Singleton):
3939
"""Singleton class to manage configuration settings."""
4040

41-
_instance = None
42-
_configuration_worker = None
43-
_instance_lock = Lock()
44-
_state_lock = Lock() # Single lock for all state
45-
_current_state = _ConfigurationState()
41+
def __init__(self):
42+
"""Initialize the ConfigurationManager instance."""
4643

47-
def __new__(cls):
48-
with cls._instance_lock:
49-
if cls._instance is None:
50-
cls._instance = super(_ConfigurationManager, cls).__new__(cls)
51-
# Initialize the instance here to avoid re-initialization
52-
cls._instance._initialize_worker()
53-
return cls._instance
44+
self._configuration_worker = None
45+
self._state_lock = Lock() # Single lock for all state
46+
self._current_state = _ConfigurationState()
47+
self._callbacks = []
48+
self._initialize_worker()
5449

5550
def _initialize_worker(self):
5651
"""Initialize the ConfigurationManager and start the configuration worker."""
5752
# Lazy import to avoid circular import
5853
from azure.monitor.opentelemetry.exporter._configuration._worker import _ConfigurationWorker
5954

6055
# Get initial refresh interval from state
61-
with _ConfigurationManager._state_lock:
62-
initial_refresh_interval = _ConfigurationManager._current_state.refresh_interval
56+
with self._state_lock:
57+
initial_refresh_interval = self._current_state.refresh_interval
6358

64-
self._configuration_worker = _ConfigurationWorker(initial_refresh_interval)
59+
self._configuration_worker = _ConfigurationWorker(self, initial_refresh_interval)
6560

61+
def register_callback(self, callback):
62+
# Register a callback to be invoked when configuration changes.
63+
self._callbacks.append(callback)
64+
65+
def _notify_callbacks(self, settings: Dict[str, str]):
66+
# Notify all registered callbacks of configuration changes.
67+
for cb in self._callbacks:
68+
try:
69+
cb(settings)
70+
except Exception as ex: # pylint: disable=broad-except
71+
logger.warning("Callback failed: %s", ex)
72+
73+
# pylint: disable=too-many-statements
6674
def get_configuration_and_refresh_interval(self, query_dict: Optional[Dict[str, str]] = None) -> int:
6775
"""Fetch configuration from OneSettings and update local cache atomically.
6876
@@ -122,8 +130,8 @@ def get_configuration_and_refresh_interval(self, query_dict: Optional[Dict[str,
122130
headers = {}
123131

124132
# Read current state atomically
125-
with _ConfigurationManager._state_lock:
126-
current_state = _ConfigurationManager._current_state
133+
with self._state_lock:
134+
current_state = self._current_state
127135
if current_state.etag:
128136
headers["If-None-Match"] = current_state.etag
129137
if current_state.refresh_interval:
@@ -145,8 +153,8 @@ def get_configuration_and_refresh_interval(self, query_dict: Optional[Dict[str,
145153
# Handle version and settings updates
146154
elif response.settings and response.version is not None:
147155
needs_config_fetch = False
148-
with _ConfigurationManager._state_lock:
149-
current_state = _ConfigurationManager._current_state
156+
with self._state_lock:
157+
current_state = self._current_state
150158

151159
if response.version > current_state.version_cache:
152160
# Version increase: new config available
@@ -182,34 +190,41 @@ def get_configuration_and_refresh_interval(self, query_dict: Optional[Dict[str,
182190
# No settings or version provided
183191
logger.warning("No settings or version provided in config response. Config not updated.")
184192

193+
194+
notify_callbacks = False
195+
current_refresh_interval = _ONE_SETTINGS_DEFAULT_REFRESH_INTERVAL_SECONDS
196+
state_for_callbacks = None
197+
185198
# Atomic state update
186-
with _ConfigurationManager._state_lock:
187-
latest_state = _ConfigurationManager._current_state # Always use latest state
188-
_ConfigurationManager._current_state = latest_state.with_updates(**new_state_updates)
189-
return _ConfigurationManager._current_state.refresh_interval
199+
with self._state_lock:
200+
latest_state = self._current_state # Always use latest state
201+
self._current_state = latest_state.with_updates(**new_state_updates)
202+
current_refresh_interval = self._current_state.refresh_interval
203+
if 'settings_cache' in new_state_updates:
204+
notify_callbacks = True
205+
state_for_callbacks = self._current_state
206+
207+
# Handle configuration updates throughout the SDK
208+
if notify_callbacks and state_for_callbacks is not None and state_for_callbacks.settings_cache:
209+
self._notify_callbacks(state_for_callbacks.settings_cache)
210+
211+
return current_refresh_interval
190212

191213
def get_settings(self) -> Dict[str, str]: # pylint: disable=C4741,C4742
192214
"""Get current settings cache."""
193-
with _ConfigurationManager._state_lock:
194-
return _ConfigurationManager._current_state.settings_cache.copy()
215+
with self._state_lock:
216+
return self._current_state.settings_cache.copy() # type: ignore
195217

196218
def get_current_version(self) -> int: # pylint: disable=C4741,C4742
197219
"""Get current version."""
198-
with _ConfigurationManager._state_lock:
199-
return _ConfigurationManager._current_state.version_cache
220+
with self._state_lock:
221+
return self._current_state.version_cache # type: ignore
200222

201223
def shutdown(self) -> None:
202224
"""Shutdown the configuration worker."""
203-
with _ConfigurationManager._instance_lock:
204-
if self._configuration_worker:
205-
self._configuration_worker.shutdown()
206-
self._configuration_worker = None
207-
if _ConfigurationManager._instance:
208-
_ConfigurationManager._instance = None
209-
210-
211-
def _update_configuration_and_get_refresh_interval() -> int:
212-
targeting = {
213-
"namespaces": _ONE_SETTINGS_PYTHON_KEY,
214-
}
215-
return _ConfigurationManager().get_configuration_and_refresh_interval(targeting)
225+
if self._configuration_worker:
226+
self._configuration_worker.shutdown()
227+
self._configuration_worker = None
228+
# Clear the singleton instance from the metaclass
229+
if self.__class__ in Singleton._instances: # pylint: disable=protected-access
230+
del Singleton._instances[self.__class__] # pylint: disable=protected-access

sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_configuration/_utils.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from typing import Dict, Optional
44
import json
55
import logging
6+
# mypy: disable-error-code="import-untyped"
67
import requests
78

89
from azure.monitor.opentelemetry.exporter._constants import (

sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_configuration/_worker.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
import logging
55
import threading
66
import random
7-
from azure.monitor.opentelemetry.exporter._configuration import _update_configuration_and_get_refresh_interval
7+
from azure.monitor.opentelemetry.exporter._constants import _ONE_SETTINGS_PYTHON_TARGETING
88

99
logger = logging.getLogger(__name__)
1010

@@ -28,14 +28,15 @@ class _ConfigurationWorker:
2828
_running (bool): Flag indicating if the worker is currently running
2929
"""
3030

31-
def __init__(self, refresh_interval=None) -> None:
31+
def __init__(self, configuration_manager, refresh_interval=None) -> None:
3232
"""Initialize and start the configuration worker thread.
3333
3434
Creates and starts a background daemon thread that will periodically refresh
3535
configuration from OneSettings. The thread starts immediately upon initialization
3636
with a random startup delay to prevent thundering herd issues.
3737
3838
Args:
39+
configuration_manager: The ConfigurationManager instance to update
3940
refresh_interval (Optional[int]): Initial refresh interval in seconds.
4041
If None, defaults to 3600 seconds (1 hour).
4142
@@ -44,6 +45,7 @@ def __init__(self, refresh_interval=None) -> None:
4445
0-15 second startup delay to stagger configuration requests across multiple
4546
SDK instances during startup or recovery from outages.
4647
"""
48+
self._configuration_manager = configuration_manager
4749
self._default_refresh_interval = 3600 # Default to 60 minutes in seconds
4850
self._interval_lock = threading.Lock()
4951
self._state_lock = threading.Lock()
@@ -132,9 +134,9 @@ def _get_configuration(self) -> None:
132134

133135
while not self._shutdown_event.is_set():
134136
try:
135-
# Perform the refresh operation
136137
with self._interval_lock:
137-
self._refresh_interval = _update_configuration_and_get_refresh_interval()
138+
self._refresh_interval = \
139+
self._configuration_manager.get_configuration_and_refresh_interval(_ONE_SETTINGS_PYTHON_TARGETING)
138140
except Exception as ex: # pylint: disable=broad-exception-caught
139141
logger.warning("Configuration refresh failed: %s", ex)
140142

sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_connection_string_parser.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,14 @@
1414
# Specs taken from https://tools.ietf.org/html/rfc4122
1515
uuid_regex_pattern = re.compile("^[0-9a-f]{8}-" "[0-9a-f]{4}-" + "[0-9a-f]{4}-" "[0-9a-f]{4}-" "[0-9a-f]{12}$")
1616

17+
# Pattern to extract region from ingestion endpoint URL
18+
# Examples:
19+
# - https://westeurope-5.in.applicationinsights.azure.com/ -> westeurope
20+
# - https://westeurope.in.applicationinsights.azure.com/ -> westeurope
21+
# - https://eastus-1.in.applicationinsights.azure.com/ -> eastus
22+
# - https://dc.services.visualstudio.com -> None (global endpoint)
23+
region_from_endpoint_pattern = re.compile(r"https://([a-z0-9]+)(?:-\d+)?\.in\.applicationinsights\.azure\.com")
24+
1725

1826
class ConnectionStringParser:
1927
"""ConnectionString parser.
@@ -29,6 +37,7 @@ def __init__(self, connection_string: typing.Optional[str] = None) -> None:
2937
self.live_endpoint = ""
3038
self._connection_string = connection_string
3139
self.aad_audience = ""
40+
self.region = ""
3241
self._initialize()
3342
self._validate_instrumentation_key()
3443

@@ -66,6 +75,24 @@ def _initialize(self) -> None:
6675
code_cs.get(AAD_AUDIENCE) or env_cs.get(AAD_AUDIENCE) # type: ignore
6776
)
6877

78+
# Extract region information
79+
self.region = self._extract_region() # type: ignore
80+
81+
def _extract_region(self) -> typing.Optional[str]:
82+
"""Extract region from endpoint URL.
83+
84+
:return: Extracted region or None if not found
85+
:rtype: typing.Optional[str]
86+
"""
87+
# Try to extract region from the ingestion endpoint URL
88+
endpoint = self.endpoint
89+
if endpoint:
90+
match = region_from_endpoint_pattern.match(endpoint)
91+
if match:
92+
return match.group(1)
93+
94+
return None
95+
6996
def _validate_instrumentation_key(self) -> None:
7097
"""Validates the instrumentation key used for Azure Monitor.
7198

sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_constants.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@
8080

8181
# ONE SETTINGS
8282
_ONE_SETTINGS_PYTHON_KEY = "python"
83+
_ONE_SETTINGS_PYTHON_TARGETING = {"namespaces": _ONE_SETTINGS_PYTHON_KEY}
8384
_ONE_SETTINGS_CHANGE_VERSION_KEY = "CHANGE_VERSION"
8485
_ONE_SETTINGS_CNAME = "https://settings.sdk.monitor.azure.com"
8586
_ONE_SETTINGS_PATH = "/AzMonSDKDynamicConfiguration"
@@ -88,6 +89,11 @@
8889
_ONE_SETTINGS_CHANGE_URL = _ONE_SETTINGS_CNAME + _ONE_SETTINGS_CHANGE_PATH
8990
_ONE_SETTINGS_DEFAULT_REFRESH_INTERVAL_SECONDS = 3600 # 60 minutes
9091

92+
## ONE SETTINGS CONFIGS
93+
_ONE_SETTINGS_DEFAULT_STATS_CONNECTION_STRING_KEY = "DEFAULT_STATS_CONNECTION_STRING"
94+
_ONE_SETTINGS_SUPPORTED_DATA_BOUNDARIES_KEY = "SUPPORTED_DATA_BOUNDARIES"
95+
_ONE_SETTINGS_FEATURE_LOCAL_STORAGE = "FEATURE_LOCAL_STORAGE"
96+
9197
# Statsbeat
9298
# (OpenTelemetry metric name, Statsbeat metric name)
9399
# Note: OpenTelemetry SDK normalizes metric names to lowercase, so first element should be lowercase

sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_processor.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ def __init__(self):
1414
self.call_on_emit = hasattr(super(), 'on_emit')
1515

1616
def on_emit(self, log_data: LogData) -> None: # type: ignore
17-
qpm = _QuickpulseManager._instance
17+
qpm = _QuickpulseManager()
1818
if qpm:
1919
qpm._record_log_record(log_data)
2020
if self.call_on_emit:
@@ -37,7 +37,7 @@ def force_flush(self, timeout_millis: int = 30000):
3737
class _QuickpulseSpanProcessor(SpanProcessor):
3838

3939
def on_end(self, span: ReadableSpan) -> None:
40-
qpm = _QuickpulseManager._instance
40+
qpm = _QuickpulseManager()
4141
if qpm:
4242
qpm._record_span(span)
43-
return super().on_end(span)
43+
return super().on_end(span) # type: ignore

sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_utils.py

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -379,15 +379,22 @@ def _get_scope(aad_audience=None):
379379

380380

381381
class Singleton(type):
382-
_instance = None
382+
"""Metaclass for creating thread-safe singleton instances.
383+
384+
Supports multiple singleton classes by maintaining a separate instance
385+
for each class that uses this metaclass.
386+
"""
387+
_instances = {} # type: ignore
383388
_lock = threading.Lock()
384389

385-
def __call__(cls, *args: Any, **kwargs: Any):
386-
if not cls._instance:
390+
def __call__(cls, *args: Any, **kwargs: Any) -> Any:
391+
if cls not in cls._instances:
387392
with cls._lock:
388-
if not cls._instance:
389-
cls._instance = super(Singleton, cls).__call__(*args, **kwargs)
390-
return cls._instance
393+
# Double-check pattern to avoid race conditions
394+
if cls not in cls._instances:
395+
instance = super().__call__(*args, **kwargs)
396+
cls._instances[cls] = instance # type: ignore
397+
return cls._instances[cls]
391398

392399
def _get_telemetry_type(item: TelemetryItem):
393400
if hasattr(item, "data") and item.data is not None:

sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/export/_base.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@ def __init__(self, **kwargs: Any) -> None:
9999
parsed_connection_string = ConnectionStringParser(kwargs.get("connection_string"))
100100

101101
# TODO: Re-add this once all parts of OneSettings is finished
102+
# Initialize the configuration manager to begin fetching settings
102103
# self._configuration_manager = _ConfigurationManager()
103104

104105
self._api_version = kwargs.get("api_version") or _SERVICE_API_LATEST
@@ -111,6 +112,7 @@ def __init__(self, **kwargs: Any) -> None:
111112
self._consecutive_redirects = 0 # To prevent circular redirects
112113
self._disable_offline_storage = kwargs.get("disable_offline_storage", False)
113114
self._endpoint = parsed_connection_string.endpoint
115+
self._region = parsed_connection_string.region
114116
self._instrumentation_key = parsed_connection_string.instrumentation_key
115117
self._aad_audience = parsed_connection_string.aad_audience
116118
self._storage_maintenance_period = kwargs.get(

0 commit comments

Comments
 (0)