1414
1515"""Translate NDB queries to Datastore calls."""
1616
17+ import itertools
1718import logging
1819
1920from google .cloud .datastore_v1 .proto import datastore_pb2
2021from google .cloud .datastore_v1 .proto import entity_pb2
2122from google .cloud .datastore_v1 .proto import query_pb2
23+ from google .cloud .datastore import helpers
2224
2325from google .cloud .ndb import context as context_module
2426from google .cloud .ndb import _datastore_api
3436RESULT_TYPE_FULL = ResultType .Value ("FULL" )
3537RESULT_TYPE_PROJECTION = ResultType .Value ("PROJECTION" )
3638
39+ FILTER_OPERATORS = {
40+ "=" : query_pb2 .PropertyFilter .EQUAL ,
41+ "<" : query_pb2 .PropertyFilter .LESS_THAN ,
42+ "<=" : query_pb2 .PropertyFilter .LESS_THAN_OR_EQUAL ,
43+ ">" : query_pb2 .PropertyFilter .GREATER_THAN ,
44+ ">=" : query_pb2 .PropertyFilter .GREATER_THAN_OR_EQUAL ,
45+ }
46+
47+
48+ def make_filter (name , op , value ):
49+ """Make a property filter protocol buffer.
50+
51+ Args:
52+ name (str): The name of the property to filter by.
53+ op (str): The operator to apply in the filter. Must be one of "=", "<",
54+ "<=", ">", or ">=".
55+ value (Any): The value for comparison.
56+
57+ Returns:
58+ query_pb2.PropertyFilter: The filter protocol buffer.
59+ """
60+ filter_pb = query_pb2 .PropertyFilter (
61+ property = query_pb2 .PropertyReference (name = name ),
62+ op = FILTER_OPERATORS [op ],
63+ )
64+ helpers ._set_protobuf_value (filter_pb .value , value )
65+ return filter_pb
66+
67+
68+ def make_composite_and_filter (filter_pbs ):
69+ """Make a composite filter protocol buffer using AND.
70+
71+ Args:
72+ List[Union[query_pb2.PropertyFilter, query_pb2.CompositeFilter]]: The
73+ list of filters to be combined.
74+
75+ Returns:
76+ query_pb2.CompositeFilter: The new composite filter.
77+ """
78+ return query_pb2 .CompositeFilter (
79+ op = query_pb2 .CompositeFilter .AND ,
80+ filters = [_filter_pb (filter_pb ) for filter_pb in filter_pbs ],
81+ )
82+
3783
3884@tasklets .tasklet
3985def fetch (query ):
@@ -45,7 +91,7 @@ def fetch(query):
4591 Returns:
4692 tasklets.Future: Result is List[model.Model]: The query results.
4793 """
48- for name in ("filters" , " orders" , "default_options" ):
94+ for name in ("orders" , "default_options" ):
4995 if getattr (query , name , None ):
5096 raise NotImplementedError (
5197 "{} is not yet implemented for queries." .format (name )
@@ -61,14 +107,59 @@ def fetch(query):
61107 if not namespace :
62108 namespace = client .namespace
63109
64- query_pb = _query_to_protobuf (query )
65- results = yield _run_query (project_id , namespace , query_pb )
110+ filter_pbs = (None ,)
111+ if query .filters :
112+ filter_pbs = query .filters ._to_filter ()
113+ if not isinstance (filter_pbs , (tuple , list )):
114+ filter_pbs = (filter_pbs ,)
115+
116+ queries = [
117+ _run_query (project_id , namespace , _query_to_protobuf (query , filter_pb ))
118+ for filter_pb in filter_pbs
119+ ]
120+ results = yield queries
121+
122+ if len (results ) > 1 :
123+ results = _merge_results (results )
124+ else :
125+ results = results [0 ]
126+
66127 return [
67128 _process_result (result_type , result , query .projection )
68129 for result_type , result in results
69130 ]
70131
71132
133+ def _merge_results (results ):
134+ """Merge the results of distinct queries.
135+
136+ Some queries that in NDB are logically a single query have to be broken
137+ up into two or more Datastore queries, because Datastore doesn't have a
138+ composite filter with a boolean OR. The `results` are the result sets from
139+ two or more queries which logically form a composite query joined by OR.
140+ The individual result sets are combined into a single result set,
141+ consolidating any results which may be common to two or more result sets.
142+
143+ Args:
144+ results (List[Tuple[query_pb2.EntityResult.ResultType,
145+ query_pb2.EntityResult]]): List of individual result sets as
146+ returned by :func:`_run_query`. These are merged into the final
147+ result.
148+
149+ Returns:
150+ List[Tuple[query_pb2.EntityResult.ResultType,
151+ query_pb2.EntityResult]]: The merged result set.
152+ """
153+ seen_keys = set ()
154+ for result_type , result in itertools .chain (* results ):
155+ hash_key = result .entity .key .SerializeToString ()
156+ if hash_key in seen_keys :
157+ continue
158+
159+ seen_keys .add (hash_key )
160+ yield result_type , result
161+
162+
72163def _process_result (result_type , result , projection ):
73164 """Process a single entity result.
74165
@@ -98,11 +189,13 @@ def _process_result(result_type, result, projection):
98189 )
99190
100191
101- def _query_to_protobuf (query ):
192+ def _query_to_protobuf (query , filter_pb = None ):
102193 """Convert an NDB query to a Datastore protocol buffer.
103194
104195 Args:
105196 query (query.Query): The query.
197+ filter_pb (Optional[query_pb2.Filter]): The filter to apply for this
198+ query.
106199
107200 Returns:
108201 query_pb2.Query: The protocol buffer representation of the query.
@@ -125,22 +218,55 @@ def _query_to_protobuf(query):
125218 for name in query .distinct_on
126219 ]
127220
128- filters = []
129221 if query .ancestor :
130222 ancestor_pb = query .ancestor ._key .to_protobuf ()
131- filter_pb = query_pb2 .PropertyFilter (
223+ ancestor_filter_pb = query_pb2 .PropertyFilter (
132224 property = query_pb2 .PropertyReference (name = "__key__" ),
133225 op = query_pb2 .PropertyFilter .HAS_ANCESTOR ,
134226 )
135- filter_pb .value .key_value .CopyFrom (ancestor_pb )
136- filters .append (filter_pb )
227+ ancestor_filter_pb .value .key_value .CopyFrom (ancestor_pb )
228+
229+ if filter_pb is None :
230+ filter_pb = ancestor_filter_pb
137231
138- if len (filters ) == 1 :
139- query_args ["filter" ] = query_pb2 .Filter (property_filter = filters [0 ])
232+ elif isinstance (filter_pb , query_pb2 .CompositeFilter ):
233+ filter_pb .filters .add (property_filter = ancestor_filter_pb )
234+
235+ else :
236+ filter_pb = query_pb2 .CompositeFilter (
237+ op = query_pb2 .CompositeFilter .AND ,
238+ filters = [
239+ _filter_pb (filter_pb ),
240+ _filter_pb (ancestor_filter_pb ),
241+ ],
242+ )
243+
244+ if filter_pb is not None :
245+ query_args ["filter" ] = _filter_pb (filter_pb )
140246
141247 return query_pb2 .Query (** query_args )
142248
143249
250+ def _filter_pb (filter_pb ):
251+ """Convenience function to compose a filter protocol buffer.
252+
253+ The Datastore protocol uses a Filter message which has one of either a
254+ PropertyFilter or CompositeFilter as a sole attribute.
255+
256+ Args:
257+ filter_pb (Union[query_pb2.CompositeFilter, query_pb2.PropertyFilter]):
258+ The actual filter.
259+
260+ Returns:
261+ query_pb2.Filter: The filter at the higher level of abstraction
262+ required to use it in a query.
263+ """
264+ if isinstance (filter_pb , query_pb2 .CompositeFilter ):
265+ return query_pb2 .Filter (composite_filter = filter_pb )
266+
267+ return query_pb2 .Filter (property_filter = filter_pb )
268+
269+
144270@tasklets .tasklet
145271def _run_query (project_id , namespace , query_pb ):
146272 """Run a query in Datastore.
0 commit comments