Skip to content

Commit

Permalink
feat(spanner): add resource based routing implementation (#10183)
Browse files Browse the repository at this point in the history
* feat(spanner): implement resource routing

* corrected warning message as per the PR comment

* Update spanner/google/cloud/spanner_v1/database.py

Add comma to warning message

Co-Authored-By: skuruppu <skuruppu@google.com>

Co-authored-by: skuruppu <skuruppu@google.com>
  • Loading branch information
larkee and skuruppu authored Jan 31, 2020
1 parent ff4409f commit e072d5d
Show file tree
Hide file tree
Showing 5 changed files with 401 additions and 7 deletions.
9 changes: 8 additions & 1 deletion google/cloud/spanner_v1/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import warnings

from google.api_core.gapic_v1 import client_info
import google.api_core.client_options

# pylint: disable=line-too-long
from google.cloud.spanner_admin_database_v1.gapic.database_admin_client import ( # noqa
Expand Down Expand Up @@ -122,6 +123,7 @@ class Client(ClientWithProject):

_instance_admin_api = None
_database_admin_api = None
_endpoint_cache = {}
user_agent = None
_SET_PROJECT = True # Used by from_service_account_json()

Expand All @@ -143,7 +145,12 @@ def __init__(
project=project, credentials=credentials, _http=None
)
self._client_info = client_info
self._client_options = client_options
if client_options and type(client_options) == dict:
self._client_options = google.api_core.client_options.from_dict(
client_options
)
else:
self._client_options = client_options

if user_agent is not None:
warnings.warn(_USER_AGENT_DEPRECATED, DeprecationWarning, stacklevel=2)
Expand Down
47 changes: 47 additions & 0 deletions google/cloud/spanner_v1/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,16 @@

import copy
import functools
import os
import re
import threading
import warnings

from google.api_core.client_options import ClientOptions
import google.auth.credentials
from google.protobuf.struct_pb2 import Struct
from google.cloud.exceptions import NotFound
from google.api_core.exceptions import PermissionDenied
import six

# pylint: disable=ungrouped-imports
Expand Down Expand Up @@ -54,6 +58,19 @@
)


_RESOURCE_ROUTING_PERMISSIONS_WARNING = (
"The client library attempted to connect to an endpoint closer to your Cloud Spanner data "
"but was unable to do so. The client library will fall back and route requests to the endpoint "
"given in the client options, which may result in increased latency. "
"We recommend including the scope https://www.googleapis.com/auth/spanner.admin so that the "
"client library can get an instance-specific endpoint and efficiently route requests."
)


class ResourceRoutingPermissionsWarning(Warning):
pass


class Database(object):
"""Representation of a Cloud Spanner Database.
Expand Down Expand Up @@ -178,6 +195,36 @@ def spanner_api(self):
credentials = credentials.with_scopes((SPANNER_DATA_SCOPE,))
client_info = self._instance._client._client_info
client_options = self._instance._client._client_options
if (
os.getenv("GOOGLE_CLOUD_SPANNER_ENABLE_RESOURCE_BASED_ROUTING")
== "true"
):
endpoint_cache = self._instance._client._endpoint_cache
if self._instance.name in endpoint_cache:
client_options = ClientOptions(
api_endpoint=endpoint_cache[self._instance.name]
)
else:
try:
api = self._instance._client.instance_admin_api
resp = api.get_instance(
self._instance.name,
field_mask={"paths": ["endpoint_uris"]},
metadata=_metadata_with_prefix(self.name),
)
endpoints = resp.endpoint_uris
if endpoints:
endpoint_cache[self._instance.name] = list(endpoints)[0]
client_options = ClientOptions(
api_endpoint=endpoint_cache[self._instance.name]
)
# If there are no endpoints, use default endpoint.
except PermissionDenied:
warnings.warn(
_RESOURCE_ROUTING_PERMISSIONS_WARNING,
ResourceRoutingPermissionsWarning,
stacklevel=2,
)
self._spanner_api = SpannerClient(
credentials=credentials,
client_info=client_info,
Expand Down
58 changes: 58 additions & 0 deletions tests/system/test_system.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@


CREATE_INSTANCE = os.getenv("GOOGLE_CLOUD_TESTS_CREATE_SPANNER_INSTANCE") is not None
USE_RESOURCE_ROUTING = (
os.getenv("GOOGLE_CLOUD_SPANNER_ENABLE_RESOURCE_BASED_ROUTING") == "true"
)

if CREATE_INSTANCE:
INSTANCE_ID = "google-cloud" + unique_resource_id("-")
Expand Down Expand Up @@ -282,6 +285,61 @@ def tearDown(self):
for doomed in self.to_delete:
doomed.drop()

@unittest.skipUnless(USE_RESOURCE_ROUTING, "requires enabling resource routing")
def test_spanner_api_use_user_specified_endpoint(self):
# Clear cache.
Client._endpoint_cache = {}
api = Config.CLIENT.instance_admin_api
resp = api.get_instance(
Config.INSTANCE.name, field_mask={"paths": ["endpoint_uris"]}
)
if not resp or not resp.endpoint_uris:
return # no resolved endpoint.
resolved_endpoint = resp.endpoint_uris[0]

client = Client(client_options={"api_endpoint": resolved_endpoint})

instance = client.instance(Config.INSTANCE.instance_id)
temp_db_id = "temp_db" + unique_resource_id("_")
temp_db = instance.database(temp_db_id)
temp_db.spanner_api

# No endpoint cache - Default endpoint used.
self.assertEqual(client._endpoint_cache, {})

@unittest.skipUnless(USE_RESOURCE_ROUTING, "requires enabling resource routing")
def test_spanner_api_use_resolved_endpoint(self):
# Clear cache.
Client._endpoint_cache = {}
api = Config.CLIENT.instance_admin_api
resp = api.get_instance(
Config.INSTANCE.name, field_mask={"paths": ["endpoint_uris"]}
)
if not resp or not resp.endpoint_uris:
return # no resolved endpoint.
resolved_endpoint = resp.endpoint_uris[0]

client = Client(
client_options=Config.CLIENT._client_options
) # Use same endpoint as main client.

instance = client.instance(Config.INSTANCE.instance_id)
temp_db_id = "temp_db" + unique_resource_id("_")
temp_db = instance.database(temp_db_id)
temp_db.spanner_api

# Endpoint is cached - resolved endpoint used.
self.assertIn(Config.INSTANCE.name, client._endpoint_cache)
self.assertEqual(
client._endpoint_cache[Config.INSTANCE.name], resolved_endpoint
)

# Endpoint is cached at a class level.
self.assertIn(Config.INSTANCE.name, Config.CLIENT._endpoint_cache)
self.assertEqual(
Config.CLIENT._endpoint_cache[Config.INSTANCE.name], resolved_endpoint
)

def test_list_databases(self):
# Since `Config.INSTANCE` is newly created in `setUpModule`, the
# database created in `setUpClass` here will be the only one.
Expand Down
39 changes: 38 additions & 1 deletion tests/unit/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ def _constructor_test_helper(
user_agent=None,
client_options=None,
):
import google.api_core.client_options
from google.cloud.spanner_v1 import client as MUT

kwargs = {}
Expand All @@ -66,6 +67,14 @@ def _constructor_test_helper(
else:
expected_client_info = MUT._CLIENT_INFO

kwargs["client_options"] = client_options
if type(client_options) == dict:
expected_client_options = google.api_core.client_options.from_dict(
client_options
)
else:
expected_client_options = client_options

client = self._make_one(
project=self.PROJECT, credentials=creds, user_agent=user_agent, **kwargs
)
Expand All @@ -80,7 +89,14 @@ def _constructor_test_helper(
self.assertEqual(client.project, self.PROJECT)
self.assertIs(client._client_info, expected_client_info)
self.assertEqual(client.user_agent, user_agent)
self.assertEqual(client._client_options, client_options)
if expected_client_options is not None:
self.assertIsInstance(
client._client_options, google.api_core.client_options.ClientOptions
)
self.assertEqual(
client._client_options.api_endpoint,
expected_client_options.api_endpoint,
)

def test_constructor_default_scopes(self):
from google.cloud.spanner_v1 import client as MUT
Expand Down Expand Up @@ -127,6 +143,27 @@ def test_constructor_credentials_wo_create_scoped(self):
expected_scopes = None
self._constructor_test_helper(expected_scopes, creds)

def test_constructor_custom_client_options_obj(self):
from google.api_core.client_options import ClientOptions
from google.cloud.spanner_v1 import client as MUT

expected_scopes = (MUT.SPANNER_ADMIN_SCOPE,)
creds = _make_credentials()
self._constructor_test_helper(
expected_scopes,
creds,
client_options=ClientOptions(api_endpoint="endpoint"),
)

def test_constructor_custom_client_options_dict(self):
from google.cloud.spanner_v1 import client as MUT

expected_scopes = (MUT.SPANNER_ADMIN_SCOPE,)
creds = _make_credentials()
self._constructor_test_helper(
expected_scopes, creds, client_options={"api_endpoint": "endpoint"}
)

def test_instance_admin_api(self):
from google.cloud.spanner_v1.client import SPANNER_ADMIN_SCOPE

Expand Down
Loading

0 comments on commit e072d5d

Please sign in to comment.