Skip to content

Commit 63593a6

Browse files
committed
added classes required for telemetry
Signed-off-by: Sai Shree Pradhan <saishree.pradhan@databricks.com>
1 parent 6331fc1 commit 63593a6

25 files changed

+404
-1
lines changed

examples/query_execute.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
) as connection:
99

1010
with connection.cursor() as cursor:
11-
cursor.execute("SELECT * FROM default.diamonds LIMIT 2")
11+
cursor.execute("SELECT * FROM main.eng_lumberjack.staging_frontend_log_sql_driver_log limit 1")
1212
result = cursor.fetchall()
1313

1414
for row in result:

examples/test_telemetry.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
import os
2+
import databricks.sql as sql
3+
4+
# Create connection with telemetry enabled
5+
conn = sql.connect(
6+
server_hostname=os.environ["DATABRICKS_SERVER_HOSTNAME"],
7+
http_path=os.environ["DATABRICKS_HTTP_PATH"],
8+
access_token=os.environ["DATABRICKS_TOKEN"],
9+
enable_telemetry=True, # Enable telemetry
10+
telemetry_batch_size=1 # Set batch size to 1
11+
)
12+
13+
# Execute a simple query to generate telemetry
14+
cursor = conn.cursor()
15+
cursor.execute("SELECT * FROM main.eng_lumberjack.staging_frontend_log_sql_driver_log limit 1")
16+
cursor.fetchall()
17+
18+
# Close the connection
19+
cursor.close()
20+
conn.close()

src/databricks/sql/client.py

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,21 @@
11
import time
22
from typing import Dict, Tuple, List, Optional, Any, Union, Sequence
3+
import uuid
4+
from databricks.sql.telemetry.TelemetryEvent import TelemetryEvent
5+
from databricks.sql.telemetry.DriverSystemConfiguration import DriverSystemConfiguration
6+
from databricks.sql.telemetry.TelemetryClient import TelemetryClient
7+
from databricks.sql.telemetry.NoopTelemetryClient import NoopTelemetryClient
8+
from databricks.sql.telemetry.TelemetryFrontendLog import TelemetryFrontendLog
9+
from databricks.sql.telemetry.FrontendLogContext import FrontendLogContext
10+
from databricks.sql.telemetry.TelemetryClientContext import TelemetryClientContext
11+
from databricks.sql.telemetry.FrontendLogEntry import FrontendLogEntry
12+
from databricks.sql.auth.auth import AuthType
13+
from databricks.sql.auth.authenticators import (
14+
DatabricksOAuthProvider,
15+
ExternalAuthProvider,
16+
AuthProvider,
17+
AccessTokenAuthProvider,
18+
)
319

420
import pandas
521

@@ -234,6 +250,32 @@ def read(self) -> Optional[OAuthToken]:
234250
server_hostname, **kwargs
235251
)
236252

253+
self.server_telemetry_enabled = True
254+
self.client_telemetry_enabled = kwargs.get("enable_telemetry", False)
255+
self.telemetry_enabled = (
256+
self.client_telemetry_enabled and self.server_telemetry_enabled
257+
)
258+
telemetry_batch_size = kwargs.get("telemetry_batch_size", 200)
259+
260+
if self.telemetry_enabled:
261+
self.telemetry_client = TelemetryClient(
262+
host=self.host,
263+
connection_uuid="test-connection-uuid",
264+
auth_provider=auth_provider,
265+
is_authenticated=(
266+
isinstance(auth_provider, AccessTokenAuthProvider)
267+
or isinstance(auth_provider, DatabricksOAuthProvider)
268+
or isinstance(auth_provider, ExternalAuthProvider)
269+
or (
270+
isinstance(auth_provider, AuthProvider)
271+
and hasattr(auth_provider, "_header_factory")
272+
)
273+
),
274+
batch_size=telemetry_batch_size,
275+
)
276+
else:
277+
self.telemetry_client = NoopTelemetryClient()
278+
237279
user_agent_entry = kwargs.get("user_agent_entry")
238280
if user_agent_entry is None:
239281
user_agent_entry = kwargs.get("_user_agent_entry")
@@ -419,6 +461,9 @@ def _close(self, close_cursors=True) -> None:
419461

420462
self.open = False
421463

