Commit 0a9a2dc

Support for Paging Predicate implementation (#212)
* "Support for Paging Predicate, draft implementation"
* "Fixed import error"
* "Fixed serialization errors, added tests, implemented synchronous paging support"
* "Custom comparator tests and fixes"
* "test_key_set_with_custom_comparator fixed"
* "documentation added"
* "Changes and fixes based on initial review"
* "Sped up map_paging_predicate_test, fixed documentation"
* "Code sample for paging predicate"
* "test file fixes"
* "Fixes and enhancements after review round 2."
1 parent 4cc593e commit 0a9a2dc

12 files changed, with 662 additions and 15 deletions

README.md

Lines changed: 48 additions & 0 deletions
@@ -79,6 +79,7 @@
 * [7.7.1.2. Querying by Combining Predicates with AND, OR, NOT](#7712-querying-by-combining-predicates-with-and-or-not)
 * [7.7.1.3. Querying with SQL](#7713-querying-with-sql)
 * [7.7.1.4. Querying with JSON Strings](#7714-querying-with-json-strings)
+* [7.7.1.5. Filtering with Paging Predicates](#7715-filtering-with-paging-predicates)
 * [7.8. Performance](#78-performance)
 * [7.8.1. Near Cache](#781-near-cache)
 * [7.8.1.1. Configuring Near Cache](#7811-configuring-near-cache)
@@ -2243,6 +2244,53 @@ department_with_peter = departments.values(is_equal_to("people[any].name", "Pete

 `HazelcastJsonValue` is a lightweight wrapper around your JSON strings. It is used merely as a way to indicate that the contained string should be treated as a valid JSON value. Hazelcast does not check the validity of JSON strings put into the maps. Putting an invalid JSON string into a map is permissible. However, in that case whether such an entry is going to be returned or not from a query is not defined.

+#### 7.7.1.5. Filtering with Paging Predicates
+
+The Python client provides paging for defined predicates. With its `PagingPredicate` class, you can get a list of keys, values, or entries page by page by filtering them with predicates and giving the size of the pages. Also, you can sort the entries by specifying comparators.
+
+In the example code below:
+
+The `greater_equal_predicate` selects the objects in the "students" map whose "age" is greater than or equal to 18.
+
+Then a `PagingPredicate` is constructed with a page size of 5, so that each page holds at most five objects. The first call to `values()` retrieves the first page.
+
+Subsequent pages are retrieved by calling the `next_page()` method of `PagingPredicate` and querying the map again with the updated `PagingPredicate`.
+
+```python
+from hazelcast.serialization.predicate import is_greater_than_or_equal_to, paging_predicate
+from hazelcast import HazelcastClient
+
+student_map = HazelcastClient().get_map('students').blocking()
+greater_equal_predicate = is_greater_than_or_equal_to('age', 18)
+paging_predicate = paging_predicate(greater_equal_predicate, 5)
+
+# Retrieve first page:
+students_first = student_map.values(paging_predicate)
+# ...
+# Set up next page:
+paging_predicate.next_page()
+
+# Retrieve next page:
+students_second = student_map.values(paging_predicate)
+
+# Set page to fourth page and retrieve (page index = page no - 1):
+paging_predicate.page = 3
+students_fourth = student_map.values(paging_predicate)
+```
+
+If you want to sort the result in a specific way before paging, you need to specify a custom comparator object that extends `hazelcast.core.Comparator`, which
+provides an interface to a comparator object to compare two map entries in a distributed map. Also, this comparator class should extend either `IdentifiedDataSerializable` or `Portable` to be Hazelcast-serializable.
+After implementing this class in Python, you need to implement the Java equivalent of it and its factory. The Java equivalent of the comparator should implement `java.util.Comparator`. Note that the `compare` function of `Comparator` on the Java side is the equivalent of the `compare` function of `Comparator` on the Python side.
+Once you have implemented the comparator and its factory, add them to the CLASSPATH of the server side.
+See the [Adding User Library to CLASSPATH](#1212-adding-user-library-to-classpath) section.
+
+If a comparator is not specified for `PagingPredicate`, but you want to get a collection of keys or values page by page, the keys or values must be
+instances of Java `Comparable` (i.e., they must implement `java.lang.Comparable`). Otherwise, a `java.lang.IllegalArgumentException` is thrown.
+They should also be Python-comparable, that is, their Python implementation should include the `__lt__()` method.
+
+Also, you can jump to a specific page by setting the `paging_predicate.page` field. This way, if you make a query for page index 99, for example, it will get all 100 pages at once instead of reaching the 100th page one by one using the `next_page()` function.
+See the code sample under `examples.map.map_paging_predicate_example` for more detail.
+
 ## 7.8. Performance

 ### 7.8.1. Near Cache
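
The page arithmetic described in the README text can be illustrated without a cluster. The sketch below is a plain-Python stand-in, not the client's implementation: `query_page` is a hypothetical helper that filters, sorts, and slices a local list. It assumes pages are zero-indexed and that an out-of-range page yields an empty list, as the docstrings in this commit state.

```python
def query_page(entries, predicate, page, page_size, key=None):
    """Return one page of the entries that satisfy `predicate`.

    Hypothetical local model of a paging query: filter, sort, then slice.
    `page` is a zero-based index (page index = page number - 1).
    """
    if page < 0:
        raise ValueError("page should be positive or 0.")
    if page_size <= 0:
        raise ValueError("page_size should be greater than 0.")
    matching = sorted((e for e in entries if predicate(e)), key=key)
    start = page * page_size
    return matching[start:start + page_size]


def at_least_94(grade):
    return grade >= 94


grades = [100 - i for i in range(10)]  # 100, 99, ..., 91

first_page = query_page(grades, at_least_94, page=0, page_size=2)
fourth_page = query_page(grades, at_least_94, page=3, page_size=2)
past_the_end = query_page(grades, at_least_94, page=4, page_size=2)
```

Seven grades match, so the fourth page holds only the last one and any later page is empty.
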
Lines changed: 26 additions & 0 deletions
@@ -0,0 +1,26 @@
+import hazelcast
+
+from hazelcast.serialization.predicate import is_greater_than_or_equal_to, PagingPredicate
+
+if __name__ == "__main__":
+    client = hazelcast.HazelcastClient()
+
+    student_grades_map = client.get_map('student grades').blocking()
+    student_grades_map.put_all({"student" + str(i): 100-i for i in range(10)})
+
+    greater_equal_predicate = is_greater_than_or_equal_to('this', 94) # Query for student grades that are >= 94.
+    paging_predicate = PagingPredicate(greater_equal_predicate, 2) # Page size 2.
+
+    # Retrieve first page:
+    grades_first_page = student_grades_map.values(paging_predicate) # [94, 95]
+
+    # ...
+    # Set up next page:
+    paging_predicate.next_page()
+
+    # Retrieve next page:
+    grades_second_page = student_grades_map.values(paging_predicate) # [96, 97]
+
+    # Set page to fourth page and retrieve (page index = page no - 1):
+    paging_predicate.page = 3
+    grades_fourth_page = student_grades_map.values(paging_predicate) # [100]
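
When every page is needed, the manual `next_page()` calls in the example can be wrapped in a loop. The following is a minimal sketch under one assumption taken from the `next_page` docstring in this commit: a page index past the end returns an empty list. `iterate_pages`, `FakePagingPredicate`, and `fake_values` are hypothetical names used only so the sketch runs without a cluster.

```python
class FakePagingPredicate(object):
    """Hypothetical stand-in exposing only `page`, `page_size` and `next_page()`."""

    def __init__(self, page_size):
        self.page = 0
        self.page_size = page_size

    def next_page(self):
        self.page += 1
        return self.page


def iterate_pages(fetch, paging_predicate):
    """Yield successive non-empty pages, advancing the predicate after each one.

    `fetch` is any callable that takes the predicate and returns a list,
    for example the `values` method of a blocking map.
    """
    while True:
        page = fetch(paging_predicate)
        if not page:
            return
        yield page
        paging_predicate.next_page()


# Stand-in for a blocking map's `values`: slices a sorted local list.
grades = sorted(100 - i for i in range(10) if 100 - i >= 94)


def fake_values(predicate):
    start = predicate.page * predicate.page_size
    return grades[start:start + predicate.page_size]


pages = list(iterate_pages(fake_values, FakePagingPredicate(page_size=2)))
```

With the real client, the same loop would presumably be driven as `iterate_pages(student_grades_map.values, paging_predicate)`.
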

hazelcast/core.py

Lines changed: 23 additions & 0 deletions
@@ -221,3 +221,26 @@ def loads(self):
         :return: (object), Python object represented by the original string
         """
         return json.loads(self._json_string)
+
+
+class Comparator(object):
+    """
+    Comparator provides an interface to a comparator object to compare two map entries in a distributed map.
+    The custom comparator class must also extend either Portable or IdentifiedDataSerializable to be
+    Hazelcast-serializable.
+    A comparator class with the same functionality should be registered on Hazelcast server in order to be used
+    in PagingPredicate.
+    """
+    def compare(self, entry1, entry2):
+        """
+        This method is used to determine order of entries when sorting.
+        - If return value is a negative value, [entry1] comes before [entry2],
+        - If return value is a positive value, [entry1] comes after [entry2],
+        - If return value is 0, [entry1] and [entry2] are indistinguishable in this sorting mechanism.
+            Their order with respect to each other is undefined.
+        This method must always return the same result given the same pair of entries.
+        :param entry1: (K,V pair), first entry
+        :param entry2: (K,V pair), second entry
+        :return: (int), order index
+        """
+        raise NotImplementedError()
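
To make the `compare` contract concrete, here is a small local sketch. It assumes the sign convention of `java.util.Comparator` and Python's `functools.cmp_to_key` (a negative result sorts `entry1` first), which is what the README's statement about Java equivalence suggests. The `Comparator` class below is a local stand-in for `hazelcast.core.Comparator`, and `DescendingValueComparator` is a hypothetical example. A comparator used with a real cluster would additionally need to be `IdentifiedDataSerializable` or `Portable` and have a Java counterpart, which this sketch omits.

```python
from functools import cmp_to_key


class Comparator(object):
    """Local stand-in for hazelcast.core.Comparator (interface only)."""

    def compare(self, entry1, entry2):
        raise NotImplementedError()


class DescendingValueComparator(Comparator):
    """Hypothetical comparator: orders (key, value) entries by value, highest first."""

    def compare(self, entry1, entry2):
        value1, value2 = entry1[1], entry2[1]
        # Returns -1 when value1 is larger, so larger values sort first.
        return (value2 > value1) - (value2 < value1)


entries = [("student0", 100), ("student1", 99), ("student2", 98)]
shuffled = [entries[1], entries[2], entries[0]]

# Sort locally with the comparator, the way a cmp-style function is applied in Python.
ordered = sorted(shuffled, key=cmp_to_key(DescendingValueComparator().compare))
```
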

hazelcast/protocol/codec/map_entries_with_paging_predicate_codec.py

Lines changed: 2 additions & 2 deletions
@@ -34,7 +34,7 @@ def decode_response(client_message, to_object=None):
     response_size = client_message.read_int()
     response = []
     for _ in range(0, response_size):
-        response_item = (client_message.read_data(), client_message.read_data())
+        response_item = (to_object(client_message.read_data()), to_object(client_message.read_data()))
         response.append(response_item)
-    parameters['response'] = ImmutableLazyDataList(response, to_object)
+    parameters['response'] = response
     return parameters

hazelcast/protocol/codec/map_key_set_with_paging_predicate_codec.py

Lines changed: 2 additions & 2 deletions
@@ -34,7 +34,7 @@ def decode_response(client_message, to_object=None):
     response_size = client_message.read_int()
     response = []
     for _ in range(0, response_size):
-        response_item = client_message.read_data()
+        response_item = (to_object(client_message.read_data()), None)
         response.append(response_item)
-    parameters['response'] = ImmutableLazyDataList(response, to_object)
+    parameters['response'] = response
     return parameters

hazelcast/protocol/codec/map_values_with_paging_predicate_codec.py

Lines changed: 2 additions & 2 deletions
@@ -34,7 +34,7 @@ def decode_response(client_message, to_object=None):
     response_size = client_message.read_int()
     response = []
     for _ in range(0, response_size):
-        response_item = (client_message.read_data(), client_message.read_data())
+        response_item = (to_object(client_message.read_data()), to_object(client_message.read_data()))
         response.append(response_item)
-    parameters['response'] = ImmutableLazyDataList(response, to_object)
+    parameters['response'] = response
     return parameters
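
The three codec changes above replace lazy deserialization with eager deserialization into plain tuples. The commit does not say why. One plausible reason is that the results are sorted on the client afterwards, which needs real objects rather than serialized data. The sketch below shows the eager pattern in isolation. `FakeMessage` and `decode_entries` are hypothetical names, and the byte strings stand in for serialized data.

```python
class FakeMessage(object):
    """Hypothetical stand-in for a client message: a queue of fields to read."""

    def __init__(self, fields):
        self._fields = list(fields)

    def read_int(self):
        return self._fields.pop(0)

    def read_data(self):
        return self._fields.pop(0)


def decode_entries(message, to_object):
    """Eagerly deserialize the announced number of (key, value) pairs."""
    size = message.read_int()
    response = []
    for _ in range(size):
        # Key is read first, then value, matching the tuple order in the codecs.
        key = to_object(message.read_data())
        value = to_object(message.read_data())
        response.append((key, value))
    return response


message = FakeMessage([2, b"k1", b"v1", b"k2", b"v2"])
decoded = decode_entries(message, lambda data: data.decode("utf-8"))
```
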

hazelcast/proxy/map.py

Lines changed: 29 additions & 8 deletions
@@ -13,10 +13,12 @@
     map_remove_entry_listener_codec, map_replace_codec, map_replace_if_same_codec, map_set_codec, map_try_lock_codec, \
     map_try_put_codec, map_try_remove_codec, map_unlock_codec, map_values_codec, map_values_with_predicate_codec, \
     map_add_interceptor_codec, map_execute_on_all_keys_codec, map_execute_on_key_codec, map_execute_on_keys_codec, \
-    map_execute_with_predicate_codec, map_add_near_cache_entry_listener_codec
+    map_execute_with_predicate_codec, map_add_near_cache_entry_listener_codec, map_entries_with_paging_predicate_codec,\
+    map_key_set_with_paging_predicate_codec, map_values_with_paging_predicate_codec
 from hazelcast.proxy.base import Proxy, EntryEvent, EntryEventType, get_entry_listener_flags, MAX_SIZE
-from hazelcast.util import check_not_none, thread_id, to_millis
+from hazelcast.util import check_not_none, thread_id, to_millis, get_sorted_query_result_set, ITERATION_TYPE
 from hazelcast import six
+from hazelcast.serialization.predicate import PagingPredicate


 class Map(Proxy):
@@ -226,8 +228,15 @@ def entry_set(self, predicate=None):
         .. seealso:: :class:`~hazelcast.serialization.predicate.Predicate` for more info about predicates.
         """
         if predicate:
-            predicate_data = self._to_data(predicate)
-            return self._encode_invoke(map_entries_with_predicate_codec, predicate=predicate_data)
+            if isinstance(predicate, PagingPredicate):
+                predicate.iteration_type = ITERATION_TYPE.ENTRY
+                predicate_data = self._to_data(predicate)
+                result_list_future = self._encode_invoke(map_entries_with_paging_predicate_codec, predicate=predicate_data)
+                return result_list_future.continue_with(get_sorted_query_result_set, predicate)
+
+            else:
+                predicate_data = self._to_data(predicate)
+                return self._encode_invoke(map_entries_with_predicate_codec, predicate=predicate_data)
         else:
             return self._encode_invoke(map_entry_set_codec)

@@ -441,8 +450,14 @@ def key_set(self, predicate=None):
         .. seealso:: :class:`~hazelcast.serialization.predicate.Predicate` for more info about predicates.
         """
         if predicate:
-            predicate_data = self._to_data(predicate)
-            return self._encode_invoke(map_key_set_with_predicate_codec, predicate=predicate_data)
+            if isinstance(predicate, PagingPredicate):
+                predicate.iteration_type = ITERATION_TYPE.KEY
+                predicate_data = self._to_data(predicate)
+                result_list_future = self._encode_invoke(map_key_set_with_paging_predicate_codec, predicate=predicate_data)
+                return result_list_future.continue_with(get_sorted_query_result_set, predicate)
+            else:
+                predicate_data = self._to_data(predicate)
+                return self._encode_invoke(map_key_set_with_predicate_codec, predicate=predicate_data)
         else:
             return self._encode_invoke(map_key_set_codec)

@@ -819,8 +834,14 @@ def values(self, predicate=None):
         .. seealso:: :class:`~hazelcast.serialization.predicate.Predicate` for more info about predicates.
         """
         if predicate:
-            predicate_data = self._to_data(predicate)
-            return self._encode_invoke(map_values_with_predicate_codec, predicate=predicate_data)
+            if isinstance(predicate, PagingPredicate):
+                predicate.iteration_type = ITERATION_TYPE.VALUE
+                predicate_data = self._to_data(predicate)
+                result_list_future = self._encode_invoke(map_values_with_paging_predicate_codec, predicate=predicate_data)
+                return result_list_future.continue_with(get_sorted_query_result_set, predicate)
+            else:
+                predicate_data = self._to_data(predicate)
+                return self._encode_invoke(map_values_with_predicate_codec, predicate=predicate_data)
         else:
             return self._encode_invoke(map_values_codec)

hazelcast/serialization/predicate.py

Lines changed: 122 additions & 0 deletions
@@ -1,4 +1,5 @@
 from hazelcast.serialization.api import IdentifiedDataSerializable
+from hazelcast.util import ITERATION_TYPE

 PREDICATE_FACTORY_ID = -32

@@ -204,6 +205,123 @@ def write_data(self, object_data_output):
     def __repr__(self):
         return "TruePredicate()"

+
+class PagingPredicate(Predicate):
+    CLASS_ID = 15
+
+    __NULL_ANCHOR = (-1, None)
+
+    def __init__(self, predicate, page_size, comparator=None):
+        """
+        Creates a Paging predicate with provided page size, internal predicate, and optional comparator.
+        :param predicate: predicate to filter the results
+        :param page_size: page size of each result set
+        :param comparator: (Optional) a comparator object used to sort the results.
+            Defines compare method on (K,V) tuples. Must be an implementation of hazelcast.core.Comparator
+            WARNING: comparator must extend Comparator, and either IdentifiedDataSerializable or Portable.
+        """
+        if isinstance(predicate, PagingPredicate):
+            raise TypeError('Nested paging predicate not supported.')
+        self._internal_predicate = predicate
+        self._comparator = comparator
+        if page_size <= 0:
+            raise ValueError('page_size should be greater than 0.')
+        self._page_size = page_size
+        self._page = 0 # initialized to be on first page
+        self.iteration_type = ITERATION_TYPE.ENTRY # ENTRY as default.
+        self.anchor_list = [] # List of pairs: (nearest page, (anchor key, anchor value))
+
+    def __repr__(self):
+        return "PagingPredicate(predicate=%s, page_size=%s, comparator=%s)" % (self._internal_predicate,
+                                                                               self.page_size, self.comparator)
+
+    def write_data(self, output):
+        output.write_object(self._internal_predicate)
+        output.write_object(self.comparator)
+        output.write_int(self.page)
+        output.write_int(self._page_size)
+        output.write_utf(ITERATION_TYPE.reverse.get(self.iteration_type, None))
+        output.write_int(len(self.anchor_list))
+        for nearest_page, (anchor_key, anchor_value) in self.anchor_list:
+            output.write_int(nearest_page)
+            output.write_object(anchor_key)
+            output.write_object(anchor_value)
+
+    def next_page(self):
+        """
+        Sets page index to next page. If new index is out of range, the query results that this paging predicate will
+        retrieve will be an empty list.
+        :return (int) current page index
+        """
+        self.page += 1
+        return self.page
+
+    def previous_page(self):
+        """
+        If current page index is 0, this method does nothing. Otherwise, it sets page index to previous page.
+        :return (int) current page index
+        """
+        if self.page != 0:
+            self.page -= 1
+        return self.page
+
+    def set_anchor(self, nearest_page, anchor):
+        anchor_entry = (nearest_page, anchor)
+        anchor_count = len(self.anchor_list)
+        if nearest_page < anchor_count:
+            self.anchor_list[nearest_page] = anchor_entry
+        elif nearest_page == anchor_count:
+            self.anchor_list.append(anchor_entry)
+        else:
+            raise IndexError('Anchor index is not correct, expected: ' + str(self.page) + ', found: ' + str(anchor_count))
+
+    def reset(self):
+        self.iteration_type = ITERATION_TYPE.ENTRY
+        del self.anchor_list[:] # list.clear() is not available on Python 2
+        self.page = 0
+
+    def get_nearest_anchor_entry(self):
+        """
+        After each query, an anchor entry is set for that page.
+        For the next query user may set an arbitrary page.
+        For example: user queried first 5 pages which means first 5 anchors are available
+        if the next query is for the 10th page then the nearest anchor belongs to page 5
+        but if the next query is for the 3rd page then the nearest anchor belongs to page 2.
+        :return nearest anchored entry for current page
+        """
+        anchor_count = len(self.anchor_list)
+        if self.page == 0 or anchor_count == 0:
+            return PagingPredicate.__NULL_ANCHOR
+        return self.anchor_list[self.page - 1] if self.page < anchor_count else self.anchor_list[anchor_count - 1]
+
+    @property
+    def page(self):
+        return self._page
+
+    @page.setter
+    def page(self, page_no):
+        """
+        Sets page index to specified page_no.
+        If page_no is out of range, the query results that this paging predicate will retrieve will be an empty list.
+        :param page_no: (int) greater than or equal to 0.
+        """
+        if page_no < 0:
+            raise ValueError('page_no should be positive or 0.')
+        self._page = page_no
+
+    @property
+    def page_size(self):
+        return self._page_size
+
+    @property
+    def internal_predicate(self):
+        return self._internal_predicate
+
+    @property
+    def comparator(self):
+        return self._comparator
+
+
 sql = SqlPredicate
 is_equal_to = EqualPredicate
 is_not_equal_to = NotEqualPredicate
@@ -236,3 +354,7 @@ def is_less_than(attribute, x):
 def is_less_than_or_equal_to(attribute, x):
     return GreaterLessPredicate(attribute, x, True, True)

+
+def paging_predicate(predicate, page_size):
+    return PagingPredicate(predicate, page_size)
+
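
The anchor lookup in `get_nearest_anchor_entry` is easier to follow with concrete numbers. The sketch below copies that lookup into a free function so it runs on its own. `nearest_anchor` and the sample anchor data are hypothetical, and page indices are zero-based, so "the 10th page" is index 9.

```python
NULL_ANCHOR = (-1, None)


def nearest_anchor(anchor_list, page):
    """Standalone copy of the lookup in `get_nearest_anchor_entry` for a page index."""
    anchor_count = len(anchor_list)
    if page == 0 or anchor_count == 0:
        return NULL_ANCHOR
    if page < anchor_count:
        return anchor_list[page - 1]
    return anchor_list[anchor_count - 1]


# Anchors recorded after querying page indices 0..4; the (key, value) pairs are made up.
anchors = [(i, ("key%d" % i, i)) for i in range(5)]

tenth_page_anchor = nearest_anchor(anchors, 9)  # falls back to the last known anchor
third_page_anchor = nearest_anchor(anchors, 2)  # uses the anchor of the preceding page
first_page_anchor = nearest_anchor(anchors, 0)  # the first page needs no anchor
```

This matches the docstring's example: with five anchors available, a query for the 10th page starts from the anchor of page 5 (index 4), and a query for the 3rd page starts from the anchor of page 2 (index 1).
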

hazelcast/serialization/serializer.py

Lines changed: 2 additions & 0 deletions
@@ -25,6 +25,8 @@ def read(self, inp):
         return None

     # "write(self, out, obj)" is never called so not implemented here
+    def write(self, out, obj):
+        pass

     def get_type_id(self):
         return CONSTANT_TYPE_NULL
