52
52
from azure .monitor .opentelemetry .exporter .statsbeat ._state import (
53
53
get_statsbeat_initial_success ,
54
54
get_statsbeat_shutdown ,
55
- get_customer_sdkstats_shutdown ,
56
55
increment_and_check_statsbeat_failure_count ,
57
56
is_statsbeat_enabled ,
58
57
set_statsbeat_initial_success ,
59
- is_customer_sdkstats_enabled ,
60
58
)
61
59
from azure .monitor .opentelemetry .exporter .statsbeat ._utils import (
62
60
_update_requests_map ,
63
- _track_dropped_items_from_storage ,
64
- _track_dropped_items ,
65
- _track_retry_items ,
66
- _track_successful_items ,
61
+ )
62
+ from azure .monitor .opentelemetry .exporter .statsbeat .customer ._utils import (
63
+ track_dropped_items_from_storage ,
64
+ track_dropped_items ,
65
+ track_retry_items ,
66
+ track_successful_items ,
67
+ )
68
+ from azure .monitor .opentelemetry .exporter .statsbeat .customer ._state import (
69
+ get_customer_stats_manager ,
67
70
)
68
71
69
72
@@ -111,6 +114,7 @@ def __init__(self, **kwargs: Any) -> None:
111
114
self ._credential = _get_authentication_credential (** kwargs )
112
115
self ._consecutive_redirects = 0 # To prevent circular redirects
113
116
self ._disable_offline_storage = kwargs .get ("disable_offline_storage" , False )
117
+ self ._connection_string = parsed_connection_string ._connection_string
114
118
self ._endpoint = parsed_connection_string .endpoint
115
119
self ._region = parsed_connection_string .region
116
120
self ._instrumentation_key = parsed_connection_string .instrumentation_key
@@ -159,10 +163,10 @@ def __init__(self, **kwargs: Any) -> None:
159
163
config .http_logging_policy or HttpLoggingPolicy (** kwargs ),
160
164
]
161
165
162
- self .client = AzureMonitorClient (
166
+ self .client : AzureMonitorClient = AzureMonitorClient (
163
167
host = self ._endpoint , connection_timeout = self ._timeout , policies = policies , ** kwargs
164
168
)
165
- self .storage = None
169
+ self .storage : Optional [ LocalFileStorage ] = None
166
170
if not self ._disable_offline_storage :
167
171
self .storage = LocalFileStorage ( # pyright: ignore
168
172
path = self ._storage_directory , # type: ignore
@@ -187,7 +191,7 @@ def __init__(self, **kwargs: Any) -> None:
187
191
# customer sdkstats initialization
188
192
if self ._should_collect_customer_sdkstats ():
189
193
190
- from azure .monitor .opentelemetry .exporter .statsbeat ._customer_sdkstats import collect_customer_sdkstats
194
+ from azure .monitor .opentelemetry .exporter .statsbeat .customer import collect_customer_sdkstats
191
195
# Collect customer sdkstats metrics
192
196
collect_customer_sdkstats (self )
193
197
@@ -212,15 +216,15 @@ def _handle_transmit_from_storage(self, envelopes: List[TelemetryItem], result:
212
216
envelopes_to_store = [x .as_dict () for x in envelopes ]
213
217
result_from_storage_put = self .storage .put (envelopes_to_store )
214
218
if self ._should_collect_customer_sdkstats ():
215
- _track_dropped_items_from_storage (result_from_storage_put , envelopes )
219
+ track_dropped_items_from_storage (result_from_storage_put , envelopes )
216
220
elif result == ExportResult .SUCCESS :
217
221
# Try to send any cached events
218
222
self ._transmit_from_storage ()
219
223
220
224
else :
221
225
# Track items that would have been retried but are dropped since client has local storage disabled
222
226
if self ._should_collect_customer_sdkstats ():
223
- _track_dropped_items (envelopes , DropCode .CLIENT_STORAGE_DISABLED )
227
+ track_dropped_items (envelopes , DropCode .CLIENT_STORAGE_DISABLED )
224
228
225
229
# pylint: disable=too-many-branches
226
230
# pylint: disable=too-many-nested-blocks
@@ -259,7 +263,7 @@ def _transmit(self, envelopes: List[TelemetryItem]) -> ExportResult:
259
263
260
264
# Track successful items in customer sdkstats
261
265
if self ._should_collect_customer_sdkstats ():
262
- _track_successful_items (envelopes )
266
+ track_successful_items (envelopes )
263
267
else : # 206
264
268
reach_ingestion = True
265
269
resend_envelopes = []
@@ -268,13 +272,13 @@ def _transmit(self, envelopes: List[TelemetryItem]) -> ExportResult:
268
272
resend_envelopes .append (envelopes [error .index ]) # type: ignore
269
273
# Track retried items in customer sdkstats
270
274
if self ._should_collect_customer_sdkstats ():
271
- _track_retry_items (resend_envelopes , error )
275
+ track_retry_items (resend_envelopes , error )
272
276
else :
273
277
if not self ._is_stats_exporter ():
274
278
# Track dropped items in customer sdkstats, non-retryable scenario
275
279
if self ._should_collect_customer_sdkstats ():
276
280
if error is not None and hasattr (error , "index" ) and error .index is not None and isinstance (error .status_code , int ):
277
- _track_dropped_items ([envelopes [error .index ]], error .status_code )
281
+ track_dropped_items ([envelopes [error .index ]], error .status_code )
278
282
logger .error (
279
283
"Data drop %s: %s %s." ,
280
284
error .status_code ,
@@ -285,12 +289,12 @@ def _transmit(self, envelopes: List[TelemetryItem]) -> ExportResult:
285
289
envelopes_to_store = [x .as_dict () for x in resend_envelopes ]
286
290
result_from_storage = self .storage .put (envelopes_to_store , 0 )
287
291
if self ._should_collect_customer_sdkstats ():
288
- _track_dropped_items_from_storage (result_from_storage , resend_envelopes )
292
+ track_dropped_items_from_storage (result_from_storage , resend_envelopes )
289
293
self ._consecutive_redirects = 0
290
294
elif resend_envelopes :
291
295
# Track items that would have been retried but are dropped since client has local storage disabled
292
296
if self ._should_collect_customer_sdkstats ():
293
- _track_dropped_items (resend_envelopes , DropCode .CLIENT_STORAGE_DISABLED )
297
+ track_dropped_items (resend_envelopes , DropCode .CLIENT_STORAGE_DISABLED )
294
298
# Mark as not retryable because we already write to storage here
295
299
result = ExportResult .FAILED_NOT_RETRYABLE
296
300
except HttpResponseError as response_error :
@@ -304,7 +308,7 @@ def _transmit(self, envelopes: List[TelemetryItem]) -> ExportResult:
304
308
# Log error for 401: Unauthorized, 403: Forbidden to assist with customer troubleshooting
305
309
if not self ._is_stats_exporter ():
306
310
if self ._should_collect_customer_sdkstats ():
307
- _track_retry_items (envelopes , response_error )
311
+ track_retry_items (envelopes , response_error )
308
312
if response_error .status_code == 401 :
309
313
logger .error (
310
314
"Retryable server side error: %s. " \
@@ -313,8 +317,6 @@ def _transmit(self, envelopes: List[TelemetryItem]) -> ExportResult:
313
317
response_error .message ,
314
318
)
315
319
elif response_error .status_code == 403 :
316
- if self ._should_collect_customer_sdkstats ():
317
- _track_retry_items (envelopes , response_error )
318
320
logger .error (
319
321
"Retryable server side error: %s. " \
320
322
"Your application may be configured with a token credential " \
@@ -330,7 +332,7 @@ def _transmit(self, envelopes: List[TelemetryItem]) -> ExportResult:
330
332
331
333
if not self ._is_stats_exporter ():
332
334
if self ._should_collect_customer_sdkstats () and isinstance (response_error .status_code , int ):
333
- _track_dropped_items (envelopes , response_error .status_code )
335
+ track_dropped_items (envelopes , response_error .status_code )
334
336
elif _is_redirect_code (response_error .status_code ):
335
337
self ._consecutive_redirects = self ._consecutive_redirects + 1
336
338
# pylint: disable=W0212
@@ -349,7 +351,7 @@ def _transmit(self, envelopes: List[TelemetryItem]) -> ExportResult:
349
351
else :
350
352
if not self ._is_stats_exporter ():
351
353
if self ._should_collect_customer_sdkstats ():
352
- _track_dropped_items (envelopes , DropCode .CLIENT_EXCEPTION , _exception_categories .CLIENT_EXCEPTION .value )
354
+ track_dropped_items (envelopes , DropCode .CLIENT_EXCEPTION , _exception_categories .CLIENT_EXCEPTION .value )
353
355
logger .error (
354
356
"Error parsing redirect information." ,
355
357
)
@@ -358,7 +360,7 @@ def _transmit(self, envelopes: List[TelemetryItem]) -> ExportResult:
358
360
if not self ._is_stats_exporter ():
359
361
# Track dropped items in customer sdkstats, non-retryable scenario
360
362
if self ._should_collect_customer_sdkstats ():
361
- _track_dropped_items (
363
+ track_dropped_items (
362
364
envelopes ,
363
365
DropCode .CLIENT_EXCEPTION ,
364
366
_exception_categories .CLIENT_EXCEPTION .value
@@ -384,14 +386,14 @@ def _transmit(self, envelopes: List[TelemetryItem]) -> ExportResult:
384
386
)
385
387
# Track dropped items in customer sdkstats, non-retryable scenario
386
388
if self ._should_collect_customer_sdkstats () and isinstance (response_error .status_code , int ):
387
- _track_dropped_items (envelopes , response_error .status_code )
389
+ track_dropped_items (envelopes , response_error .status_code )
388
390
if _is_invalid_code (response_error .status_code ):
389
391
# Shutdown statsbeat on invalid code from customer endpoint
390
392
# Import here to avoid circular dependencies
391
393
from azure .monitor .opentelemetry .exporter .statsbeat ._statsbeat import (
392
394
shutdown_statsbeat_metrics ,
393
395
)
394
- from azure .monitor .opentelemetry .exporter .statsbeat ._customer_sdkstats import (
396
+ from azure .monitor .opentelemetry .exporter .statsbeat .customer import (
395
397
shutdown_customer_sdkstats_metrics ,
396
398
)
397
399
@@ -407,7 +409,7 @@ def _transmit(self, envelopes: List[TelemetryItem]) -> ExportResult:
407
409
408
410
# Track retry items in customer sdkstats for client-side exceptions
409
411
if self ._should_collect_customer_sdkstats ():
410
- _track_retry_items (envelopes , request_error )
412
+ track_retry_items (envelopes , request_error )
411
413
412
414
if self ._should_collect_stats ():
413
415
exc_type = request_error .exc_type
@@ -420,7 +422,7 @@ def _transmit(self, envelopes: List[TelemetryItem]) -> ExportResult:
420
422
421
423
# Track dropped items in customer sdkstats for general exceptions
422
424
if self ._should_collect_customer_sdkstats ():
423
- _track_dropped_items (envelopes , DropCode .CLIENT_EXCEPTION , _exception_categories .CLIENT_EXCEPTION .value )
425
+ track_dropped_items (envelopes , DropCode .CLIENT_EXCEPTION , _exception_categories .CLIENT_EXCEPTION .value )
424
426
425
427
if self ._should_collect_stats ():
426
428
_update_requests_map (_REQ_EXCEPTION_NAME [1 ], value = ex .__class__ .__name__ )
@@ -467,10 +469,10 @@ def _should_collect_stats(self):
467
469
468
470
# check to see whether its the case of customer sdkstats collection
469
471
def _should_collect_customer_sdkstats (self ):
470
- # Don't collect customer sdkstats for instrumentation collection, sdkstats exporter or customer sdkstats exporter
472
+ manager = get_customer_stats_manager ()
471
473
return (
472
- is_customer_sdkstats_enabled ()
473
- and not get_customer_sdkstats_shutdown ()
474
+ manager . is_enabled
475
+ and not manager . is_shutdown
474
476
and not self ._is_stats_exporter ()
475
477
and not self ._is_customer_sdkstats_exporter ()
476
478
and not self ._instrumentation_collection
0 commit comments