Skip to content

Commit 5723288

Browse files
committed
Merge pull request #449 from tseaver/305-lookup-run_query-eventual
Add 'eventual' flag to 'Connection.lookup()' / 'Connection.run_query()'.
2 parents 6b71b1a + a2a11bd commit 5723288

File tree

2 files changed

+240
-22
lines changed

2 files changed

+240
-22
lines changed

gcloud/datastore/connection.py

Lines changed: 69 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,8 @@ def dataset(self, *args, **kwargs):
167167
kwargs['connection'] = self
168168
return Dataset(*args, **kwargs)
169169

170-
def lookup(self, dataset_id, key_pbs, missing=None, deferred=None):
170+
def lookup(self, dataset_id, key_pbs,
171+
missing=None, deferred=None, eventual=False):
171172
"""Lookup keys from a dataset in the Cloud Datastore.
172173
173174
Maps the ``DatastoreService.Lookup`` protobuf RPC.
@@ -211,13 +212,20 @@ def lookup(self, dataset_id, key_pbs, missing=None, deferred=None):
211212
by the backend as "deferred" will be copied into it.
212213
Use only as a keyword param.
213214
215+
:type eventual: bool
216+
:param eventual: If False (the default), request ``STRONG`` read
217+
consistency. If True, request ``EVENTUAL`` read
218+
consistency. If the connection has a current
219+
transaction, this value *must* be false.
220+
214221
:rtype: list of :class:`gcloud.datastore.datastore_v1_pb2.Entity`
215222
(or a single Entity)
216223
:returns: The entities corresponding to the keys provided.
217224
If a single key was provided and no results matched,
218225
this will return None.
219226
If multiple keys were provided and no results matched,
220227
this will return an empty list.
228+
:raises: ValueError if ``eventual`` is True
221229
"""
222230
if missing is not None and missing != []:
223231
raise ValueError('missing must be None or an empty list')
@@ -226,6 +234,7 @@ def lookup(self, dataset_id, key_pbs, missing=None, deferred=None):
226234
raise ValueError('deferred must be None or an empty list')
227235

228236
lookup_request = datastore_pb.LookupRequest()
237+
self._set_read_options(lookup_request, eventual)
229238

230239
single_key = isinstance(key_pbs, datastore_pb.Key)
231240

@@ -235,28 +244,14 @@ def lookup(self, dataset_id, key_pbs, missing=None, deferred=None):
235244
for key_pb in key_pbs:
236245
lookup_request.key.add().CopyFrom(key_pb)
237246

238-
results = []
239-
while True: # loop against possible deferred.
240-
lookup_response = self._rpc(dataset_id, 'lookup', lookup_request,
241-
datastore_pb.LookupResponse)
247+
results, missing_found, deferred_found = self._lookup(
248+
lookup_request, dataset_id, deferred is not None)
242249

243-
results.extend(
244-
[result.entity for result in lookup_response.found])
250+
if missing is not None:
251+
missing.extend(missing_found)
245252

246-
if missing is not None:
247-
missing.extend(
248-
[result.entity for result in lookup_response.missing])
249-
250-
if deferred is not None:
251-
deferred.extend([key for key in lookup_response.deferred])
252-
break
253-
254-
if not lookup_response.deferred:
255-
break
256-
257-
# We have deferred keys, and the user didn't ask to know about
258-
# them, so retry (but only with the deferred ones).
259-
_copy_deferred_keys(lookup_request, lookup_response)
253+
if deferred is not None:
254+
deferred.extend(deferred_found)
260255

261256
if single_key:
262257
if results:
@@ -266,7 +261,7 @@ def lookup(self, dataset_id, key_pbs, missing=None, deferred=None):
266261

267262
return results
268263

