-
Notifications
You must be signed in to change notification settings - Fork 0
/
telemetry.py
78 lines (65 loc) · 3.12 KB
/
telemetry.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
import uuid
import logging
logger = logging.getLogger(__name__)
CLIENT_REQUEST_ID = 'client-request-id'
CLIENT_CURRENT_TELEMETRY = "x-client-current-telemetry"
CLIENT_LAST_TELEMETRY = "x-client-last-telemetry"
NON_SILENT_CALL = 0
FORCE_REFRESH = 1
AT_ABSENT = 2
AT_EXPIRED = 3
AT_AGING = 4
RESERVED = 5
def _get_new_correlation_id():
return str(uuid.uuid4())
class _TelemetryContext(object):
"""It is used for handling the telemetry context for current OAuth2 "exchange"."""
# https://identitydivision.visualstudio.com/DevEx/_git/AuthLibrariesApiReview?path=%2FTelemetry%2FMSALServerSideTelemetry.md&_a=preview
_SUCCEEDED = "succeeded"
_FAILED = "failed"
_FAILURE_SIZE = "failure_size"
_CURRENT_HEADER_SIZE_LIMIT = 100
_LAST_HEADER_SIZE_LIMIT = 350
def __init__(self, buffer, lock, api_id, correlation_id=None, refresh_reason=None):
self._buffer = buffer
self._lock = lock
self._api_id = api_id
self._correlation_id = correlation_id or _get_new_correlation_id()
self._refresh_reason = refresh_reason or NON_SILENT_CALL
logger.debug("Generate or reuse correlation_id: %s", self._correlation_id)
def generate_headers(self):
with self._lock:
current = "4|{api_id},{cache_refresh}|".format(
api_id=self._api_id, cache_refresh=self._refresh_reason)
if len(current) > self._CURRENT_HEADER_SIZE_LIMIT:
logger.warning(
"Telemetry header greater than {} will be truncated by AAD".format(
self._CURRENT_HEADER_SIZE_LIMIT))
failures = self._buffer.get(self._FAILED, [])
return {
CLIENT_REQUEST_ID: self._correlation_id,
CLIENT_CURRENT_TELEMETRY: current,
CLIENT_LAST_TELEMETRY: "4|{succeeded}|{failed_requests}|{errors}|".format(
succeeded=self._buffer.get(self._SUCCEEDED, 0),
failed_requests=",".join("{a},{c}".format(**f) for f in failures),
errors=",".join(f["e"] for f in failures),
)
}
def hit_an_access_token(self):
with self._lock:
self._buffer[self._SUCCEEDED] = self._buffer.get(self._SUCCEEDED, 0) + 1
def update_telemetry(self, auth_result):
if auth_result:
with self._lock:
if "error" in auth_result:
self._record_failure(auth_result["error"])
else: # Telemetry sent successfully. Reset buffer
self._buffer.clear() # This won't work: self._buffer = {}
def _record_failure(self, error):
simulation = len(",{api_id},{correlation_id},{error}".format(
api_id=self._api_id, correlation_id=self._correlation_id, error=error))
if self._buffer.get(self._FAILURE_SIZE, 0) + simulation < self._LAST_HEADER_SIZE_LIMIT:
self._buffer[self._FAILURE_SIZE] = self._buffer.get(
self._FAILURE_SIZE, 0) + simulation
self._buffer.setdefault(self._FAILED, []).append({
"a": self._api_id, "c": self._correlation_id, "e": error})