Skip to content

Commit

Permalink
Making datastore Connection.commit() return low-level protobuf.
Browse files Browse the repository at this point in the history
Towards #2746. This approach is to slowly transition from our
current approach to use the GAPIC generated surface.

It is unfortunately tangled quite a bit (partly because we
may have too much mocked in the tests).
  • Loading branch information
dhermes committed Feb 24, 2017
1 parent 5557e19 commit 9488e3c
Show file tree
Hide file tree
Showing 6 changed files with 133 additions and 138 deletions.
27 changes: 3 additions & 24 deletions datastore/google/cloud/datastore/_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -426,19 +426,16 @@ def commit(self, project, request, transaction_id):
This method will mutate ``request`` before using it.
:rtype: tuple
:returns: The pair of the number of index updates and a list of
:class:`.entity_pb2.Key` for each incomplete key
that was completed in the commit.
:rtype: :class:`.datastore_pb2.CommitResponse`
:returns: The protobuf response from a commit request.
"""
if transaction_id:
request.mode = _datastore_pb2.CommitRequest.TRANSACTIONAL
request.transaction = transaction_id
else:
request.mode = _datastore_pb2.CommitRequest.NON_TRANSACTIONAL

response = self._datastore_api.commit(project, request)
return _parse_commit_response(response)
return self._datastore_api.commit(project, request)

def rollback(self, project, transaction_id):
"""Rollback the connection's existing transaction.
Expand Down Expand Up @@ -508,21 +505,3 @@ def _add_keys_to_request(request_field_pb, key_pbs):
"""
for key_pb in key_pbs:
request_field_pb.add().CopyFrom(key_pb)


def _parse_commit_response(commit_response_pb):
"""Extract response data from a commit response.
:type commit_response_pb: :class:`.datastore_pb2.CommitResponse`
:param commit_response_pb: The protobuf response from a commit request.
:rtype: tuple
:returns: The pair of the number of index updates and a list of
:class:`.entity_pb2.Key` for each incomplete key
that was completed in the commit.
"""
mut_results = commit_response_pb.mutation_results
index_updates = commit_response_pb.index_updates
completed_keys = [mut_result.key for mut_result in mut_results
if mut_result.HasField('key')] # Message field (Key)
return index_updates, completed_keys
21 changes: 20 additions & 1 deletion datastore/google/cloud/datastore/batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,8 +238,9 @@ def _commit(self):
This is called by :meth:`commit`.
"""
# NOTE: ``self._commit_request`` will be modified.
_, updated_keys = self._client._connection.commit(
commit_response_pb = self._client._connection.commit(
self.project, self._commit_request, self._id)
_, updated_keys = _parse_commit_response(commit_response_pb)
# If the back-end returns without error, we are guaranteed that
# :meth:`Connection.commit` will return keys that match (length and
# order) directly ``_partial_key_entities``.
Expand Down Expand Up @@ -311,3 +312,21 @@ def _assign_entity_to_pb(entity_pb, entity):
bare_entity_pb = helpers.entity_to_protobuf(entity)
bare_entity_pb.key.CopyFrom(bare_entity_pb.key)
entity_pb.CopyFrom(bare_entity_pb)


def _parse_commit_response(commit_response_pb):
"""Extract response data from a commit response.
:type commit_response_pb: :class:`.datastore_pb2.CommitResponse`
:param commit_response_pb: The protobuf response from a commit request.
:rtype: tuple
:returns: The pair of the number of index updates and a list of
:class:`.entity_pb2.Key` for each incomplete key
that was completed in the commit.
"""
mut_results = commit_response_pb.mutation_results
index_updates = commit_response_pb.index_updates
completed_keys = [mut_result.key for mut_result in mut_results
if mut_result.HasField('key')] # Message field (Key)
return index_updates, completed_keys
96 changes: 16 additions & 80 deletions datastore/unit_tests/test__http.py
Original file line number Diff line number Diff line change
Expand Up @@ -696,8 +696,8 @@ def test_commit_wo_transaction(self):
from google.cloud.grpc.datastore.v1 import datastore_pb2
from google.cloud.datastore.helpers import _new_value_pb

PROJECT = 'PROJECT'
key_pb = self._make_key_pb(PROJECT)
project = 'PROJECT'
key_pb = self._make_key_pb(project)
rsp_pb = datastore_pb2.CommitResponse()
req_pb = datastore_pb2.CommitRequest()
mutation = req_pb.mutations.add()
Expand All @@ -708,44 +708,32 @@ def test_commit_wo_transaction(self):
http = Http({'status': '200'}, rsp_pb.SerializeToString())
client = mock.Mock(_http=http, spec=['_http'])
conn = self._make_one(client)
URI = '/'.join([
uri = '/'.join([
conn.api_base_url,
conn.API_VERSION,
'projects',
PROJECT + ':commit',
project + ':commit',
])

# Set up mock for parsing the response.
expected_result = object()
_parsed = []

def mock_parse(response):
_parsed.append(response)
return expected_result

patch = mock.patch(
'google.cloud.datastore._http._parse_commit_response',
new=mock_parse)
with patch:
result = conn.commit(PROJECT, req_pb, None)
result = conn.commit(project, req_pb, None)
self.assertEqual(result, rsp_pb)

self.assertIs(result, expected_result)
# Verify the caller.
cw = http._called_with
self._verify_protobuf_call(cw, URI, conn)
self._verify_protobuf_call(cw, uri, conn)
rq_class = datastore_pb2.CommitRequest
request = rq_class()
request.ParseFromString(cw['body'])
self.assertEqual(request.transaction, b'')
self.assertEqual(list(request.mutations), [mutation])
self.assertEqual(request.mode, rq_class.NON_TRANSACTIONAL)
self.assertEqual(_parsed, [rsp_pb])

def test_commit_w_transaction(self):
from google.cloud.grpc.datastore.v1 import datastore_pb2
from google.cloud.datastore.helpers import _new_value_pb

PROJECT = 'PROJECT'
key_pb = self._make_key_pb(PROJECT)
project = 'PROJECT'
key_pb = self._make_key_pb(project)
rsp_pb = datastore_pb2.CommitResponse()
req_pb = datastore_pb2.CommitRequest()
mutation = req_pb.mutations.add()
Expand All @@ -756,37 +744,25 @@ def test_commit_w_transaction(self):
http = Http({'status': '200'}, rsp_pb.SerializeToString())
client = mock.Mock(_http=http, spec=['_http'])
conn = self._make_one(client)
URI = '/'.join([
uri = '/'.join([
conn.api_base_url,
conn.API_VERSION,
'projects',
PROJECT + ':commit',
project + ':commit',
])

# Set up mock for parsing the response.
expected_result = object()
_parsed = []
result = conn.commit(project, req_pb, b'xact')
self.assertEqual(result, rsp_pb)

def mock_parse(response):
_parsed.append(response)
return expected_result

patch = mock.patch(
'google.cloud.datastore._http._parse_commit_response',
new=mock_parse)
with patch:
result = conn.commit(PROJECT, req_pb, b'xact')

self.assertIs(result, expected_result)
# Verify the caller.
cw = http._called_with
self._verify_protobuf_call(cw, URI, conn)
self._verify_protobuf_call(cw, uri, conn)
rq_class = datastore_pb2.CommitRequest
request = rq_class()
request.ParseFromString(cw['body'])
self.assertEqual(request.transaction, b'xact')
self.assertEqual(list(request.mutations), [mutation])
self.assertEqual(request.mode, rq_class.TRANSACTIONAL)
self.assertEqual(_parsed, [rsp_pb])

def test_rollback_ok(self):
from google.cloud.grpc.datastore.v1 import datastore_pb2
Expand Down Expand Up @@ -870,46 +846,6 @@ def test_allocate_ids_non_empty(self):
self.assertEqual(key_before, key_after)


class Test__parse_commit_response(unittest.TestCase):

def _call_fut(self, commit_response_pb):
from google.cloud.datastore._http import _parse_commit_response

return _parse_commit_response(commit_response_pb)

def test_it(self):
from google.cloud.grpc.datastore.v1 import datastore_pb2
from google.cloud.grpc.datastore.v1 import entity_pb2

index_updates = 1337
keys = [
entity_pb2.Key(
path=[
entity_pb2.Key.PathElement(
kind='Foo',
id=1234,
),
],
),
entity_pb2.Key(
path=[
entity_pb2.Key.PathElement(
kind='Bar',
name='baz',
),
],
),
]
response = datastore_pb2.CommitResponse(
mutation_results=[
datastore_pb2.MutationResult(key=key) for key in keys
],
index_updates=index_updates,
)
result = self._call_fut(response)
self.assertEqual(result, (index_updates, keys))


class Http(object):

_called_with = None
Expand Down
78 changes: 61 additions & 17 deletions datastore/unit_tests/test_batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,13 +249,13 @@ def test_commit_wrong_status(self):
self.assertRaises(ValueError, batch.commit)

def test_commit_w_partial_key_entities(self):
_PROJECT = 'PROJECT'
_NEW_ID = 1234
connection = _Connection(_NEW_ID)
client = _Client(_PROJECT, connection)
project = 'PROJECT'
new_id = 1234
connection = _Connection(new_id)
client = _Client(project, connection)
batch = self._make_one(client)
entity = _Entity({})
key = entity.key = _Key(_PROJECT)
key = entity.key = _Key(project)
key._id = None
batch._partial_key_entities.append(entity)

Expand All @@ -266,9 +266,9 @@ def test_commit_w_partial_key_entities(self):
self.assertEqual(batch._status, batch._FINISHED)

self.assertEqual(connection._committed,
[(_PROJECT, batch._commit_request, None)])
[(project, batch._commit_request, None)])
self.assertFalse(entity.key.is_partial)
self.assertEqual(entity.key._id, _NEW_ID)
self.assertEqual(entity.key._id, new_id)

def test_as_context_mgr_wo_error(self):
_PROJECT = 'PROJECT'
Expand Down Expand Up @@ -369,30 +369,62 @@ def begin(self):
self.assertEqual(client._batches, [])


class _PathElementPB(object):
class Test__parse_commit_response(unittest.TestCase):

def __init__(self, id_):
self.id = id_
def _call_fut(self, commit_response_pb):
from google.cloud.datastore.batch import _parse_commit_response

return _parse_commit_response(commit_response_pb)

class _KeyPB(object):
def test_it(self):
from google.cloud.grpc.datastore.v1 import datastore_pb2
from google.cloud.grpc.datastore.v1 import entity_pb2

def __init__(self, id_):
self.path = [_PathElementPB(id_)]
index_updates = 1337
keys = [
entity_pb2.Key(
path=[
entity_pb2.Key.PathElement(
kind='Foo',
id=1234,
),
],
),
entity_pb2.Key(
path=[
entity_pb2.Key.PathElement(
kind='Bar',
name='baz',
),
],
),
]
response = datastore_pb2.CommitResponse(
mutation_results=[
datastore_pb2.MutationResult(key=key) for key in keys
],
index_updates=index_updates,
)
result = self._call_fut(response)
self.assertEqual(result, (index_updates, keys))


class _Connection(object):
_marker = object()
_save_result = (False, None)

def __init__(self, *new_keys):
self._completed_keys = [_KeyPB(key) for key in new_keys]
def __init__(self, *new_key_ids):
from google.cloud.grpc.datastore.v1 import datastore_pb2

self._committed = []
self._index_updates = 0
mutation_results = [
_make_mutation(key_id) for key_id in new_key_ids]
self._commit_response_pb = datastore_pb2.CommitResponse(
mutation_results=mutation_results)

def commit(self, project, commit_request, transaction_id):
self._committed.append((project, commit_request, transaction_id))
return self._index_updates, self._completed_keys
return self._commit_response_pb


class _Entity(dict):
Expand Down Expand Up @@ -472,3 +504,15 @@ def _mutated_pb(test_case, mutation_pb_list, mutation_type):
mutation_type)

return getattr(mutated_pb, mutation_type)


def _make_mutation(id_):
from google.cloud.grpc.datastore.v1 import datastore_pb2
from google.cloud.grpc.datastore.v1 import entity_pb2

key = entity_pb2.Key()
key.partition_id.project_id = 'PROJECT'
elem = key.path.add()
elem.kind = 'Kind'
elem.id = id_
return datastore_pb2.MutationResult(key=key)
21 changes: 17 additions & 4 deletions datastore/unit_tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -568,7 +568,8 @@ def test_put_multi_no_batch_w_partial_key(self):

creds = _make_credentials()
client = self._make_one(credentials=creds)
client._connection._commit.append([_KeyPB(key)])
key_pb = _make_key(234)
client._connection._commit.append([key_pb])

result = client.put_multi([entity])
self.assertIsNone(result)
Expand Down Expand Up @@ -931,7 +932,6 @@ def __init__(self, credentials=None, http=None):
self._commit = []
self._alloc_cw = []
self._alloc = []
self._index_updates = 0

def _add_lookup_result(self, results=(), missing=(), deferred=()):
self._lookup.append((list(results), list(missing), list(deferred)))
Expand All @@ -943,9 +943,13 @@ def lookup(self, project, key_pbs, eventual=False, transaction_id=None):
return results, missing, deferred

def commit(self, project, commit_request, transaction_id):
from google.cloud.grpc.datastore.v1 import datastore_pb2

self._commit_cw.append((project, commit_request, transaction_id))
response, self._commit = self._commit[0], self._commit[1:]
return self._index_updates, response
keys, self._commit = self._commit[0], self._commit[1:]
mutation_results = [
datastore_pb2.MutationResult(key=key) for key in keys]
return datastore_pb2.CommitResponse(mutation_results=mutation_results)

def allocate_ids(self, project, key_pbs):
self._alloc_cw.append((project, key_pbs))
Expand Down Expand Up @@ -1058,3 +1062,12 @@ def _mutated_pb(test_case, mutation_pb_list, mutation_type):
mutation_type)

return getattr(mutated_pb, mutation_type)


def _make_key(id_):
from google.cloud.grpc.datastore.v1 import entity_pb2

key = entity_pb2.Key()
elem = key.path.add()
elem.id = id_
return key
Loading

0 comments on commit 9488e3c

Please sign in to comment.