Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fixed linting issues
  • Loading branch information
nikhilsuri-db committed Sep 30, 2025
commit efbeb1ad1fc61e11cd8ca30a8dbb308e8941b692
6 changes: 5 additions & 1 deletion src/databricks/sql/auth/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,11 @@ def __init__(
self.pool_connections = pool_connections or 10
self.pool_maxsize = pool_maxsize or 20
self.user_agent = user_agent
self.telemetry_circuit_breaker_enabled = telemetry_circuit_breaker_enabled if telemetry_circuit_breaker_enabled is not None else False
self.telemetry_circuit_breaker_enabled = (
telemetry_circuit_breaker_enabled
if telemetry_circuit_breaker_enabled is not None
else False
)


def get_effective_azure_login_app_id(hostname) -> str:
Expand Down
124 changes: 60 additions & 64 deletions src/databricks/sql/telemetry/circuit_breaker_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,195 +33,191 @@

# Logging Message Constants
LOG_CIRCUIT_BREAKER_STATE_CHANGED = "Circuit breaker state changed from %s to %s for %s"
LOG_CIRCUIT_BREAKER_OPENED = "Circuit breaker opened for %s - telemetry requests will be blocked"
LOG_CIRCUIT_BREAKER_CLOSED = "Circuit breaker closed for %s - telemetry requests will be allowed"
LOG_CIRCUIT_BREAKER_HALF_OPEN = "Circuit breaker half-open for %s - testing telemetry requests"
LOG_CIRCUIT_BREAKER_OPENED = (
"Circuit breaker opened for %s - telemetry requests will be blocked"
)
LOG_CIRCUIT_BREAKER_CLOSED = (
"Circuit breaker closed for %s - telemetry requests will be allowed"
)
LOG_CIRCUIT_BREAKER_HALF_OPEN = (
"Circuit breaker half-open for %s - testing telemetry requests"
)


class CircuitBreakerStateListener(CircuitBreakerListener):
"""Listener for circuit breaker state changes."""

def before_call(self, cb: CircuitBreaker, func, *args, **kwargs) -> None:
"""Called before the circuit breaker calls a function."""
pass

def failure(self, cb: CircuitBreaker, exc: BaseException) -> None:
"""Called when a function called by the circuit breaker fails."""
pass

def success(self, cb: CircuitBreaker) -> None:
"""Called when a function called by the circuit breaker succeeds."""
pass

def state_change(self, cb: CircuitBreaker, old_state, new_state) -> None:
"""Called when the circuit breaker state changes."""
old_state_name = old_state.name if old_state else "None"
new_state_name = new_state.name if new_state else "None"

logger.info(
LOG_CIRCUIT_BREAKER_STATE_CHANGED,
old_state_name, new_state_name, cb.name
LOG_CIRCUIT_BREAKER_STATE_CHANGED, old_state_name, new_state_name, cb.name
)

if new_state_name == CIRCUIT_BREAKER_STATE_OPEN:
logger.warning(
LOG_CIRCUIT_BREAKER_OPENED,
cb.name
)
logger.warning(LOG_CIRCUIT_BREAKER_OPENED, cb.name)
elif new_state_name == CIRCUIT_BREAKER_STATE_CLOSED:
logger.info(
LOG_CIRCUIT_BREAKER_CLOSED,
cb.name
)
logger.info(LOG_CIRCUIT_BREAKER_CLOSED, cb.name)
elif new_state_name == CIRCUIT_BREAKER_STATE_HALF_OPEN:
logger.info(
LOG_CIRCUIT_BREAKER_HALF_OPEN,
cb.name
)
logger.info(LOG_CIRCUIT_BREAKER_HALF_OPEN, cb.name)


@dataclass(frozen=True)
class CircuitBreakerConfig:
"""Configuration for circuit breaker behavior.

This class is immutable to prevent modification of circuit breaker settings.
All configuration values are set to constants defined at the module level.
"""

# Failure threshold percentage (0.0 to 1.0)
failure_threshold: float = DEFAULT_FAILURE_THRESHOLD

# Minimum number of calls before circuit can open
minimum_calls: int = DEFAULT_MINIMUM_CALLS

# Time window for counting failures (in seconds)
timeout: int = DEFAULT_TIMEOUT

# Time to wait before trying to close circuit (in seconds)
reset_timeout: int = DEFAULT_RESET_TIMEOUT

# Expected exception types that should trigger circuit breaker
expected_exception: tuple = DEFAULT_EXPECTED_EXCEPTION

# Name for the circuit breaker (for logging)
name: str = DEFAULT_NAME


class CircuitBreakerManager:
"""
Manages circuit breaker instances for telemetry requests.

This class provides a singleton pattern to manage circuit breaker instances
per host, ensuring that telemetry failures don't impact main SQL operations.
"""

_instances: Dict[str, CircuitBreaker] = {}
_lock = threading.RLock()
_config: Optional[CircuitBreakerConfig] = None

@classmethod
def initialize(cls, config: CircuitBreakerConfig) -> None:
"""
Initialize the circuit breaker manager with configuration.

Args:
config: Circuit breaker configuration
"""
with cls._lock:
cls._config = config
logger.debug("CircuitBreakerManager initialized with config: %s", config)

@classmethod
def get_circuit_breaker(cls, host: str) -> CircuitBreaker:
"""
Get or create a circuit breaker instance for the specified host.

Args:
host: The hostname for which to get the circuit breaker

Returns:
CircuitBreaker instance for the host
"""
if not cls._config:
# Return a no-op circuit breaker if not initialized
return cls._create_noop_circuit_breaker()

with cls._lock:
if host not in cls._instances:
cls._instances[host] = cls._create_circuit_breaker(host)
logger.debug("Created circuit breaker for host: %s", host)

return cls._instances[host]

@classmethod
def _create_circuit_breaker(cls, host: str) -> CircuitBreaker:
"""
Create a new circuit breaker instance for the specified host.

Args:
host: The hostname for the circuit breaker

Returns:
New CircuitBreaker instance
"""
config = cls._config

if config is None:
raise RuntimeError("CircuitBreakerManager not initialized")

# Create circuit breaker with configuration
breaker = CircuitBreaker(
fail_max=config.minimum_calls, # Number of failures before circuit opens
reset_timeout=config.reset_timeout,
name=f"{config.name}-{host}"
name=f"{config.name}-{host}",
)

# Add state change listeners for logging
breaker.add_listener(CircuitBreakerStateListener())

return breaker

@classmethod
def _create_noop_circuit_breaker(cls) -> CircuitBreaker:
"""
Create a no-op circuit breaker that always allows calls.

Returns:
CircuitBreaker that never opens
"""
# Create a circuit breaker with very high thresholds so it never opens
breaker = CircuitBreaker(
fail_max=1000000, # Very high threshold
reset_timeout=1, # Short reset time
name="noop-circuit-breaker"
reset_timeout=1, # Short reset time
name="noop-circuit-breaker",
)
breaker.failure_threshold = 1.0 # 100% failure threshold
return breaker



@classmethod
def get_circuit_breaker_state(cls, host: str) -> str:
"""
Get the current state of the circuit breaker for a host.

Args:
host: The hostname

Returns:
Current state of the circuit breaker
"""
if not cls._config:
return CIRCUIT_BREAKER_STATE_DISABLED

with cls._lock:
if host not in cls._instances:
return CIRCUIT_BREAKER_STATE_NOT_INITIALIZED

breaker = cls._instances[host]
return breaker.current_state

@classmethod
def reset_circuit_breaker(cls, host: str) -> None:
"""
Reset the circuit breaker for a host to closed state.

Args:
host: The hostname
"""
Expand All @@ -230,20 +226,20 @@ def reset_circuit_breaker(cls, host: str) -> None:
# pybreaker doesn't have a reset method, we need to recreate the breaker
del cls._instances[host]
logger.info("Reset circuit breaker for host: %s", host)

@classmethod
def clear_circuit_breaker(cls, host: str) -> None:
"""
Remove the circuit breaker instance for a host.

Args:
host: The hostname
"""
with cls._lock:
if host in cls._instances:
del cls._instances[host]
logger.debug("Cleared circuit breaker for host: %s", host)

@classmethod
def clear_all_circuit_breakers(cls) -> None:
"""Clear all circuit breaker instances."""
Expand All @@ -255,10 +251,10 @@ def clear_all_circuit_breakers(cls) -> None:
def is_circuit_breaker_error(exception: Exception) -> bool:
"""
Check if an exception is a circuit breaker error.

Args:
exception: The exception to check

Returns:
True if the exception is a circuit breaker error
"""
Expand Down
38 changes: 23 additions & 15 deletions src/databricks/sql/telemetry/telemetry_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,12 @@
from databricks.sql.telemetry.telemetry_push_client import (
ITelemetryPushClient,
TelemetryPushClient,
CircuitBreakerTelemetryPushClient
CircuitBreakerTelemetryPushClient,
)
from databricks.sql.telemetry.circuit_breaker_manager import (
CircuitBreakerConfig,
is_circuit_breaker_error,
)
from databricks.sql.telemetry.circuit_breaker_manager import CircuitBreakerConfig, is_circuit_breaker_error

if TYPE_CHECKING:
from databricks.sql.client import Connection
Expand Down Expand Up @@ -194,28 +197,32 @@ def __init__(

# Create own HTTP client from client context
self._http_client = UnifiedHttpClient(client_context)

# Create telemetry push client based on circuit breaker enabled flag
if client_context.telemetry_circuit_breaker_enabled:
# Create circuit breaker configuration with hardcoded values
# These values are optimized for telemetry batching and network resilience
circuit_breaker_config = CircuitBreakerConfig(
failure_threshold=0.5, # Opens if 50%+ of calls fail
minimum_calls=20, # Minimum sample size before circuit can open
timeout=30, # Time window for counting failures (seconds)
reset_timeout=30, # Cool-down period before retrying (seconds)
name=f"telemetry-circuit-breaker-{session_id_hex}"
failure_threshold=0.5, # Opens if 50%+ of calls fail
minimum_calls=20, # Minimum sample size before circuit can open
timeout=30, # Time window for counting failures (seconds)
reset_timeout=30, # Cool-down period before retrying (seconds)
name=f"telemetry-circuit-breaker-{session_id_hex}",
)

# Create circuit breaker telemetry push client
self._telemetry_push_client: ITelemetryPushClient = CircuitBreakerTelemetryPushClient(
TelemetryPushClient(self._http_client),
host_url,
circuit_breaker_config
self._telemetry_push_client: ITelemetryPushClient = (
CircuitBreakerTelemetryPushClient(
TelemetryPushClient(self._http_client),
host_url,
circuit_breaker_config,
)
)
else:
# Circuit breaker disabled - use direct telemetry push client
self._telemetry_push_client: ITelemetryPushClient = TelemetryPushClient(self._http_client)
self._telemetry_push_client: ITelemetryPushClient = TelemetryPushClient(
self._http_client
)

def _export_event(self, event):
"""Add an event to the batch queue and flush if batch is full"""
Expand Down Expand Up @@ -290,7 +297,8 @@ def _send_with_unified_client(self, url, data, headers, timeout=900):
if is_circuit_breaker_error(e):
logger.warning(
"Telemetry request blocked by circuit breaker for connection %s: %s",
self._session_id_hex, e
self._session_id_hex,
e,
)
else:
logger.error("Failed to send telemetry: %s", e)
Expand Down
Loading