Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(spanner): add resource based routing implementation #10183

Merged
merged 3 commits into from
Jan 31, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion spanner/google/cloud/spanner_v1/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import warnings

from google.api_core.gapic_v1 import client_info
import google.api_core.client_options

# pylint: disable=line-too-long
from google.cloud.spanner_admin_database_v1.gapic.database_admin_client import ( # noqa
Expand Down Expand Up @@ -122,6 +123,7 @@ class Client(ClientWithProject):

_instance_admin_api = None
_database_admin_api = None
_endpoint_cache = {}
user_agent = None
_SET_PROJECT = True # Used by from_service_account_json()

Expand All @@ -143,7 +145,12 @@ def __init__(
project=project, credentials=credentials, _http=None
)
self._client_info = client_info
self._client_options = client_options
if client_options and type(client_options) == dict:
self._client_options = google.api_core.client_options.from_dict(
client_options
)
else:
self._client_options = client_options

if user_agent is not None:
warnings.warn(_USER_AGENT_DEPRECATED, DeprecationWarning, stacklevel=2)
Expand Down
47 changes: 47 additions & 0 deletions spanner/google/cloud/spanner_v1/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,16 @@

import copy
import functools
import os
import re
import threading
import warnings

from google.api_core.client_options import ClientOptions
import google.auth.credentials
from google.protobuf.struct_pb2 import Struct
from google.cloud.exceptions import NotFound
from google.api_core.exceptions import PermissionDenied
import six

# pylint: disable=ungrouped-imports
Expand Down Expand Up @@ -54,6 +58,19 @@
)


_RESOURCE_ROUTING_PERMISSIONS_WARNING = (
"The client library attempted to connect to an endpoint closer to your Cloud Spanner data "
"but was unable to do so. The client library will fallback and route requests to the endpoint "
larkee marked this conversation as resolved.
Show resolved Hide resolved
"given in the client options which may result in increased latency. "
larkee marked this conversation as resolved.
Show resolved Hide resolved
"We recommend including the scope https://www.googleapis.com/auth/spanner.admin so that the "
"client library can get an instance-specific endpoint and efficiently route requests."
)


class ResourceRoutingPermissionsWarning(Warning):
pass


