Merged
48 changes: 48 additions & 0 deletions README.md
@@ -79,6 +79,7 @@
* [7.7.1.2. Querying by Combining Predicates with AND, OR, NOT](#7712-querying-by-combining-predicates-with-and-or-not)
* [7.7.1.3. Querying with SQL](#7713-querying-with-sql)
* [7.7.1.4. Querying with JSON Strings](#7714-querying-with-json-strings)
* [7.7.1.5. Filtering with Paging Predicates](#7715-filtering-with-paging-predicates)
* [7.8. Performance](#78-performance)
* [7.8.1. Near Cache](#781-near-cache)
* [7.8.1.1. Configuring Near Cache](#7811-configuring-near-cache)
@@ -2243,6 +2244,53 @@ department_with_peter = departments.values(is_equal_to("people[any].name", "Pete

`HazelcastJsonValue` is a lightweight wrapper around your JSON strings. It is used merely as a way to indicate that the contained string should be treated as a valid JSON value. Hazelcast does not check the validity of JSON strings put into the maps. Putting an invalid JSON string into a map is permissible. However, in that case, whether such an entry is returned from a query is undefined.

#### 7.7.1.5. Filtering with Paging Predicates

The Python client provides paging for defined predicates. With its `PagingPredicate` class, you can retrieve keys, values, or entries page by page by filtering them with a predicate and specifying the page size. You can also sort the entries by specifying a comparator.

In the example code below:

* The `greater_equal_predicate` filters the "students" map for objects with an "age" greater than or equal to 18.

* A `PagingPredicate` is then constructed with a page size of 5, so that each page holds at most five objects. The first call to `values()` retrieves the first page.

* Subsequent pages are retrieved by calling the `next_page()` method of `PagingPredicate` and querying the map again with the updated `PagingPredicate`.

```python
from hazelcast.serialization.predicate import is_greater_than_or_equal_to, PagingPredicate
from hazelcast import HazelcastClient

student_map = HazelcastClient().get_map('students').blocking()
greater_equal_predicate = is_greater_than_or_equal_to('age', 18)
# Use the class directly so the variable does not shadow the paging_predicate() factory function.
paging_predicate = PagingPredicate(greater_equal_predicate, 5)

# Retrieve first page:
students_first = student_map.values(paging_predicate)
# ...
# Set up next page:
paging_predicate.next_page()

# Retrieve next page:
students_second = student_map.values(paging_predicate)

# Set page to fourth page and retrieve (page index = page no - 1):
paging_predicate.page = 3
students_fourth = student_map.values(paging_predicate)
```

If you want to sort the result in a specific way before paging, you need to specify a custom comparator object that extends `hazelcast.core.Comparator`, which
provides an interface for comparing two map entries in a distributed map. This comparator class must also extend either `IdentifiedDataSerializable` or `Portable` to be Hazelcast-serializable.
After implementing this class in Python, you need to implement its Java equivalent and its factory. The Java equivalent of the comparator should implement `java.util.Comparator`. Note that the `compare` function of `Comparator` on the Java side is the equivalent of the `compare` function of `Comparator` on the Python side.
Once you have implemented the comparator and its factory, add them to the CLASSPATH of the server side.
See the [Adding User Library to CLASSPATH](#1212-adding-user-library-to-classpath) section.
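
As a rough illustration of the Python side only, the sketch below compares entries by value. `Comparator` and `IdentifiedDataSerializable` are replaced by local stand-ins so that the snippet runs without the client installed; in real code they would come from `hazelcast.core` and `hazelcast.serialization.api`. The class name `ValueComparator` and the factory and class IDs are hypothetical and would have to match the Java implementation registered on the members.

```python
# Stand-ins for hazelcast.core.Comparator and
# hazelcast.serialization.api.IdentifiedDataSerializable (assumption: defined
# here only so the sketch runs without the client installed).
class Comparator(object):
    def compare(self, entry1, entry2):
        raise NotImplementedError()


class IdentifiedDataSerializable(object):
    def write_data(self, object_data_output):
        raise NotImplementedError()

    def read_data(self, object_data_input):
        raise NotImplementedError()

    def get_factory_id(self):
        raise NotImplementedError()

    def get_class_id(self):
        raise NotImplementedError()


class ValueComparator(Comparator, IdentifiedDataSerializable):
    """Hypothetical comparator that compares (key, value) entries by value."""

    FACTORY_ID = 1  # hypothetical; must match the Java factory
    CLASS_ID = 1    # hypothetical; must match the Java class

    def compare(self, entry1, entry2):
        value1, value2 = entry1[1], entry2[1]
        # Negative, zero, or positive, like java.util.Comparator.compare.
        return (value1 > value2) - (value1 < value2)

    def write_data(self, object_data_output):
        pass  # stateless: nothing to serialize

    def read_data(self, object_data_input):
        pass  # stateless: nothing to deserialize

    def get_factory_id(self):
        return self.FACTORY_ID

    def get_class_id(self):
        return self.CLASS_ID


comparator = ValueComparator()
print(comparator.compare(("a", 1), ("b", 2)))  # negative: 1 < 2
```

With the real base classes, such an instance would be passed as the third constructor argument, for example `PagingPredicate(greater_equal_predicate, 5, comparator)`.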

If a comparator is not specified for `PagingPredicate`, but you want to get a collection of keys or values page by page, the keys or values in this collection must be
instances of Java `Comparable` (i.e., they must implement `java.lang.Comparable`). Otherwise, a `java.lang.IllegalArgumentException` is thrown.
They should also be Python-comparable, that is, their Python implementation should include the `__lt__()` method.
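
On the Python side, a minimal sketch of a comparable value type could look like the following. The `Student` class and its `age` field are hypothetical; a real value type would also need to be Hazelcast-serializable and have a Java counterpart implementing `java.lang.Comparable`, neither of which is shown here.

```python
import functools


@functools.total_ordering
class Student(object):
    """Hypothetical value type ordered by age."""

    def __init__(self, name, age):
        self.name = name
        self.age = age

    def __eq__(self, other):
        return isinstance(other, Student) and self.age == other.age

    def __hash__(self):
        return hash(self.age)

    def __lt__(self, other):
        # __lt__ is what sorted() and list.sort() use to order instances.
        return self.age < other.age


students = [Student("Ann", 21), Student("Bob", 18), Student("Cid", 19)]
print([s.name for s in sorted(students)])  # ['Bob', 'Cid', 'Ann']
```

`functools.total_ordering` is optional here; it only derives the remaining comparison operators from `__eq__` and `__lt__`.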

You can also jump straight to a specific page by setting the `paging_predicate.page` field. This way, if you query page index 99, for example, the client reaches the 100th page with a single query instead of stepping through the preceding pages one by one with the `next_page()` function.
See the code sample under `examples.map.map_paging_predicate_example` for more detail.
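
To walk through every page, one option is to keep calling `next_page()` until a query comes back empty, since an out-of-range page index yields an empty result. The helper `iter_pages` below is a hypothetical sketch, not part of the client; `FakePredicate` and `FakeMap` are in-memory stand-ins that mimic only `page`, `page_size`, `next_page()`, and `values()` so that the loop can run without a cluster.

```python
def iter_pages(blocking_map, predicate):
    """Yield one list of values per page until an empty page is returned."""
    while True:
        page_values = blocking_map.values(predicate)
        if not page_values:
            return
        yield page_values
        predicate.next_page()


class FakePredicate(object):
    """Stand-in exposing page, page_size, and next_page()."""

    def __init__(self, page_size):
        self.page_size = page_size
        self.page = 0

    def next_page(self):
        self.page += 1
        return self.page


class FakeMap(object):
    """Stand-in whose values() slices a sorted list the way paging would."""

    def __init__(self, data):
        self._data = sorted(data)

    def values(self, predicate):
        start = predicate.page * predicate.page_size
        return self._data[start:start + predicate.page_size]


pages = list(iter_pages(FakeMap([3, 1, 2, 5, 4]), FakePredicate(2)))
print(pages)  # [[1, 2], [3, 4], [5]]
```

Against a real cluster, the same helper would presumably be called with a blocking map proxy and a `PagingPredicate`, at the cost of one extra query that returns the final empty page.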

## 7.8. Performance

### 7.8.1. Near Cache
26 changes: 26 additions & 0 deletions examples/map/map_paging_predicate_example.py
@@ -0,0 +1,26 @@
import hazelcast

from hazelcast.serialization.predicate import is_greater_than_or_equal_to, PagingPredicate

if __name__ == "__main__":
    client = hazelcast.HazelcastClient()

    student_grades_map = client.get_map('student grades').blocking()
    student_grades_map.put_all({"student" + str(i): 100-i for i in range(10)})

    greater_equal_predicate = is_greater_than_or_equal_to('this', 94)  # Query for student grades that are >= 94.
    paging_predicate = PagingPredicate(greater_equal_predicate, 2)  # Page size 2.

    # Retrieve first page:
    grades_first_page = student_grades_map.values(paging_predicate)  # [94, 95]

    # ...
    # Set up next page:
    paging_predicate.next_page()

    # Retrieve next page:
    grades_second_page = student_grades_map.values(paging_predicate)  # [96, 97]

    # Set page to fourth page and retrieve (page index = page no - 1):
    paging_predicate.page = 3
    grades_fourth_page = student_grades_map.values(paging_predicate)  # [100]
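
The page contents noted in the comments of this example can be double-checked offline. The sketch below assumes that, without a comparator, the matching values are sorted in ascending natural order before being cut into pages; it only mirrors the arithmetic and does not touch a cluster.

```python
# Same data as the example: student0 -> 100, student1 -> 99, ..., student9 -> 91.
grades = {"student" + str(i): 100 - i for i in range(10)}

page_size = 2
# Assumption: values matching the predicate are sorted ascending, then paged.
matching = sorted(grade for grade in grades.values() if grade >= 94)
pages = [matching[i:i + page_size] for i in range(0, len(matching), page_size)]

print(pages[0])  # [94, 95]
print(pages[1])  # [96, 97]
print(pages[3])  # [100]
```

Seven grades match the predicate, so a page size of 2 gives four pages, and the last one holds a single value.
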
23 changes: 23 additions & 0 deletions hazelcast/core.py
@@ -221,3 +221,26 @@ def loads(self):
:return: (object), Python object represented by the original string
"""
return json.loads(self._json_string)


class Comparator(object):
    """
    Comparator provides an interface to a comparator object to compare two map entries in a distributed map.
    The custom comparator class must also extend either Portable or IdentifiedDataSerializable to be
    Hazelcast-serializable.
    A comparator class with the same functionality should be registered on Hazelcast server in order to be used
    in PagingPredicate.
    """
    def compare(self, entry1, entry2):
        """
        This method is used to determine order of entries when sorting.
        - If return value is a negative value, [entry1] comes after [entry2],
        - If return value is a positive value, [entry1] comes before [entry2],
        - If return value is 0, [entry1] and [entry2] are indistinguishable in this sorting mechanism.
            Their order with respect to each other is undefined.
        This method must always return the same result given the same pair of entries.
        :param entry1: (K,V pair), first entry
        :param entry2: (K,V pair), second entry
        :return: (int), order index
        """
        raise NotImplementedError()
Original file line number Diff line number Diff line change
@@ -34,7 +34,7 @@ def decode_response(client_message, to_object=None):
     response_size = client_message.read_int()
     response = []
     for _ in range(0, response_size):
-        response_item = (client_message.read_data(), client_message.read_data())
+        response_item = (to_object(client_message.read_data()), to_object(client_message.read_data()))
         response.append(response_item)
-    parameters['response'] = ImmutableLazyDataList(response, to_object)
+    parameters['response'] = response
     return parameters
Original file line number Diff line number Diff line change
@@ -34,7 +34,7 @@ def decode_response(client_message, to_object=None):
     response_size = client_message.read_int()
     response = []
     for _ in range(0, response_size):
-        response_item = client_message.read_data()
+        response_item = (to_object(client_message.read_data()), None)
         response.append(response_item)
-    parameters['response'] = ImmutableLazyDataList(response, to_object)
+    parameters['response'] = response
     return parameters
Original file line number Diff line number Diff line change
@@ -34,7 +34,7 @@ def decode_response(client_message, to_object=None):
     response_size = client_message.read_int()
     response = []
     for _ in range(0, response_size):
-        response_item = (client_message.read_data(), client_message.read_data())
+        response_item = (to_object(client_message.read_data()), to_object(client_message.read_data()))
         response.append(response_item)
-    parameters['response'] = ImmutableLazyDataList(response, to_object)
+    parameters['response'] = response
     return parameters
37 changes: 29 additions & 8 deletions hazelcast/proxy/map.py
@@ -13,10 +13,12 @@
     map_remove_entry_listener_codec, map_replace_codec, map_replace_if_same_codec, map_set_codec, map_try_lock_codec, \
     map_try_put_codec, map_try_remove_codec, map_unlock_codec, map_values_codec, map_values_with_predicate_codec, \
     map_add_interceptor_codec, map_execute_on_all_keys_codec, map_execute_on_key_codec, map_execute_on_keys_codec, \
-    map_execute_with_predicate_codec, map_add_near_cache_entry_listener_codec
+    map_execute_with_predicate_codec, map_add_near_cache_entry_listener_codec, map_entries_with_paging_predicate_codec,\
+    map_key_set_with_paging_predicate_codec, map_values_with_paging_predicate_codec
 from hazelcast.proxy.base import Proxy, EntryEvent, EntryEventType, get_entry_listener_flags, MAX_SIZE
-from hazelcast.util import check_not_none, thread_id, to_millis
+from hazelcast.util import check_not_none, thread_id, to_millis, get_sorted_query_result_set, ITERATION_TYPE
 from hazelcast import six
+from hazelcast.serialization.predicate import PagingPredicate


class Map(Proxy):
@@ -226,8 +228,15 @@ def entry_set(self, predicate=None):
         .. seealso:: :class:`~hazelcast.serialization.predicate.Predicate` for more info about predicates.
         """
         if predicate:
-            predicate_data = self._to_data(predicate)
-            return self._encode_invoke(map_entries_with_predicate_codec, predicate=predicate_data)
+            if isinstance(predicate, PagingPredicate):
+                predicate.iteration_type = ITERATION_TYPE.ENTRY
+                predicate_data = self._to_data(predicate)
+                result_list_future = self._encode_invoke(map_entries_with_paging_predicate_codec, predicate=predicate_data)
+                return result_list_future.continue_with(get_sorted_query_result_set, predicate)
+
+            else:
+                predicate_data = self._to_data(predicate)
+                return self._encode_invoke(map_entries_with_predicate_codec, predicate=predicate_data)
         else:
             return self._encode_invoke(map_entry_set_codec)

@@ -441,8 +450,14 @@ def key_set(self, predicate=None):
         .. seealso:: :class:`~hazelcast.serialization.predicate.Predicate` for more info about predicates.
         """
         if predicate:
-            predicate_data = self._to_data(predicate)
-            return self._encode_invoke(map_key_set_with_predicate_codec, predicate=predicate_data)
+            if isinstance(predicate, PagingPredicate):
+                predicate.iteration_type = ITERATION_TYPE.KEY
+                predicate_data = self._to_data(predicate)
+                result_list_future = self._encode_invoke(map_key_set_with_paging_predicate_codec, predicate=predicate_data)
+                return result_list_future.continue_with(get_sorted_query_result_set, predicate)
+            else:
+                predicate_data = self._to_data(predicate)
+                return self._encode_invoke(map_key_set_with_predicate_codec, predicate=predicate_data)
         else:
             return self._encode_invoke(map_key_set_codec)

@@ -819,8 +834,14 @@ def values(self, predicate=None):
         .. seealso:: :class:`~hazelcast.serialization.predicate.Predicate` for more info about predicates.
         """
         if predicate:
-            predicate_data = self._to_data(predicate)
-            return self._encode_invoke(map_values_with_predicate_codec, predicate=predicate_data)
+            if isinstance(predicate, PagingPredicate):
+                predicate.iteration_type = ITERATION_TYPE.VALUE
+                predicate_data = self._to_data(predicate)
+                result_list_future = self._encode_invoke(map_values_with_paging_predicate_codec, predicate=predicate_data)
+                return result_list_future.continue_with(get_sorted_query_result_set, predicate)
+            else:
+                predicate_data = self._to_data(predicate)
+                return self._encode_invoke(map_values_with_predicate_codec, predicate=predicate_data)
         else:
             return self._encode_invoke(map_values_codec)

122 changes: 122 additions & 0 deletions hazelcast/serialization/predicate.py
@@ -1,4 +1,5 @@
from hazelcast.serialization.api import IdentifiedDataSerializable
from hazelcast.util import ITERATION_TYPE

PREDICATE_FACTORY_ID = -32

@@ -204,6 +205,123 @@ def write_data(self, object_data_output):
def __repr__(self):
return "TruePredicate()"


class PagingPredicate(Predicate):
    CLASS_ID = 15

    __NULL_ANCHOR = (-1, None)

    def __init__(self, predicate, page_size, comparator=None):
        """
        Creates a Paging predicate with provided page size, internal predicate, and optional comparator.
        :param predicate: predicate to filter the results
        :param page_size: page size of each result set
        :param comparator: (Optional) a comparator object used to sort the results.
            Defines compare method on (K,V) tuples. Must be an implementation of hazelcast.core.Comparator
            WARNING: comparator must extend Comparator, and either IdentifiedDataSerializable or Portable.
        """
        if isinstance(predicate, PagingPredicate):
            raise TypeError('Nested paging predicate not supported.')
        self._internal_predicate = predicate
        self._comparator = comparator
        if page_size <= 0:
            raise ValueError('page_size should be greater than 0.')
        self._page_size = page_size
        self._page = 0  # initialized to be on first page
        self.iteration_type = ITERATION_TYPE.ENTRY  # ENTRY as default.
        self.anchor_list = []  # List of pairs: (nearest page, (anchor key, anchor value))

    def __repr__(self):
        return "PagingPredicate(predicate=%s, page_size=%s, comparator=%s)" % (self._internal_predicate,
                                                                               self.page_size, self.comparator)

    def write_data(self, output):
        output.write_object(self._internal_predicate)
        output.write_object(self.comparator)
        output.write_int(self.page)
        output.write_int(self._page_size)
        output.write_utf(ITERATION_TYPE.reverse.get(self.iteration_type, None))
        output.write_int(len(self.anchor_list))
        for nearest_page, (anchor_key, anchor_value) in self.anchor_list:
            output.write_int(nearest_page)
            output.write_object(anchor_key)
            output.write_object(anchor_value)

    def next_page(self):
        """
        Sets page index to next page. If new index is out of range, the query results that this paging predicate will
        retrieve will be an empty list.
        :return (int) current page index
        """
        self.page += 1
        return self.page

    def previous_page(self):
        """
        If current page index is 0, this method does nothing. Otherwise, it sets page index to previous page.
        :return (int) current page index
        """
        if self.page != 0:
            self.page -= 1
        return self.page

    def set_anchor(self, nearest_page, anchor):
        anchor_entry = (nearest_page, anchor)
        anchor_count = len(self.anchor_list)
        if nearest_page < anchor_count:
            self.anchor_list[nearest_page] = anchor_entry
        elif nearest_page == anchor_count:
            self.anchor_list.append(anchor_entry)
        else:
            # Report the requested anchor index rather than the current page.
            raise IndexError('Anchor index is not correct, expected: ' + str(nearest_page) +
                             ' found: ' + str(anchor_count))

    def reset(self):
        self.iteration_type = ITERATION_TYPE.ENTRY
        del self.anchor_list[:]  # list.clear() is not available on Python 2
        self.page = 0

    def get_nearest_anchor_entry(self):
        """
        After each query, an anchor entry is set for that page.
        For the next query user may set an arbitrary page.
        For example: user queried first 5 pages which means first 5 anchors are available
        if the next query is for the 10th page then the nearest anchor belongs to page 5
        but if the next query is for the 3rd page then the nearest anchor belongs to page 2.
        :return nearest anchored entry for current page
        """
        anchor_count = len(self.anchor_list)
        if self.page == 0 or anchor_count == 0:
            return PagingPredicate.__NULL_ANCHOR
        return self.anchor_list[self.page - 1] if self.page < anchor_count else self.anchor_list[anchor_count - 1]

    @property
    def page(self):
        return self._page

    @page.setter
    def page(self, page_no):
        """
        Sets page index to specified page_no.
        If page_no is out of range, the query results that this paging predicate will retrieve will be an empty list.
        :param page_no: (int) greater than or equal to 0.
        """
        if page_no < 0:
            raise ValueError('page_no should be positive or 0.')
        self._page = page_no

    @property
    def page_size(self):
        return self._page_size

    @property
    def internal_predicate(self):
        return self._internal_predicate

    @property
    def comparator(self):
        return self._comparator


sql = SqlPredicate
is_equal_to = EqualPredicate
is_not_equal_to = NotEqualPredicate
@@ -236,3 +354,7 @@ def is_less_than(attribute, x):
def is_less_than_or_equal_to(attribute, x):
    return GreaterLessPredicate(attribute, x, True, True)


def paging_predicate(predicate, page_size):
    return PagingPredicate(predicate, page_size)
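
The anchor lookup in `get_nearest_anchor_entry` can be tried in isolation. The function below is a standalone restatement of that logic with made-up anchor values; `nearest_anchor` and `NULL_ANCHOR` are illustrative names, not client API.

```python
NULL_ANCHOR = (-1, None)


def nearest_anchor(page, anchor_list):
    """Return the anchor to resume from when querying the given page index."""
    anchor_count = len(anchor_list)
    if page == 0 or anchor_count == 0:
        return NULL_ANCHOR
    if page < anchor_count:
        # An anchor for the preceding page is already known.
        return anchor_list[page - 1]
    # Otherwise fall back to the last anchor collected so far.
    return anchor_list[anchor_count - 1]


# Made-up anchors for page indexes 0..4: (page index, (anchor key, anchor value)).
anchors = [(i, ("key%d" % i, i * 10)) for i in range(5)]

print(nearest_anchor(0, anchors))  # (-1, None)
print(nearest_anchor(2, anchors))  # (1, ('key1', 10))
print(nearest_anchor(9, anchors))  # (4, ('key4', 40))
```

This matches the docstring's example: with five pages already queried, the 10th page (index 9) resumes from the fifth page's anchor, while the 3rd page (index 2) resumes from the second page's anchor.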

2 changes: 2 additions & 0 deletions hazelcast/serialization/serializer.py
@@ -25,6 +25,8 @@ def read(self, inp):
         return None

     # "write(self, out, obj)" is never called so not implemented here
+    def write(self, out, obj):
+        pass

     def get_type_id(self):
         return CONSTANT_TYPE_NULL