269-
def run_query(self, dataset_id, query_pb, namespace=None):
264+
def run_query(self, dataset_id, query_pb, namespace=None, eventual=False):
270265
"""Run a query on the Cloud Datastore.
271266
272267
Maps the ``DatastoreService.RunQuery`` protobuf RPC.
@@ -310,8 +305,15 @@ def run_query(self, dataset_id, query_pb, namespace=None):
310305
311306
:type namespace: string
312307
:param namespace: The namespace over which to run the query.
308+
309+
:type eventual: bool
310+
:param eventual: If False (the default), request ``STRONG`` read
311+
consistency. If True, request ``EVENTUAL`` read
312+
consistency. If the connection has a current
313+
transaction, this value *must* be false.
313314
"""
314315
request = datastore_pb.RunQueryRequest()
316+
self._set_read_options(request, eventual)
315317

316318
if namespace:
317319
request.partition_id.namespace = namespace
@@ -514,6 +516,51 @@ def delete_entities(self, dataset_id, key_pbs):
514516

515517
return True
516518

519+
def _lookup(self, lookup_request, dataset_id, stop_on_deferred):
520+
"""Repeat lookup until all keys found (unless stop requested).
521+
522+
Helper method for ``lookup()``.
523+
"""
524+
results = []
525+
missing = []
526+
deferred = []
527+
while True: # loop against possible deferred.
528+
lookup_response = self._rpc(dataset_id, 'lookup', lookup_request,
529+
datastore_pb.LookupResponse)
530+
531+
results.extend(
532+
[result.entity for result in lookup_response.found])
533+
534+
missing.extend(
535+
[result.entity for result in lookup_response.missing])
536+
537+
if stop_on_deferred:
538+
deferred.extend([key for key in lookup_response.deferred])
539+
break
540+
541+
if not lookup_response.deferred:
542+
break
543+
544+
# We have deferred keys, and the user didn't ask to know about
545+
# them, so retry (but only with the deferred ones).
546+
_copy_deferred_keys(lookup_request, lookup_response)
547+
return results, missing, deferred
548+
549+
def _set_read_options(self, request, eventual):
550+
"""Validate rules for read options, and assign to the request.
551+
552+
Helper method for ``lookup()`` and ``run_query``.
553+
"""
554+
transaction = self.transaction()
555+
if eventual and transaction:
556+
raise ValueError('eventual must be False when in a transaction')
557+
558+
opts = request.read_options
559+
if eventual:
560+
opts.read_consistency = datastore_pb.ReadOptions.EVENTUAL
561+
elif transaction:
562+
opts.transaction = transaction
563+
517564

