Skip to content

Commit a41ff58

Browse files
committed
Add 'eventual' flag to 'Connection.lookup()'/'Connection.run_query()'.
Allows selecting EVENTUAL consistency, rather than the STRONG default. Fixes #305.
1 parent 21e5f45 commit a41ff58

File tree

2 files changed

+239
-14
lines changed

2 files changed

+239
-14
lines changed

gcloud/datastore/connection.py

Lines changed: 65 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,8 @@ def dataset(self, *args, **kwargs):
167167
kwargs['connection'] = self
168168
return Dataset(*args, **kwargs)
169169

170-
def lookup(self, dataset_id, key_pbs, missing=None, deferred=None):
170+
def lookup(self, dataset_id, key_pbs,
171+
missing=None, deferred=None, eventual=False):
171172
"""Lookup keys from a dataset in the Cloud Datastore.
172173
173174
Maps the ``DatastoreService.Lookup`` protobuf RPC.
@@ -211,22 +212,40 @@ def lookup(self, dataset_id, key_pbs, missing=None, deferred=None):
211212
by the backend as "deferred" will be copied into it.
212213
Use only as a keyword param.
213214
215+
:type eventual: bool
216+
:param eventual: If False (the default), request ``STRONG`` read
217+
consistency. If True, request ``EVENTUAL`` read
218+
consistency. If the connection has a current
219+
transaction, this value *must* be false.
220+
214221
:rtype: list of :class:`gcloud.datastore.datastore_v1_pb2.Entity`
215222
(or a single Entity)
216223
:returns: The entities corresponding to the keys provided.
217224
If a single key was provided and no results matched,
218225
this will return None.
219226
If multiple keys were provided and no results matched,
220227
this will return an empty list.
228+
:raises: ValueError if ``eventual`` is True
221229
"""
222230
if missing is not None and missing != []:
223231
raise ValueError('missing must be None or an empty list')
224232

225233
if deferred is not None and deferred != []:
226234
raise ValueError('deferred must be None or an empty list')
227235

236+
transaction = self.transaction()
237+
if eventual and transaction:
238+
raise ValueError('eventual must be False when in a transaction')
239+
228240
lookup_request = datastore_pb.LookupRequest()
229241

242+
opts = lookup_request.read_options
243+
ro_enum = datastore_pb.ReadOptions.ReadConsistency
244+
if eventual:
245+
opts.read_consistency = ro_enum.Value('EVENTUAL')
246+
elif transaction:
247+
opts.transaction = transaction
248+
230249
single_key = isinstance(key_pbs, datastore_pb.Key)
231250

232251
if single_key:
@@ -235,19 +254,42 @@ def lookup(self, dataset_id, key_pbs, missing=None, deferred=None):
235254
for key_pb in key_pbs:
236255
lookup_request.key.add().CopyFrom(key_pb)
237256

257+
results, m_found, d_found = self._lookup(lookup_request, dataset_id,
258+
deferred is not None)
259+
260+
if missing is not None:
261+
missing.extend(m_found)
262+
263+
if deferred is not None:
264+
deferred.extend(d_found)
265+
266+
if single_key:
267+
if results:
268+
return results[0]
269+
else:
270+
return None
271+
272+
return results
273+
274+
def _lookup(self, lookup_request, dataset_id, stop_on_deferred):
275+
"""Repeat lookup until all keys found (unless stop requested).
276+
277+
Helper method for ``lookup()``.
278+
"""
238279
results = []
280+
missing = []
281+
deferred = []
239282
while True: # loop against possible deferred.
240283
lookup_response = self._rpc(dataset_id, 'lookup', lookup_request,
241284
datastore_pb.LookupResponse)
242285

243286
results.extend(
244287
[result.entity for result in lookup_response.found])
245288

246-
if missing is not None:
247-
missing.extend(
248-
[result.entity for result in lookup_response.missing])
289+
missing.extend(
290+
[result.entity for result in lookup_response.missing])
249291

250-
if deferred is not None:
292+
if stop_on_deferred:
251293
deferred.extend([key for key in lookup_response.deferred])
252294
break
253295

@@ -257,16 +299,9 @@ def lookup(self, dataset_id, key_pbs, missing=None, deferred=None):
257299
# We have deferred keys, and the user didn't ask to know about
258300
# them, so retry (but only with the deferred ones).
259301
_copy_deferred_keys(lookup_request, lookup_response)
302+
return results, missing, deferred
260303

