Skip to content

Commit

Permalink
feat: named db support (#882)
Browse files Browse the repository at this point in the history
  • Loading branch information
rwhogg authored Jul 25, 2023
1 parent 8640476 commit f5713b0
Show file tree
Hide file tree
Showing 26 changed files with 738 additions and 149 deletions.
2 changes: 2 additions & 0 deletions CONTRIBUTING.rst
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ Running System Tests

- To run system tests for a given package, you can execute::

$ export SYSTEM_TESTS_DATABASE=system-tests-named-db
$ nox -e system

.. note::
Expand Down Expand Up @@ -188,6 +189,7 @@ Running System Tests

# Create the indexes
$ gcloud datastore indexes create tests/system/index.yaml
$ gcloud alpha datastore indexes create --database=$SYSTEM_TESTS_DATABASE tests/system/index.yaml


*************
Expand Down
101 changes: 85 additions & 16 deletions google/cloud/ndb/_datastore_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import logging

from google.api_core import exceptions as core_exceptions
from google.api_core import gapic_v1
from google.cloud.datastore import helpers
from google.cloud.datastore_v1.types import datastore as datastore_pb2
from google.cloud.datastore_v1.types import entity as entity_pb2
Expand Down Expand Up @@ -56,7 +57,7 @@ def stub():
return context.client.stub


def make_call(rpc_name, request, retries=None, timeout=None):
def make_call(rpc_name, request, retries=None, timeout=None, metadata=()):
"""Make a call to the Datastore API.
Args:
Expand All @@ -68,6 +69,8 @@ def make_call(rpc_name, request, retries=None, timeout=None):
If :data:`0` is passed, the call is attempted only once.
timeout (float): Timeout, in seconds, to pass to gRPC call. If
:data:`None` is passed, will use :data:`_DEFAULT_TIMEOUT`.
metadata (Sequence[Tuple[str, str]]): Strings which should be
sent along with the request as metadata.
Returns:
tasklets.Future: Future for the eventual response for the API call.
Expand All @@ -85,7 +88,7 @@ def make_call(rpc_name, request, retries=None, timeout=None):
def rpc_call():
context = context_module.get_toplevel_context()

call = method.future(request, timeout=timeout)
call = method.future(request, timeout=timeout, metadata=metadata)
rpc = _remote.RemoteCall(call, rpc_name)
utils.logging_debug(log, rpc)
utils.logging_debug(log, "timeout={}", timeout)
Expand Down Expand Up @@ -282,7 +285,7 @@ def lookup_callback(self, rpc):
future.set_result(entity)


def _datastore_lookup(keys, read_options, retries=None, timeout=None):
def _datastore_lookup(keys, read_options, retries=None, timeout=None, metadata=()):
"""Issue a Lookup call to Datastore using gRPC.
Args:
Expand All @@ -295,18 +298,24 @@ def _datastore_lookup(keys, read_options, retries=None, timeout=None):
If :data:`0` is passed, the call is attempted only once.
timeout (float): Timeout, in seconds, to pass to gRPC call. If
:data:`None` is passed, will use :data:`_DEFAULT_TIMEOUT`.
metadata (Sequence[Tuple[str, str]]): Strings which should be
sent along with the request as metadata.
Returns:
tasklets.Future: Future object for eventual result of lookup.
"""
client = context_module.get_context().client
request = datastore_pb2.LookupRequest(
project_id=client.project,
database_id=client.database,
keys=[key for key in keys],
read_options=read_options,
)
metadata = _add_routing_info(metadata, request)

return make_call("lookup", request, retries=retries, timeout=timeout)
return make_call(
"lookup", request, retries=retries, timeout=timeout, metadata=metadata
)


def get_read_options(options, default_read_consistency=None):
Expand Down Expand Up @@ -843,7 +852,7 @@ def _complete(key_pb):
return False


def _datastore_commit(mutations, transaction, retries=None, timeout=None):
def _datastore_commit(mutations, transaction, retries=None, timeout=None, metadata=()):
"""Call Commit on Datastore.
Args:
Expand All @@ -857,6 +866,8 @@ def _datastore_commit(mutations, transaction, retries=None, timeout=None):
If :data:`0` is passed, the call is attempted only once.
timeout (float): Timeout, in seconds, to pass to gRPC call. If
:data:`None` is passed, will use :data:`_DEFAULT_TIMEOUT`.
metadata (Sequence[Tuple[str, str]]): Strings which should be
sent along with the request as metadata.
Returns:
tasklets.Tasklet: A future for
Expand All @@ -870,12 +881,16 @@ def _datastore_commit(mutations, transaction, retries=None, timeout=None):
client = context_module.get_context().client
request = datastore_pb2.CommitRequest(
project_id=client.project,
database_id=client.database,
mode=mode,
mutations=mutations,
transaction=transaction,
)
metadata = _add_routing_info(metadata, request)

return make_call("commit", request, retries=retries, timeout=timeout)
return make_call(
"commit", request, retries=retries, timeout=timeout, metadata=metadata
)


def allocate(keys, options):
Expand Down Expand Up @@ -973,7 +988,7 @@ def allocate_ids_callback(self, rpc):
future.set_result(key)


def _datastore_allocate_ids(keys, retries=None, timeout=None):
def _datastore_allocate_ids(keys, retries=None, timeout=None, metadata=()):
"""Calls ``AllocateIds`` on Datastore.
Args:
Expand All @@ -984,15 +999,22 @@ def _datastore_allocate_ids(keys, retries=None, timeout=None):
If :data:`0` is passed, the call is attempted only once.
timeout (float): Timeout, in seconds, to pass to gRPC call. If
:data:`None` is passed, will use :data:`_DEFAULT_TIMEOUT`.
metadata (Sequence[Tuple[str, str]]): Strings which should be
sent along with the request as metadata.
Returns:
tasklets.Future: A future for
:class:`google.cloud.datastore_v1.datastore_pb2.AllocateIdsResponse`
"""
client = context_module.get_context().client
request = datastore_pb2.AllocateIdsRequest(project_id=client.project, keys=keys)
request = datastore_pb2.AllocateIdsRequest(
project_id=client.project, database_id=client.database, keys=keys
)
metadata = _add_routing_info(metadata, request)

return make_call("allocate_ids", request, retries=retries, timeout=timeout)
return make_call(
"allocate_ids", request, retries=retries, timeout=timeout, metadata=metadata
)


@tasklets.tasklet
Expand All @@ -1018,7 +1040,7 @@ def begin_transaction(read_only, retries=None, timeout=None):
raise tasklets.Return(response.transaction)


def _datastore_begin_transaction(read_only, retries=None, timeout=None):
def _datastore_begin_transaction(read_only, retries=None, timeout=None, metadata=()):
"""Calls ``BeginTransaction`` on Datastore.
Args:
Expand All @@ -1029,6 +1051,8 @@ def _datastore_begin_transaction(read_only, retries=None, timeout=None):
If :data:`0` is passed, the call is attempted only once.
timeout (float): Timeout, in seconds, to pass to gRPC call. If
:data:`None` is passed, will use :data:`_DEFAULT_TIMEOUT`.
metadata (Sequence[Tuple[str, str]]): Strings which should be
sent along with the request as metadata.
Returns:
tasklets.Tasklet: A future for
Expand All @@ -1045,10 +1069,19 @@ def _datastore_begin_transaction(read_only, retries=None, timeout=None):
)

request = datastore_pb2.BeginTransactionRequest(
project_id=client.project, transaction_options=options
project_id=client.project,
database_id=client.database,
transaction_options=options,
)
metadata = _add_routing_info(metadata, request)

return make_call(
"begin_transaction",
request,
retries=retries,
timeout=timeout,
metadata=metadata,
)

return make_call("begin_transaction", request, retries=retries, timeout=timeout)


@tasklets.tasklet
Expand All @@ -1069,7 +1102,7 @@ def rollback(transaction, retries=None, timeout=None):
yield _datastore_rollback(transaction, retries=retries, timeout=timeout)


def _datastore_rollback(transaction, retries=None, timeout=None):
def _datastore_rollback(transaction, retries=None, timeout=None, metadata=()):
"""Calls Rollback in Datastore.
Args:
Expand All @@ -1079,14 +1112,50 @@ def _datastore_rollback(transaction, retries=None, timeout=None):
If :data:`0` is passed, the call is attempted only once.
timeout (float): Timeout, in seconds, to pass to gRPC call. If
:data:`None` is passed, will use :data:`_DEFAULT_TIMEOUT`.
metadata (Sequence[Tuple[str, str]]): Strings which should be
sent along with the request as metadata.
Returns:
tasklets.Tasklet: Future for
:class:`google.cloud.datastore_v1.datastore_pb2.RollbackResponse`
"""
client = context_module.get_context().client
request = datastore_pb2.RollbackRequest(
project_id=client.project, transaction=transaction
project_id=client.project,
database_id=client.database,
transaction=transaction,
)
metadata = _add_routing_info(metadata, request)

return make_call(
"rollback", request, retries=retries, timeout=timeout, metadata=metadata
)


def _add_routing_info(metadata, request):
"""Adds routing header info to the given metadata.
Args:
metadata (Sequence[Tuple[str, str]]): Strings which should be
sent along with the request as metadata. Not modified.
request (Any): An appropriate request object for the call, eg,
`entity_pb2.LookupRequest` for calling ``Lookup``.
Returns:
Sequence[Tuple[str, str]]: Sequence with routing info added,
if it is included in the request.
"""
header_params = {}

if request.project_id:
header_params["project_id"] = request.project_id

if request.database_id:
header_params["database_id"] = request.database_id

if header_params:
return tuple(metadata) + (
gapic_v1.routing_header.to_grpc_metadata(header_params),
)

return make_call("rollback", request, retries=retries, timeout=timeout)
return tuple(metadata)
9 changes: 7 additions & 2 deletions google/cloud/ndb/_datastore_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -1010,17 +1010,22 @@ def _datastore_run_query(query):
"""
query_pb = _query_to_protobuf(query)
partition_id = entity_pb2.PartitionId(
project_id=query.project, namespace_id=query.namespace
project_id=query.project,
database_id=query.database,
namespace_id=query.namespace,
)
read_options = _datastore_api.get_read_options(query)
request = datastore_pb2.RunQueryRequest(
project_id=query.project,
database_id=query.database,
partition_id=partition_id,
query=query_pb,
read_options=read_options,
)
metadata = _datastore_api._add_routing_info((), request)

response = yield _datastore_api.make_call(
"run_query", request, timeout=query.timeout
"run_query", request, timeout=query.timeout, metadata=metadata
)
utils.logging_debug(log, response)
raise tasklets.Return(response)
Expand Down
8 changes: 5 additions & 3 deletions google/cloud/ndb/_gql.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,7 @@ def __init__(self, query_string, _app=None, _auth_domain=None, namespace=None):
Args:
query_string (str): properly formatted GQL query string.
namespace (str): the namespace to use for this query.
namespace (str): The namespace to use for this query. Defaults to the client's value.
Raises:
exceptions.BadQueryError: if the query is not parsable.
"""
Expand Down Expand Up @@ -853,7 +852,10 @@ def _key_function(values):
context = context_module.get_context()
client = context.client
return key.Key(
*values, namespace=context.get_namespace(), project=client.project
*values,
project=client.project,
database=client.database,
namespace=context.get_namespace(),
)
_raise_cast_error(
"Key requires even number of operands or single string, {}".format(values)
Expand Down
12 changes: 10 additions & 2 deletions google/cloud/ndb/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

"""A client for NDB which manages credentials, project, namespace."""
"""A client for NDB which manages credentials, project, namespace, and database."""

import contextlib
import grpc
Expand Down Expand Up @@ -92,17 +92,25 @@ class Client(google_client.ClientWithProject):
client_options (Optional[:class:`~google.api_core.client_options.ClientOptions` or :class:`dict`])
Client options used to set user options on the client.
API Endpoint should be set through client_options.
database (Optional[str]): Database to access. Defaults to the (default) database.
"""

SCOPE = ("https://www.googleapis.com/auth/datastore",)
"""The scopes required for authenticating as a Cloud Datastore consumer."""

def __init__(
self, project=None, namespace=None, credentials=None, client_options=None
self,
project=None,
namespace=None,
credentials=None,
client_options=None,
database=None,
):
self.namespace = namespace
self.host = os.environ.get(environment_vars.GCD_HOST, DATASTORE_API_HOST)
self.client_info = _CLIENT_INFO
self._client_options = client_options
self.database = database

# Use insecure connection when using Datastore Emulator, otherwise
# use secure connection
Expand Down
Loading

0 comments on commit f5713b0

Please sign in to comment.