Skip to content

Commit

Permalink
Add explicit 'transaction_id' parm to 'Connection.{lookup,run_query}'.
Browse files Browse the repository at this point in the history
  • Loading branch information
tseaver committed Jan 15, 2015
1 parent 301498d commit a4a65a2
Show file tree
Hide file tree
Showing 6 changed files with 98 additions and 40 deletions.
4 changes: 4 additions & 0 deletions gcloud/datastore/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

from gcloud.datastore import _implicit_environ
from gcloud.datastore.batch import Batch
from gcloud.datastore.transaction import Transaction
from gcloud.datastore import helpers


Expand Down Expand Up @@ -112,10 +113,13 @@ def get(keys, missing=None, deferred=None, connection=None):
connection = _require_connection(connection)
dataset_id = _get_dataset_id_from_keys(keys)

transaction = Transaction.current()

entity_pbs = connection.lookup(
dataset_id=dataset_id,
key_pbs=[k.to_protobuf() for k in keys],
missing=missing, deferred=deferred,
transaction_id=transaction and transaction.id,
)

if missing is not None:
Expand Down
32 changes: 20 additions & 12 deletions gcloud/datastore/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@
from gcloud import connection
from gcloud.datastore import datastore_v1_pb2 as datastore_pb
from gcloud.datastore import helpers
from gcloud.datastore.batch import Batch
from gcloud.datastore.transaction import Transaction


class Connection(connection.Connection):
Expand Down Expand Up @@ -127,7 +125,8 @@ def build_api_url(cls, dataset_id, method, base_url=None,
dataset_id=dataset_id, method=method)

