Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 112 additions & 5 deletions google/cloud/alloydbconnector/async_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@

import asyncio
import logging
import time
from types import TracebackType
from typing import TYPE_CHECKING
from typing import Any
from typing import Optional
import uuid

import google.auth
from google.auth.credentials import with_scopes_if_required
Expand All @@ -30,10 +32,21 @@
from google.cloud.alloydbconnector.enums import RefreshStrategy
from google.cloud.alloydbconnector.exceptions import ClosedConnectorError
from google.cloud.alloydbconnector.instance import RefreshAheadCache
from google.cloud.alloydbconnector.instance import _parse_instance_uri
from google.cloud.alloydbconnector.lazy import LazyRefreshCache
from google.cloud.alloydbconnector.telemetry import DIAL_CACHE_ERROR
from google.cloud.alloydbconnector.telemetry import DIAL_SUCCESS
from google.cloud.alloydbconnector.telemetry import DIAL_TCP_ERROR
from google.cloud.alloydbconnector.telemetry import REFRESH_AHEAD_TYPE
from google.cloud.alloydbconnector.telemetry import REFRESH_LAZY_TYPE
from google.cloud.alloydbconnector.telemetry import MetricRecorderType
from google.cloud.alloydbconnector.telemetry import TelemetryAttributes
from google.cloud.alloydbconnector.telemetry import TelemetryProviderType
from google.cloud.alloydbconnector.telemetry import new_telemetry_provider
from google.cloud.alloydbconnector.types import CacheTypes
from google.cloud.alloydbconnector.utils import generate_keys
from google.cloud.alloydbconnector.utils import strip_http_prefix
from google.cloud.alloydbconnector.version import __version__