class Database(object):
"""Representation of a Cloud Spanner Database.

Expand Down Expand Up @@ -178,6 +195,36 @@ def spanner_api(self):
credentials = credentials.with_scopes((SPANNER_DATA_SCOPE,))
client_info = self._instance._client._client_info
client_options = self._instance._client._client_options
if (
os.getenv("GOOGLE_CLOUD_SPANNER_ENABLE_RESOURCE_BASED_ROUTING")
== "true"
):
endpoint_cache = self._instance._client._endpoint_cache
if self._instance.name in endpoint_cache:
client_options = ClientOptions(
api_endpoint=endpoint_cache[self._instance.name]
)
else:
try:
api = self._instance._client.instance_admin_api
resp = api.get_instance(
self._instance.name,
field_mask={"paths": ["endpoint_uris"]},
metadata=_metadata_with_prefix(self.name),
)
endpoints = resp.endpoint_uris
if endpoints:
endpoint_cache[self._instance.name] = list(endpoints)[0]
client_options = ClientOptions(
api_endpoint=endpoint_cache[self._instance.name]
)
# If there are no endpoints, use default endpoint.
except PermissionDenied:
warnings.warn(
_RESOURCE_ROUTING_PERMISSIONS_WARNING,
ResourceRoutingPermissionsWarning,
stacklevel=2,
)
self._spanner_api = SpannerClient(
credentials=credentials,
client_info=client_info,
Expand Down
58 changes: 58 additions & 0 deletions spanner/tests/system/test_system.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@


CREATE_INSTANCE = os.getenv("GOOGLE_CLOUD_TESTS_CREATE_SPANNER_INSTANCE") is not None
USE_RESOURCE_ROUTING = (
os.getenv("GOOGLE_CLOUD_SPANNER_ENABLE_RESOURCE_BASED_ROUTING") == "true"
)

if CREATE_INSTANCE:
INSTANCE_ID = "google-cloud" + unique_resource_id("-")
Expand Down Expand Up @@ -282,6 +285,61 @@ def tearDown(self):
for doomed in self.to_delete:
doomed.drop()

@unittest.skipUnless(USE_RESOURCE_ROUTING, "requires enabling resource routing")
def test_spanner_api_use_user_specified_endpoint(self):
# Clear cache.
Client._endpoint_cache = {}
api = Config.CLIENT.instance_admin_api
resp = api.get_instance(
Config.INSTANCE.name, field_mask={"paths": ["endpoint_uris"]}
)
if not resp or not resp.endpoint_uris:
return # no resolved endpoint.
resolved_endpoint = resp.endpoint_uris[0]

client = Client(client_options={"api_endpoint": resolved_endpoint})

instance = client.instance(Config.INSTANCE.instance_id)
temp_db_id = "temp_db" + unique_resource_id("_")
temp_db = instance.database(temp_db_id)
temp_db.spanner_api

# No endpoint cache - Default endpoint used.
self.assertEqual(client._endpoint_cache, {})

@unittest.skipUnless(USE_RESOURCE_ROUTING, "requires enabling resource routing")
def test_spanner_api_use_resolved_endpoint(self):
# Clear cache.
Client._endpoint_cache = {}
api = Config.CLIENT.instance_admin_api
resp = api.get_instance(
Config.INSTANCE.name, field_mask={"paths": ["endpoint_uris"]}
)
if not resp or not resp.endpoint_uris:
return # no resolved endpoint.
resolved_endpoint = resp.endpoint_uris[0]

client = Client(
client_options=Config.CLIENT._client_options
) # Use same endpoint as main client.

instance = client.instance(Config.INSTANCE.instance_id)
temp_db_id = "temp_db" + unique_resource_id("_")
temp_db = instance.database(temp_db_id)
temp_db.spanner_api

# Endpoint is cached - resolved endpoint used.
self.assertIn(Config.INSTANCE.name, client._endpoint_cache)
self.assertEqual(
client._endpoint_cache[Config.INSTANCE.name], resolved_endpoint
)

# Endpoint is cached at a class level.
self.assertIn(Config.INSTANCE.name, Config.CLIENT._endpoint_cache)
self.assertEqual(
Config.CLIENT._endpoint_cache[Config.INSTANCE.name], resolved_endpoint
)

def test_list_databases(self):
# Since `Config.INSTANCE` is newly created in `setUpModule`, the
# database created in `setUpClass` here will be the only one.
Expand Down
39 changes: 38 additions & 1 deletion spanner/tests/unit/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ def _constructor_test_helper(
user_agent=None,
client_options=None,
):
import google.api_core.client_options
from google.cloud.spanner_v1 import client as MUT

kwargs = {}
Expand All @@ -66,6 +67,14 @@ def _constructor_test_helper(
else:
expected_client_info = MUT._CLIENT_INFO

kwargs["client_options"] = client_options
if type(client_options) == dict:
expected_client_options = google.api_core.client_options.from_dict(
client_options
)
else:
expected_client_options = client_options

client = self._make_one(
project=self.PROJECT, credentials=creds, user_agent=user_agent, **kwargs
)
Expand All @@ -80,7 +89,14 @@ def _constructor_test_helper(
self.assertEqual(client.project, self.PROJECT)
self.assertIs(client._client_info, expected_client_info)
self.assertEqual(client.user_agent, user_agent)
self.assertEqual(client._client_options, client_options)
if expected_client_options is not None:
self.assertIsInstance(
client._client_options, google.api_core.client_options.ClientOptions
)
self.assertEqual(
client._client_options.api_endpoint,
expected_client_options.api_endpoint,
)

def test_constructor_default_scopes(self):
from google.cloud.spanner_v1 import client as MUT
Expand Down Expand Up @@ -127,6 +143,27 @@ def test_constructor_credentials_wo_create_scoped(self):
expected_scopes = None
self._constructor_test_helper(expected_scopes, creds)

def test_constructor_custom_client_options_obj(self):
from google.api_core.client_options import ClientOptions
from google.cloud.spanner_v1 import client as MUT

expected_scopes = (MUT.SPANNER_ADMIN_SCOPE,)
creds = _make_credentials()
self._constructor_test_helper(
expected_scopes,
creds,
client_options=ClientOptions(api_endpoint="endpoint"),
)

def test_constructor_custom_client_options_dict(self):
from google.cloud.spanner_v1 import client as MUT

expected_scopes = (MUT.SPANNER_ADMIN_SCOPE,)
creds = _make_credentials()
self._constructor_test_helper(
expected_scopes, creds, client_options={"api_endpoint": "endpoint"}
)

def test_instance_admin_api(self):
from google.cloud.spanner_v1.client import SPANNER_ADMIN_SCOPE

Expand Down
Loading