def lookup(self, dataset_id, key_pbs,
missing=None, deferred=None, eventual=False):
missing=None, deferred=None,
eventual=False, transaction_id=None):
"""Lookup keys from a dataset in the Cloud Datastore.
Maps the ``DatastoreService.Lookup`` protobuf RPC.
Expand Down Expand Up @@ -170,6 +169,11 @@ def lookup(self, dataset_id, key_pbs,
consistency. If True, request ``EVENTUAL`` read
consistency.
:type transaction_id: string
:param transaction_id: If passed, make the request in the scope of
the given transaction. Incompatible with
``eventual==True``.
:rtype: list of :class:`gcloud.datastore.datastore_v1_pb2.Entity`
(or a single Entity)
:returns: The entities corresponding to the keys provided.
Expand All @@ -186,7 +190,7 @@ def lookup(self, dataset_id, key_pbs,
raise ValueError('deferred must be None or an empty list')

lookup_request = datastore_pb.LookupRequest()
_set_read_options(lookup_request, eventual)
_set_read_options(lookup_request, eventual, transaction_id)

single_key = isinstance(key_pbs, datastore_pb.Key)

Expand All @@ -212,7 +216,8 @@ def lookup(self, dataset_id, key_pbs,

return results

def run_query(self, dataset_id, query_pb, namespace=None, eventual=False):
def run_query(self, dataset_id, query_pb, namespace=None,
eventual=False, transaction_id=None):
"""Run a query on the Cloud Datastore.
Maps the ``DatastoreService.RunQuery`` protobuf RPC.
Expand Down Expand Up @@ -262,9 +267,14 @@ def run_query(self, dataset_id, query_pb, namespace=None, eventual=False):
:param eventual: If False (the default), request ``STRONG`` read
consistency. If True, request ``EVENTUAL`` read
consistency.
:type transaction_id: string
:param transaction_id: If passed, make the request in the scope of
the given transaction. Incompatible with
``eventual==True``.
"""
request = datastore_pb.RunQueryRequest()
_set_read_options(request, eventual)
_set_read_options(request, eventual, transaction_id)

if namespace:
request.partition_id.namespace = namespace
Expand Down Expand Up @@ -413,21 +423,19 @@ def _lookup(self, lookup_request, dataset_id, stop_on_deferred):
return results, missing, deferred


def _set_read_options(request, eventual):
def _set_read_options(request, eventual, transaction_id):
"""Validate rules for read options, and assign to the request.
Helper method for ``lookup()`` and ``run_query``.
"""
current = Batch.current()
transaction = isinstance(current, Transaction) and current or None
if eventual and transaction:
if eventual and (transaction_id is not None):
raise ValueError('eventual must be False when in a transaction')

opts = request.read_options
if eventual:
opts.read_consistency = datastore_pb.ReadOptions.EVENTUAL
elif transaction:
opts.transaction = transaction.id
elif transaction_id:
opts.transaction = transaction_id


def _copy_deferred_keys(lookup_request, lookup_response):
Expand Down
4 changes: 4 additions & 0 deletions gcloud/datastore/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from gcloud.datastore import datastore_v1_pb2 as datastore_pb
from gcloud.datastore import helpers
from gcloud.datastore.key import Key
from gcloud.datastore.transaction import Transaction


class Query(object):
Expand Down Expand Up @@ -377,10 +378,13 @@ def next_page(self):

pb.offset = self._offset

transaction = Transaction.current()

query_results = self._connection.run_query(
query_pb=pb,
dataset_id=self._query.dataset_id,
namespace=self._query.namespace,
transaction_id=transaction and transaction.id,
)
# NOTE: `query_results` contains an extra value that we don't use,
# namely `skipped_results`.
Expand Down
56 changes: 55 additions & 1 deletion gcloud/datastore/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ def test_hit_multiple_keys_different_dataset(self):
with self.assertRaises(ValueError):
self._callFUT([key1, key2], connection=object())

def test_implicit(self):
def test_implicit_wo_transaction(self):
from gcloud.datastore import _implicit_environ
from gcloud.datastore.key import Key
from gcloud.datastore.test_connection import _Connection
Expand All @@ -297,6 +297,43 @@ def test_implicit(self):
expected_called_with = {
'dataset_id': DATASET_ID,
'key_pbs': [key.to_protobuf()],
'transaction_id': None,
}
self.assertEqual(CUSTOM_CONNECTION._called_with, expected_called_with)

new_key = result.key
# Check the returned value is as expected.
self.assertFalse(new_key is key)
self.assertEqual(new_key.dataset_id, DATASET_ID)
self.assertEqual(new_key.path, PATH)
self.assertEqual(list(result), ['foo'])
self.assertEqual(result['foo'], 'Foo')

def test_w_transaction(self):
from gcloud.datastore.key import Key
from gcloud.datastore.test_connection import _Connection

DATASET_ID = 'DATASET'
KIND = 'Kind'
ID = 1234
PATH = [{'kind': KIND, 'id': ID}]
TRANSACTION = 'TRANSACTION'

# Make a found entity pb to be returned from mock backend.
entity_pb = self._make_entity_pb(DATASET_ID, KIND, ID,
'foo', 'Foo')

# Make a connection to return the entity pb.
CUSTOM_CONNECTION = _Connection(entity_pb)

key = Key(KIND, ID, dataset_id=DATASET_ID)
with _NoCommitTransaction(DATASET_ID, CUSTOM_CONNECTION, TRANSACTION):
result, = self._callFUT([key], connection=CUSTOM_CONNECTION)

expected_called_with = {
'dataset_id': DATASET_ID,
'key_pbs': [key.to_protobuf()],
'transaction_id': TRANSACTION,
}
self.assertEqual(CUSTOM_CONNECTION._called_with, expected_called_with)

Expand Down Expand Up @@ -569,3 +606,20 @@ def __enter__(self):
def __exit__(self, *args):
from gcloud.datastore.batch import _BATCHES
_BATCHES.pop()


class _NoCommitTransaction(object):

def __init__(self, dataset_id, connection, transaction_id):
from gcloud.datastore.transaction import Transaction
xact = self._transaction = Transaction(dataset_id, connection)
xact._id = transaction_id

def __enter__(self):
from gcloud.datastore.batch import _BATCHES
_BATCHES.push(self._transaction)
return self._transaction

def __exit__(self, *args):
from gcloud.datastore.batch import _BATCHES
_BATCHES.pop()
36 changes: 9 additions & 27 deletions gcloud/datastore/test_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,9 +239,9 @@ def test_lookup_single_key_empty_response_w_eventual_and_transaction(self):
TRANSACTION = 'TRANSACTION'
key_pb = self._make_key_pb(DATASET_ID)
conn = self._makeOne()
with _NoCommitTransaction(DATASET_ID, conn, TRANSACTION):
self.assertRaises(
ValueError, conn.lookup, DATASET_ID, key_pb, eventual=True)
self.assertRaises(ValueError,
conn.lookup, DATASET_ID, key_pb,
eventual=True, transaction_id=TRANSACTION)

def test_lookup_single_key_empty_response_w_transaction(self):
from gcloud.datastore import datastore_v1_pb2 as datastore_pb
Expand All @@ -260,8 +260,7 @@ def test_lookup_single_key_empty_response_w_transaction(self):
'lookup',
])
http = conn._http = Http({'status': '200'}, rsp_pb.SerializeToString())
with _NoCommitTransaction(DATASET_ID, conn, TRANSACTION):
found = conn.lookup(DATASET_ID, key_pb)
found = conn.lookup(DATASET_ID, key_pb, transaction_id=TRANSACTION)
self.assertEqual(found, None)
cw = http._called_with
self._verifyProtobufCall(cw, URI, conn)
Expand Down Expand Up @@ -540,8 +539,8 @@ def test_run_query_wo_eventual_w_transaction(self):
'runQuery',
])
http = conn._http = Http({'status': '200'}, rsp_pb.SerializeToString())
with _NoCommitTransaction(DATASET_ID, conn, TRANSACTION):
pbs, end, more, skipped = conn.run_query(DATASET_ID, q_pb)
pbs, end, more, skipped = conn.run_query(
DATASET_ID, q_pb, transaction_id=TRANSACTION)
self.assertEqual(pbs, [])
self.assertEqual(end, CURSOR)
self.assertTrue(more)
Expand Down Expand Up @@ -571,9 +570,9 @@ def test_run_query_w_eventual_and_transaction(self):
rsp_pb.batch.more_results = no_more
rsp_pb.batch.entity_result_type = datastore_pb.EntityResult.FULL
conn = self._makeOne()
with _NoCommitTransaction(DATASET_ID, conn, TRANSACTION):
self.assertRaises(
ValueError, conn.run_query, DATASET_ID, q_pb, eventual=True)
self.assertRaises(ValueError,
conn.run_query, DATASET_ID, q_pb,
eventual=True, transaction_id=TRANSACTION)

def test_run_query_wo_namespace_empty_result(self):
from gcloud.datastore import datastore_v1_pb2 as datastore_pb
Expand Down Expand Up @@ -912,20 +911,3 @@ class _KeyProto(object):

def __init__(self, id_):
self.path_element = [_PathElementProto(id_)]


class _NoCommitTransaction(object):

def __init__(self, dataset_id, connection, transaction_id):
from gcloud.datastore.transaction import Transaction
xact = self._transaction = Transaction(dataset_id, connection)
xact._id = transaction_id

def __enter__(self):
from gcloud.datastore.batch import _BATCHES
_BATCHES.push(self._transaction)
return self._transaction

def __exit__(self, *args):
from gcloud.datastore.batch import _BATCHES
_BATCHES.pop()
6 changes: 6 additions & 0 deletions gcloud/datastore/test_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,7 @@ def test_next_page_no_cursors_no_more(self):
'dataset_id': self._DATASET,
'query_pb': qpb,
'namespace': self._NAMESPACE,
'transaction_id': None,
}
self.assertEqual(connection._called_with, [EXPECTED])

Expand All @@ -431,6 +432,7 @@ def test_next_page_no_cursors_no_more_w_offset_and_limit(self):
'dataset_id': self._DATASET,
'query_pb': qpb,
'namespace': self._NAMESPACE,
'transaction_id': None,
}
self.assertEqual(connection._called_with, [EXPECTED])

Expand Down Expand Up @@ -463,6 +465,7 @@ def test_next_page_w_cursors_w_more(self):
'dataset_id': self._DATASET,
'query_pb': qpb,
'namespace': self._NAMESPACE,
'transaction_id': None,
}
self.assertEqual(connection._called_with, [EXPECTED])

Expand Down Expand Up @@ -494,6 +497,7 @@ def test___iter___no_more(self):
'dataset_id': self._DATASET,
'query_pb': qpb,
'namespace': self._NAMESPACE,
'transaction_id': None,
}
self.assertEqual(connection._called_with, [EXPECTED])

Expand Down Expand Up @@ -522,11 +526,13 @@ def test___iter___w_more(self):
'dataset_id': self._DATASET,
'query_pb': qpb1,
'namespace': self._NAMESPACE,
'transaction_id': None,
}
EXPECTED2 = {
'dataset_id': self._DATASET,
'query_pb': qpb2,
'namespace': self._NAMESPACE,
'transaction_id': None,
}
self.assertEqual(len(connection._called_with), 2)
self.assertEqual(connection._called_with[0], EXPECTED1)
Expand Down

0 comments on commit a4a65a2

Please sign in to comment.