261-
if single_key:
262-
if results:
263-
return results[0]
264-
else:
265-
return None
266-
267-
return results
268-
269-
def run_query(self, dataset_id, query_pb, namespace=None):
304+
def run_query(self, dataset_id, query_pb, namespace=None, eventual=False):
270305
"""Run a query on the Cloud Datastore.
271306
272307
Maps the ``DatastoreService.RunQuery`` protobuf RPC.
@@ -310,8 +345,24 @@ def run_query(self, dataset_id, query_pb, namespace=None):
310345
311346
:type namespace: string
312347
:param namespace: The namespace over which to run the query.
348+
349+
:type eventual: bool
350+
:param eventual: If False (the default), request ``STRONG`` read
351+
consistency. If True, request ``EVENTUAL`` read
352+
consistency. If the connection has a current
353+
transaction, this value *must* be false.
313354
"""
355+
transaction = self.transaction()
356+
if eventual and transaction:
357+
raise ValueError('eventual must be False when in a transaction')
358+
314359
request = datastore_pb.RunQueryRequest()
360+
opts = request.read_options
361+
ro_enum = datastore_pb.ReadOptions.ReadConsistency
362+
if eventual:
363+
opts.read_consistency = ro_enum.Value('EVENTUAL')
364+
elif transaction:
365+
opts.transaction = transaction
315366

316367
if namespace:
317368
request.partition_id.namespace = namespace

gcloud/datastore/test_connection.py

Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -231,6 +231,78 @@ def test_lookup_single_key_empty_response(self):
231231
self.assertEqual(len(keys), 1)
232232
self.assertEqual(keys[0], key_pb)
233233

234+
def test_lookup_single_key_empty_response_w_eventual(self):
235+
from gcloud.datastore.connection import datastore_pb
236+
from gcloud.datastore.key import Key
237+
238+
DATASET_ID = 'DATASET'
239+
key_pb = Key(path=[{'kind': 'Kind', 'id': 1234}]).to_protobuf()
240+
rsp_pb = datastore_pb.LookupResponse()
241+
conn = self._makeOne()
242+
URI = '/'.join([
243+
conn.API_BASE_URL,
244+
'datastore',
245+
conn.API_VERSION,
246+
'datasets',
247+
DATASET_ID,
248+
'lookup',
249+
])
250+
http = conn._http = Http({'status': '200'}, rsp_pb.SerializeToString())
251+
self.assertEqual(conn.lookup(DATASET_ID, key_pb, eventual=True), None)
252+
cw = http._called_with
253+
self._verifyProtobufCall(cw, URI, conn)
254+
rq_class = datastore_pb.LookupRequest
255+
request = rq_class()
256+
request.ParseFromString(cw['body'])
257+
keys = list(request.key)
258+
self.assertEqual(len(keys), 1)
259+
self.assertEqual(keys[0], key_pb)
260+
ro_enum = datastore_pb.ReadOptions.ReadConsistency
261+
self.assertEqual(request.read_options.read_consistency,
262+
ro_enum.Value('EVENTUAL'))
263+
self.assertEqual(request.read_options.transaction, '')
264+
265+
def test_lookup_single_key_empty_response_w_eventual_and_transaction(self):
266+
from gcloud.datastore.key import Key
267+
268+
DATASET_ID = 'DATASET'
269+
TRANSACTION = 'TRANSACTION'
270+
key_pb = Key(path=[{'kind': 'Kind', 'id': 1234}]).to_protobuf()
271+
conn = self._makeOne()
272+
conn.transaction(TRANSACTION)
273+
self.assertRaises(
274+
ValueError, conn.lookup, DATASET_ID, key_pb, eventual=True)
275+
276+
def test_lookup_single_key_empty_response_w_transaction(self):
277+
from gcloud.datastore.connection import datastore_pb
278+
from gcloud.datastore.key import Key
279+
280+
DATASET_ID = 'DATASET'
281+
TRANSACTION = 'TRANSACTION'
282+
key_pb = Key(path=[{'kind': 'Kind', 'id': 1234}]).to_protobuf()
283+
rsp_pb = datastore_pb.LookupResponse()
284+
conn = self._makeOne()
285+
conn.transaction(TRANSACTION)
286+
URI = '/'.join([
287+
conn.API_BASE_URL,
288+
'datastore',
289+
conn.API_VERSION,
290+
'datasets',
291+
DATASET_ID,
292+
'lookup',
293+
])
294+
http = conn._http = Http({'status': '200'}, rsp_pb.SerializeToString())
295+
self.assertEqual(conn.lookup(DATASET_ID, key_pb), None)
296+
cw = http._called_with
297+
self._verifyProtobufCall(cw, URI, conn)
298+
rq_class = datastore_pb.LookupRequest
299+
request = rq_class()
300+
request.ParseFromString(cw['body'])
301+
keys = list(request.key)
302+
self.assertEqual(len(keys), 1)
303+
self.assertEqual(keys[0], key_pb)
304+
self.assertEqual(request.read_options.transaction, TRANSACTION)
305+
234306
def test_lookup_single_key_nonempty_response(self):
235307
from gcloud.datastore.connection import datastore_pb
236308
from gcloud.datastore.key import Key
@@ -443,6 +515,108 @@ def test_lookup_multiple_keys_w_deferred_from_backend_but_not_passed(self):
443515
self.assertEqual(len(keys), 1)
444516
self.assertEqual(keys[0], key_pb2)
445517