518565
def _copy_deferred_keys(lookup_request, lookup_response):
519566
"""Clear requested keys and copy deferred keys back in.

gcloud/datastore/test_connection.py

Lines changed: 171 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -231,6 +231,77 @@ def test_lookup_single_key_empty_response(self):
231231
self.assertEqual(len(keys), 1)
232232
self.assertEqual(keys[0], key_pb)
233233

234+
def test_lookup_single_key_empty_response_w_eventual(self):
235+
from gcloud.datastore.connection import datastore_pb
236+
from gcloud.datastore.key import Key
237+
238+
DATASET_ID = 'DATASET'
239+
key_pb = Key(path=[{'kind': 'Kind', 'id': 1234}]).to_protobuf()
240+
rsp_pb = datastore_pb.LookupResponse()
241+
conn = self._makeOne()
242+
URI = '/'.join([
243+
conn.API_BASE_URL,
244+
'datastore',
245+
conn.API_VERSION,
246+
'datasets',
247+
DATASET_ID,
248+
'lookup',
249+
])
250+
http = conn._http = Http({'status': '200'}, rsp_pb.SerializeToString())
251+
self.assertEqual(conn.lookup(DATASET_ID, key_pb, eventual=True), None)
252+
cw = http._called_with
253+
self._verifyProtobufCall(cw, URI, conn)
254+
rq_class = datastore_pb.LookupRequest
255+
request = rq_class()
256+
request.ParseFromString(cw['body'])
257+
keys = list(request.key)
258+
self.assertEqual(len(keys), 1)
259+
self.assertEqual(keys[0], key_pb)
260+
self.assertEqual(request.read_options.read_consistency,
261+
datastore_pb.ReadOptions.EVENTUAL)
262+
self.assertEqual(request.read_options.transaction, '')
263+
264+
def test_lookup_single_key_empty_response_w_eventual_and_transaction(self):
265+
from gcloud.datastore.key import Key
266+
267+
DATASET_ID = 'DATASET'
268+
TRANSACTION = 'TRANSACTION'
269+
key_pb = Key(path=[{'kind': 'Kind', 'id': 1234}]).to_protobuf()
270+
conn = self._makeOne()
271+
conn.transaction(TRANSACTION)
272+
self.assertRaises(
273+
ValueError, conn.lookup, DATASET_ID, key_pb, eventual=True)
274+
275+
def test_lookup_single_key_empty_response_w_transaction(self):
276+
from gcloud.datastore.connection import datastore_pb
277+
from gcloud.datastore.key import Key
278+
279+
DATASET_ID = 'DATASET'
280+
TRANSACTION = 'TRANSACTION'
281+
key_pb = Key(path=[{'kind': 'Kind', 'id': 1234}]).to_protobuf()
282+
rsp_pb = datastore_pb.LookupResponse()
283+
conn = self._makeOne()
284+
conn.transaction(TRANSACTION)
285+
URI = '/'.join([
286+
conn.API_BASE_URL,
287+
'datastore',
288+
conn.API_VERSION,
289+
'datasets',
290+
DATASET_ID,
291+
'lookup',
292+
])
293+
http = conn._http = Http({'status': '200'}, rsp_pb.SerializeToString())
294+
self.assertEqual(conn.lookup(DATASET_ID, key_pb), None)
295+
cw = http._called_with
296+
self._verifyProtobufCall(cw, URI, conn)
297+
rq_class = datastore_pb.LookupRequest
298+
request = rq_class()
299+
request.ParseFromString(cw['body'])
300+
keys = list(request.key)
301+
self.assertEqual(len(keys), 1)
302+
self.assertEqual(keys[0], key_pb)
303+
self.assertEqual(request.read_options.transaction, TRANSACTION)
304+
234305
def test_lookup_single_key_nonempty_response(self):
235306
from gcloud.datastore.connection import datastore_pb
236307
from gcloud.datastore.key import Key
@@ -443,6 +514,106 @@ def test_lookup_multiple_keys_w_deferred_from_backend_but_not_passed(self):
443514
self.assertEqual(len(keys), 1)
444515
self.assertEqual(keys[0], key_pb2)
445516

517+
def test_run_query_w_eventual_no_transaction(self):
518+
from gcloud.datastore.connection import datastore_pb
519+
from gcloud.datastore.query import Query
520+
521+
DATASET_ID = 'DATASET'
522+
KIND = 'Nonesuch'
523+
CURSOR = b'\x00'
524+
q_pb = Query(KIND, DATASET_ID).to_protobuf()
525+
rsp_pb = datastore_pb.RunQueryResponse()
526+
rsp_pb.batch.end_cursor = CURSOR
527+
no_more = datastore_pb.QueryResultBatch.NO_MORE_RESULTS
528+
rsp_pb.batch.more_results = no_more
529+
rsp_pb.batch.entity_result_type = datastore_pb.EntityResult.FULL
530+
conn = self._makeOne()
531+
URI = '/'.join([
532+
conn.API_BASE_URL,
533+
'datastore',
534+
conn.API_VERSION,
535+
'datasets',
536+
DATASET_ID,
537+
'runQuery',
538+
])
539+
http = conn._http = Http({'status': '200'}, rsp_pb.SerializeToString())
540+
pbs, end, more, skipped = conn.run_query(DATASET_ID, q_pb,
541+
eventual=True)
542+
self.assertEqual(pbs, [])
543+
self.assertEqual(end, CURSOR)
544+
self.assertTrue(more)
545+
self.assertEqual(skipped, 0)
546+
cw = http._called_with
547+
self._verifyProtobufCall(cw, URI, conn)
548+
rq_class = datastore_pb.RunQueryRequest
549+
request = rq_class()
550+
request.ParseFromString(cw['body'])
551+
self.assertEqual(request.partition_id.namespace, '')
552+
self.assertEqual(request.query, q_pb)
553+
self.assertEqual(request.read_options.read_consistency,
554+
datastore_pb.ReadOptions.EVENTUAL)
555+
self.assertEqual(request.read_options.transaction, '')
556+
557+
def test_run_query_wo_eventual_w_transaction(self):
558+
from gcloud.datastore.connection import datastore_pb
559+
from gcloud.datastore.query import Query
560+
561+
DATASET_ID = 'DATASET'
562+
KIND = 'Nonesuch'
563+
CURSOR = b'\x00'
564+
TRANSACTION = 'TRANSACTION'
565+
q_pb = Query(KIND, DATASET_ID).to_protobuf()
566+
rsp_pb = datastore_pb.RunQueryResponse()
567+
rsp_pb.batch.end_cursor = CURSOR
568+
no_more = datastore_pb.QueryResultBatch.NO_MORE_RESULTS
569+
rsp_pb.batch.more_results = no_more
570+
rsp_pb.batch.entity_result_type = datastore_pb.EntityResult.FULL
571+
conn = self._makeOne()
572+
conn.transaction(TRANSACTION)
573+
URI = '/'.join([
574+
conn.API_BASE_URL,
575+
'datastore',
576+
conn.API_VERSION,
577+
'datasets',
578+
DATASET_ID,
579+
'runQuery',
580+
])
581+
http = conn._http = Http({'status': '200'}, rsp_pb.SerializeToString())
582+
pbs, end, more, skipped = conn.run_query(DATASET_ID, q_pb)
583+
self.assertEqual(pbs, [])
584+
self.assertEqual(end, CURSOR)
585+
self.assertTrue(more)
586+
self.assertEqual(skipped, 0)
587+
cw = http._called_with
588+
self._verifyProtobufCall(cw, URI, conn)
589+
rq_class = datastore_pb.RunQueryRequest
590+
request = rq_class()
591+
request.ParseFromString(cw['body'])
592+
self.assertEqual(request.partition_id.namespace, '')
593+
self.assertEqual(request.query, q_pb)
594+
self.assertEqual(request.read_options.read_consistency,
595+
datastore_pb.ReadOptions.DEFAULT)
596+
self.assertEqual(request.read_options.transaction, TRANSACTION)
597+
598+
def test_run_query_w_eventual_and_transaction(self):
599+
from gcloud.datastore.connection import datastore_pb
600+
from gcloud.datastore.query import Query
601+
602+
DATASET_ID = 'DATASET'
603+
KIND = 'Nonesuch'
604+
CURSOR = b'\x00'
605+
TRANSACTION = 'TRANSACTION'
606+
q_pb = Query(KIND, DATASET_ID).to_protobuf()
607+
rsp_pb = datastore_pb.RunQueryResponse()
608+
rsp_pb.batch.end_cursor = CURSOR
609+
no_more = datastore_pb.QueryResultBatch.NO_MORE_RESULTS
610+
rsp_pb.batch.more_results = no_more
611+
rsp_pb.batch.entity_result_type = datastore_pb.EntityResult.FULL
612+
conn = self._makeOne()
613+
conn.transaction(TRANSACTION)
614+
self.assertRaises(
615+
ValueError, conn.run_query, DATASET_ID, q_pb, eventual=True)
616+
446617
def test_run_query_wo_namespace_empty_result(self):
447618
from gcloud.datastore.connection import datastore_pb
448619
from gcloud.datastore.query import Query

0 commit comments

Comments
 (0)