464+
if hasattr(self, "telemetry_client"):
465+
self.telemetry_client.close()
466+
422467
def commit(self):
423468
"""No-op because Databricks does not support transactions"""
424469
pass
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
import json
2+
from dataclasses import dataclass, asdict
3+
from databricks.sql.telemetry.HostDetails import HostDetails
4+
from databricks.sql.telemetry.enums.AuthMech import AuthMech
5+
from databricks.sql.telemetry.enums.AuthFlow import AuthFlow
6+
from databricks.sql.telemetry.enums.DatabricksClientType import DatabricksClientType
7+
8+
9+
@dataclass
10+
class DriverConnectionParameters:
11+
http_path: str
12+
driver_mode: DatabricksClientType
13+
host_details: HostDetails
14+
auth_mech: AuthMech
15+
auth_flow: AuthFlow
16+
auth_scope: str
17+
discovery_url: str
18+
allowed_volume_ingestion_paths: str
19+
enable_complex_datatype_support: bool
20+
azure_tenant_id: str
21+
socket_timeout: int
22+
23+
def to_json(self):
24+
return json.dumps(asdict(self))
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
import json
2+
from dataclasses import dataclass, asdict
3+
4+
5+
@dataclass
6+
class DriverErrorInfo:
7+
error_name: str
8+
stack_trace: str
9+
10+
def to_json(self):
11+
return json.dumps(asdict(self))
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
import json
2+
from dataclasses import dataclass, asdict
3+
import platform
4+
import sys
5+
import locale
6+
from databricks.sql import __version__
7+
8+
9+
@dataclass
10+
class DriverSystemConfiguration:
11+
driver_version: str
12+
os_name: str
13+
os_version: str
14+
os_arch: str
15+
runtime_name: str
16+
runtime_version: str
17+
runtime_vendor: str
18+
client_app_name: str
19+
locale_name: str
20+
driver_name: str
21+
char_set_encoding: str
22+
23+
def __init__(self):
24+
self.driver_version = __version__
25+
self.os_name = platform.system()
26+
self.os_version = platform.version()
27+
self.os_arch = platform.machine()
28+
self.runtime_name = platform.python_implementation()
29+
self.runtime_version = platform.python_version()
30+
self.runtime_vendor = sys.implementation.name
31+
self.client_app_name = "databricks-sql-python"
32+
self.locale_name = locale.getdefaultlocale()[0]
33+
self.driver_name = "databricks-sql-python"
34+
self.char_set_encoding = "UTF-8"
35+
36+
def to_json(self):
37+
return json.dumps(asdict(self))
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
import json
2+
from dataclasses import dataclass, asdict
3+
from databricks.sql.telemetry.enums.DriverVolumeOperationType import (
4+
DriverVolumeOperationType,
5+
)
6+
7+
8+
@dataclass
9+
class DriverVolumeOperation:
10+
volume_operation_type: DriverVolumeOperationType
11+
volume_path: str
12+
13+
def to_json(self):
14+
return json.dumps(asdict(self))
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
import json
2+
from dataclasses import dataclass, asdict
3+
from databricks.sql.telemetry.TelemetryClientContext import TelemetryClientContext
4+
5+
6+
@dataclass
7+
class FrontendLogContext:
8+
client_context: TelemetryClientContext
9+
10+
def to_json(self):
11+
return json.dumps(asdict(self))
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
import json
2+
from dataclasses import dataclass, asdict
3+
from databricks.sql.telemetry.TelemetryEvent import TelemetryEvent
4+
5+
6+
@dataclass
7+
class FrontendLogEntry:
8+
sql_driver_log: TelemetryEvent
9+
10+
def to_json(self):
11+
return json.dumps(asdict(self))
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
import json
2+
from dataclasses import dataclass, asdict
3+
4+
5+
@dataclass
6+
class HostDetails:
7+
host_url: str
8+
port: int
9+
10+
def to_json(self):
11+
return json.dumps(asdict(self))
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
class NoopTelemetryClient:
2+
# A no-operation telemetry client that implements the same interface but does nothing
3+
4+
def export_event(self, event):
5+
pass
6+
7+
def flush(self):
8+
pass
9+
10+
def close(self):
11+
pass
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
import json
2+
from dataclasses import dataclass, asdict
3+
from databricks.sql.telemetry.enums.StatementType import StatementType
4+
from databricks.sql.telemetry.enums.ExecutionResultFormat import ExecutionResultFormat
5+
6+
7+
@dataclass
8+
class SqlExecutionEvent:
9+
statement_type: StatementType
10+
is_compressed: bool
11+
execution_result: ExecutionResultFormat
12+
retry_count: int
13+
14+
def to_json(self):
15+
return json.dumps(asdict(self))
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
import threading
2+
import time
3+
import json
4+
import requests
5+
from concurrent.futures import ThreadPoolExecutor
6+
7+
8+
class TelemetryClient:
9+
def __init__(
10+
self,
11+
host,
12+
connection_uuid,
13+
auth_provider=None,
14+
is_authenticated=False,
15+
batch_size=200,
16+
):
17+
self.host = host
18+
self.connection_uuid = connection_uuid
19+
self.auth_provider = auth_provider
20+
self.is_authenticated = is_authenticated
21+
self.batch_size = batch_size
22+
self.events_batch = []
23+
self.lock = threading.Lock()
24+
self.executor = ThreadPoolExecutor(
25+
max_workers=5
26+
) # Thread pool for async operations
27+
self.DriverConnectionParameters = None
28+
29+
def export_event(self, event):
30+
pass
31+
32+
def flush(self):
33+
pass
34+
35+
def close(self):
36+
pass
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
from dataclasses import dataclass, asdict
2+
import json
3+
4+
5+
@dataclass
6+
class TelemetryClientContext:
7+
timestamp_millis: int
8+
user_agent: str
9+
10+
def to_json(self):
11+
return json.dumps(asdict(self))
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
import json
2+
from dataclasses import dataclass, asdict
3+
from databricks.sql.telemetry.DriverSystemConfiguration import DriverSystemConfiguration
4+
from databricks.sql.telemetry.DriverConnectionParameters import (
5+
DriverConnectionParameters,
6+
)
7+
from databricks.sql.telemetry.DriverVolumeOperation import DriverVolumeOperation
8+
from databricks.sql.telemetry.SqlExecutionEvent import SqlExecutionEvent
9+
from databricks.sql.telemetry.DriverErrorInfo import DriverErrorInfo
10+
11+
12+
@dataclass
13+
class TelemetryEvent:
14+
session_id: str
15+
sql_statement_id: str
16+
system_configuration: DriverSystemConfiguration
17+
driver_connection_params: DriverConnectionParameters
18+
auth_type: str
19+
vol_operation: DriverVolumeOperation
20+
sql_operation: SqlExecutionEvent
21+
error_info: DriverErrorInfo
22+
latency: int
23+
24+
def to_json(self):
25+
return json.dumps(asdict(self))
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
import json
2+
from dataclasses import dataclass, asdict
3+
from databricks.sql.telemetry.FrontendLogContext import FrontendLogContext
4+
from databricks.sql.telemetry.FrontendLogEntry import FrontendLogEntry
5+
6+
7+
@dataclass
8+
class TelemetryFrontendLog:
9+
workspace_id: int
10+
frontend_log_event_id: str
11+
context: FrontendLogContext
12+
entry: FrontendLogEntry
13+
14+
def to_json(self):
15+
return json.dumps(asdict(self))
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
import platform
2+
import sys
3+
import uuid
4+
import time
5+
from typing import Optional
6+
7+
from databricks.sql import __version__
8+
from databricks.sql.telemetry.DriverSystemConfiguration import DriverSystemConfiguration
9+
10+
11+
class TelemetryHelper:
12+
13+
# Singleton instance of DriverSystemConfiguration
14+
_DRIVER_SYSTEM_CONFIGURATION = None
15+
16+
@classmethod
17+
def getDriverSystemConfiguration(cls) -> DriverSystemConfiguration:
18+
if cls._DRIVER_SYSTEM_CONFIGURATION is None:
19+
cls._DRIVER_SYSTEM_CONFIGURATION = DriverSystemConfiguration(
20+
driverName="Databricks SQL Python Connector",
21+
driverVersion=__version__,
22+
runtimeName=f"Python {sys.version.split()[0]}",
23+
runtimeVendor=platform.python_implementation(),
24+
runtimeVersion=platform.python_version(),
25+
osName=platform.system(),
26+
osVersion=platform.release(),
27+
osArch=platform.machine(),
28+
clientAppName=None,
29+
localeName=f"{platform.system()}_{platform.release()}",
30+
charSetEncoding=sys.getdefaultencoding(),
31+
)
32+
return cls._DRIVER_SYSTEM_CONFIGURATION
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
import json
2+
from dataclasses import dataclass, asdict
3+
from typing import List, Optional
4+
5+
6+
@dataclass
7+
class TelemetryRequest:
8+
uploadTime: int
9+
items: List[str]
10+
protoLogs: Optional[List[str]]
11+
12+
def to_json(self):
13+
return json.dumps(asdict(self))

0 commit comments

Comments
 (0)