518+
def test_run_query_w_eventual_no_transaction(self):
519+
from gcloud.datastore.connection import datastore_pb
520+
from gcloud.datastore.query import Query
521+
522+
DATASET_ID = 'DATASET'
523+
KIND = 'Nonesuch'
524+
CURSOR = b'\x00'
525+
q_pb = Query(KIND, DATASET_ID).to_protobuf()
526+
rsp_pb = datastore_pb.RunQueryResponse()
527+
rsp_pb.batch.end_cursor = CURSOR
528+
no_more = datastore_pb.QueryResultBatch.NO_MORE_RESULTS
529+
rsp_pb.batch.more_results = no_more
530+
rsp_pb.batch.entity_result_type = datastore_pb.EntityResult.FULL
531+
conn = self._makeOne()
532+
URI = '/'.join([
533+
conn.API_BASE_URL,
534+
'datastore',
535+
conn.API_VERSION,
536+
'datasets',
537+
DATASET_ID,
538+
'runQuery',
539+
])
540+
http = conn._http = Http({'status': '200'}, rsp_pb.SerializeToString())
541+
pbs, end, more, skipped = conn.run_query(DATASET_ID, q_pb,
542+
eventual=True)
543+
self.assertEqual(pbs, [])
544+
self.assertEqual(end, CURSOR)
545+
self.assertTrue(more)
546+
self.assertEqual(skipped, 0)
547+
cw = http._called_with
548+
self._verifyProtobufCall(cw, URI, conn)
549+
rq_class = datastore_pb.RunQueryRequest
550+
request = rq_class()
551+
request.ParseFromString(cw['body'])
552+
self.assertEqual(request.partition_id.namespace, '')
553+
self.assertEqual(request.query, q_pb)
554+
ro_enum = datastore_pb.ReadOptions.ReadConsistency
555+
self.assertEqual(request.read_options.read_consistency,
556+
ro_enum.Value('EVENTUAL'))
557+
self.assertEqual(request.read_options.transaction, '')
558+
559+
def test_run_query_wo_eventual_w_transaction(self):
560+
from gcloud.datastore.connection import datastore_pb
561+
from gcloud.datastore.query import Query
562+
563+
DATASET_ID = 'DATASET'
564+
KIND = 'Nonesuch'
565+
CURSOR = b'\x00'
566+
TRANSACTION = 'TRANSACTION'
567+
q_pb = Query(KIND, DATASET_ID).to_protobuf()
568+
rsp_pb = datastore_pb.RunQueryResponse()
569+
rsp_pb.batch.end_cursor = CURSOR
570+
no_more = datastore_pb.QueryResultBatch.NO_MORE_RESULTS
571+
rsp_pb.batch.more_results = no_more
572+
rsp_pb.batch.entity_result_type = datastore_pb.EntityResult.FULL
573+
conn = self._makeOne()
574+
conn.transaction(TRANSACTION)
575+
URI = '/'.join([
576+
conn.API_BASE_URL,
577+
'datastore',
578+
conn.API_VERSION,
579+
'datasets',
580+
DATASET_ID,
581+
'runQuery',
582+
])
583+
http = conn._http = Http({'status': '200'}, rsp_pb.SerializeToString())
584+
pbs, end, more, skipped = conn.run_query(DATASET_ID, q_pb)
585+
self.assertEqual(pbs, [])
586+
self.assertEqual(end, CURSOR)
587+
self.assertTrue(more)
588+
self.assertEqual(skipped, 0)
589+
cw = http._called_with
590+
self._verifyProtobufCall(cw, URI, conn)
591+
rq_class = datastore_pb.RunQueryRequest
592+
request = rq_class()
593+
request.ParseFromString(cw['body'])
594+
self.assertEqual(request.partition_id.namespace, '')
595+
self.assertEqual(request.query, q_pb)
596+
ro_enum = datastore_pb.ReadOptions.ReadConsistency
597+
self.assertEqual(request.read_options.read_consistency,
598+
ro_enum.Value('DEFAULT'))
599+
self.assertEqual(request.read_options.transaction, TRANSACTION)
600+
601+
def test_run_query_w_eventual_and_transaction(self):
602+
from gcloud.datastore.connection import datastore_pb
603+
from gcloud.datastore.query import Query
604+
605+
DATASET_ID = 'DATASET'
606+
KIND = 'Nonesuch'
607+
CURSOR = b'\x00'
608+
TRANSACTION = 'TRANSACTION'
609+
q_pb = Query(KIND, DATASET_ID).to_protobuf()
610+
rsp_pb = datastore_pb.RunQueryResponse()
611+
rsp_pb.batch.end_cursor = CURSOR
612+
no_more = datastore_pb.QueryResultBatch.NO_MORE_RESULTS
613+
rsp_pb.batch.more_results = no_more
614+
rsp_pb.batch.entity_result_type = datastore_pb.EntityResult.FULL
615+
conn = self._makeOne()
616+
conn.transaction(TRANSACTION)
617+
self.assertRaises(
618+
ValueError, conn.run_query, DATASET_ID, q_pb, eventual=True)
619+
446620
def test_run_query_wo_namespace_empty_result(self):
447621
from gcloud.datastore.connection import datastore_pb
448622
from gcloud.datastore.query import Query

0 commit comments

Comments
 (0)