if TYPE_CHECKING:
from google.auth.credentials import Credentials
Expand Down Expand Up @@ -72,6 +85,12 @@ class AsyncConnector:
of the following: RefreshStrategy.LAZY ("LAZY") or
RefreshStrategy.BACKGROUND ("BACKGROUND").
Default: RefreshStrategy.BACKGROUND
enable_builtin_telemetry (bool): Enable built-in telemetry that
reports connector metrics to the
alloydb.googleapis.com/client/connector metric prefix in
Cloud Monitoring. These metrics help AlloyDB improve performance
and identify client connectivity problems. Set to False to
disable. Default: True.
"""

def __init__(
Expand All @@ -84,6 +103,7 @@ def __init__(
ip_type: str | IPTypes = IPTypes.PRIVATE,
user_agent: Optional[str] = None,
refresh_strategy: str | RefreshStrategy = RefreshStrategy.BACKGROUND,
enable_builtin_telemetry: bool = True,
) -> None:
self._cache: dict[str, CacheTypes] = {}
# initialize default params
Expand Down Expand Up @@ -132,6 +152,55 @@ def __init__(
pass
self._client: Optional[AlloyDBClient] = None
self._closed = False
# built-in telemetry
self._enable_builtin_telemetry = enable_builtin_telemetry
self._client_uid = str(uuid.uuid4())
self._metric_recorders: dict[str, MetricRecorderType] = {}
self._telemetry_provider: Optional[TelemetryProviderType] = None
self._monitoring_client: Optional[object] = None
if self._enable_builtin_telemetry:
try:
from google.cloud.monitoring_v3 import MetricServiceClient

self._monitoring_client = MetricServiceClient(
credentials=self._credentials
)
except Exception as e:
logger.debug(f"Built-in metrics exporter failed to initialize: {e}")

def _get_telemetry_provider(self, project_id: str) -> TelemetryProviderType:
"""Get or lazily create the TelemetryProvider on first connect."""
if self._telemetry_provider is not None:
return self._telemetry_provider
self._telemetry_provider = new_telemetry_provider(
enabled=self._enable_builtin_telemetry,
project_id=project_id,
client_uid=self._client_uid,
version=__version__,
monitoring_client=self._monitoring_client,
)
return self._telemetry_provider

def _metric_recorder(
self,
instance_uri: str,
project: str,
region: str,
cluster: str,
name: str,
) -> MetricRecorderType:
"""Get or lazily create a MetricRecorder for the given instance."""
if instance_uri in self._metric_recorders:
return self._metric_recorders[instance_uri]
provider = self._get_telemetry_provider(project)
mr = provider.create_metric_recorder(
project_id=project,
location=region,
cluster=cluster,
instance=name,
)
self._metric_recorders[instance_uri] = mr
return mr

async def connect(
self,
Expand Down Expand Up @@ -175,20 +244,36 @@ async def connect(

enable_iam_auth = kwargs.pop("enable_iam_auth", self._enable_iam_auth)

# parse instance URI for telemetry resource labels
project, region, cluster, name = _parse_instance_uri(instance_uri)
mr = self._metric_recorder(instance_uri, project, region, cluster, name)

attrs = TelemetryAttributes(
iam_authn=enable_iam_auth,
refresh_type=(
REFRESH_LAZY_TYPE
if self._refresh_strategy == RefreshStrategy.LAZY
else REFRESH_AHEAD_TYPE
),
)
start_time = time.monotonic()

# use existing connection info if possible
if instance_uri in self._cache:
cache_hit = instance_uri in self._cache
attrs.cache_hit = cache_hit
if cache_hit:
cache = self._cache[instance_uri]
else:
if self._refresh_strategy == RefreshStrategy.LAZY:
logger.debug(
f"['{instance_uri}']: Refresh strategy is set to lazy refresh"
)
cache = LazyRefreshCache(instance_uri, self._client, self._keys)
cache = LazyRefreshCache(instance_uri, self._client, self._keys, mr)
else:
logger.debug(
f"['{instance_uri}']: Refresh strategy is set to background refresh"
)
cache = RefreshAheadCache(instance_uri, self._client, self._keys)
cache = RefreshAheadCache(instance_uri, self._client, self._keys, mr)
self._cache[instance_uri] = cache
logger.debug(f"['{instance_uri}']: Connection info added to cache")

Expand Down Expand Up @@ -218,6 +303,8 @@ async def connect(
except Exception:
# with an error from AlloyDB API call or IP type, invalidate the
# cache and re-raise the error
attrs.dial_status = DIAL_CACHE_ERROR
mr.record_dial_count(attrs)
await self._remove_cached(instance_uri)
raise
logger.debug(f"['{instance_uri}']: Connecting to {ip_address}:5433")
Expand All @@ -235,14 +322,24 @@ async def get_authentication_token() -> str:
if enable_iam_auth:
kwargs["password"] = get_authentication_token
try:
return await connector(
conn = await connector(
ip_address, await conn_info.create_ssl_context(), **kwargs
)
except Exception:
# we attempt a force refresh, then throw the error
# The Async connector doesn't distinguish between TCP, TLS, or MDX
# errors. So treat all errors as TCP errors.
attrs.dial_status = DIAL_TCP_ERROR
mr.record_dial_count(attrs)
await cache.force_refresh()
raise

# record successful dial metrics
attrs.dial_status = DIAL_SUCCESS
latency_ms = (time.monotonic() - start_time) * 1000
mr.record_dial_count(attrs)
mr.record_dial_latency(latency_ms)
return conn

async def _remove_cached(self, instance_uri: str) -> None:
"""Stops all background refreshes and deletes the connection
info cache from the map of caches.
Expand All @@ -269,4 +366,14 @@ async def close(self) -> None:
"""Helper function to cancel RefreshAheadCaches' tasks
and close client."""
await asyncio.gather(*[cache.close() for cache in self._cache.values()])
# shut down telemetry provider in executor to avoid blocking the
# event loop (shutdown triggers a final gRPC export)
if self._telemetry_provider is not None:
loop = asyncio.get_event_loop()
try:
await loop.run_in_executor(
None, self._telemetry_provider.shutdown
)
except Exception:
pass
self._closed = True
Loading
Loading