Skip to content

Commit f05112e

Browse files
authored
Projections added to python client (#451)
* Projections added to python client * Fixed projections example * Identity unit tests, fixed changes requested on pr * Fixed print statements in doc example * Fix again print statements in python docs * Fixed requested changes * Python 2 test fix for HazelcastJsonValue
1 parent c6149ab commit f05112e

File tree

10 files changed

+338
-1
lines changed

10 files changed

+338
-1
lines changed

docs/using_python_client_with_hazelcast_imdg.rst

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2137,6 +2137,62 @@ See the following example.
21372137
# Average age is 30
21382138
print("Average age is %f" % average_age)
21392139
2140+
Projections
2141+
~~~~~~~~~~~
2142+
2143+
There are cases where instead of sending all the data returned by a query
2144+
from the server, you want to transform (strip down) each result object in order
2145+
to avoid redundant network traffic.
2146+
2147+
For example, you select all employees based on some criteria, but you just
2148+
want to return their name instead of the whole object. It is easily doable
2149+
with the Projections.
2150+
2151+
The ``projection`` module provides three projection functions:
2152+
2153+
- ``single_attribute``: Extracts a single attribute from an object and returns
2154+
it.
2155+
- ``multi_attribute``: Extracts multiple attributes from an object and returns
2156+
them as a ``list``.
2157+
- ``identity``: Returns the object as it is.
2158+
2159+
These projections are used with the ``map.project`` function, which takes an
2160+
optional predicate argument.
2161+
2162+
See the following example.
2163+
2164+
.. code:: python
2165+
2166+
import hazelcast
2167+
2168+
from hazelcast.core import HazelcastJsonValue
2169+
from hazelcast.predicate import greater_or_equal
2170+
from hazelcast.projection import single_attribute, multi_attribute
2171+
2172+
client = hazelcast.HazelcastClient()
2173+
employees = client.get_map("employees").blocking()
2174+
2175+
employees.put(1, HazelcastJsonValue('{"Age": 23, "Height": 180, "Weight": 60}'))
2176+
employees.put(2, HazelcastJsonValue('{"Age": 21, "Height": 170, "Weight": 70}'))
2177+
2178+
employee_ages = employees.project(single_attribute("Age"))
2179+
# Prints:
2180+
# The ages of employees are [21, 23]
2181+
print("The ages of employees are %s" % employee_ages)
2182+
2183+
# Run Single Attribute With Predicate
2184+
employee_ages = employees.project(single_attribute("Age"), greater_or_equal("Age", 23))
2185+
# Prints:
2186+
# The employee age is 23
2187+
print("The employee age is: %s" % employee_ages[0])
2188+
2189+
# Run Multi Attribute Projection
2190+
employee_multi_attribute = employees.project(multi_attribute("Age", "Height"))
2191+
# Prints:
2192+
# Employee 1 age and height: [21, 170] Employee 2 age and height: [23, 180]
2193+
print("Employee 1 age and height: %s Employee 2 age and height: %s" % (employee_multi_attribute[0], employee_multi_attribute[1]))
2194+
2195+
21402196
Performance
21412197
-----------
21422198

hazelcast/core.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -355,6 +355,12 @@ def __eq__(self, other):
355355
def __ne__(self, other):
356356
return not self.__eq__(other)
357357

358+
def __hash__(self):
359+
return hash(self._json_string)
360+
361+
def __repr__(self):
362+
return self._json_string
363+
358364

359365
class MemberVersion(object):
360366
"""

hazelcast/projection.py

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
from hazelcast.serialization.api import IdentifiedDataSerializable
2+
3+
_PROJECTIONS_FACTORY_ID = -30
4+
5+
6+
class Projection(object):
7+
"""Marker base class for all projections.
8+
9+
Projections allow the client to transform (strip down) each query result
10+
object in order to avoid redundant network traffic.
11+
"""
12+
13+
pass
14+
15+
16+
class _AbstractProjection(Projection, IdentifiedDataSerializable):
17+
def write_data(self, object_data_output):
18+
raise NotImplementedError("write_data")
19+
20+
def read_data(self, object_data_input):
21+
pass
22+
23+
def get_factory_id(self):
24+
return _PROJECTIONS_FACTORY_ID
25+
26+
def get_class_id(self):
27+
raise NotImplementedError("get_class_id")
28+
29+
30+
def _validate_attribute_path(attribute_path):
31+
if not attribute_path:
32+
raise ValueError("attribute_path must not be None or empty")
33+
34+
if "[any]" in attribute_path:
35+
raise ValueError("attribute_path must not contain [any] operators")
36+
37+
38+
class _SingleAttributeProjection(_AbstractProjection):
39+
def __init__(self, attribute_path):
40+
_validate_attribute_path(attribute_path)
41+
self._attribute_path = attribute_path
42+
43+
def write_data(self, object_data_output):
44+
object_data_output.write_string(self._attribute_path)
45+
46+
def get_class_id(self):
47+
return 0
48+
49+
50+
class _MultiAttributeProjection(_AbstractProjection):
51+
def __init__(self, *attribute_paths):
52+
if not attribute_paths:
53+
raise ValueError("Specify at least one attribute path")
54+
55+
for attribute_path in attribute_paths:
56+
_validate_attribute_path(attribute_path)
57+
58+
self.attribute_paths = attribute_paths
59+
60+
def write_data(self, object_data_output):
61+
object_data_output.write_string_array(self.attribute_paths)
62+
63+
def get_class_id(self):
64+
return 1
65+
66+
67+
class _IdentityProjection(_AbstractProjection):
68+
def write_data(self, object_data_output):
69+
pass
70+
71+
def get_class_id(self):
72+
return 2
73+
74+
75+
def single_attribute(attribute_path):
76+
"""Creates a projection that extracts the value of
77+
the given attribute path.
78+
79+
Args:
80+
attribute_path (str): Path to extract the attribute from.
81+
82+
Returns:
83+
Projection[any]: A projection that extracts the value of the given
84+
attribute path.
85+
"""
86+
return _SingleAttributeProjection(attribute_path)
87+
88+
89+
def multi_attribute(*attribute_paths):
90+
"""Creates a projection that extracts the values of
91+
one or more attribute paths.
92+
93+
Args:
94+
*attribute_paths (str): Paths to extract the attributes from.
95+
96+
Returns:
97+
Projection[list]: A projection that extracts the values of the given
98+
attribute paths.
99+
"""
100+
return _MultiAttributeProjection(*attribute_paths)
101+
102+
103+
def identity():
104+
"""Creates a projection that does no transformation.
105+
106+
Returns:
107+
Projection[hazelcast.core.MapEntry]: A projection that does no
108+
transformation.
109+
"""
110+
return _IdentityProjection()
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
from hazelcast.protocol.client_message import OutboundMessage, REQUEST_HEADER_SIZE, create_initial_buffer
2+
from hazelcast.protocol.builtin import StringCodec
3+
from hazelcast.protocol.builtin import DataCodec
4+
from hazelcast.protocol.builtin import ListMultiFrameCodec
5+
6+
# hex: 0x013B00
7+
_REQUEST_MESSAGE_TYPE = 80640
8+
# hex: 0x013B01
9+
_RESPONSE_MESSAGE_TYPE = 80641
10+
11+
_REQUEST_INITIAL_FRAME_SIZE = REQUEST_HEADER_SIZE
12+
13+
14+
def encode_request(name, projection):
15+
buf = create_initial_buffer(_REQUEST_INITIAL_FRAME_SIZE, _REQUEST_MESSAGE_TYPE)
16+
StringCodec.encode(buf, name)
17+
DataCodec.encode(buf, projection, True)
18+
return OutboundMessage(buf, True)
19+
20+
21+
def decode_response(msg):
22+
msg.next_frame()
23+
return ListMultiFrameCodec.decode_contains_nullable(msg, DataCodec.decode)
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
from hazelcast.protocol.client_message import OutboundMessage, REQUEST_HEADER_SIZE, create_initial_buffer
2+
from hazelcast.protocol.builtin import StringCodec
3+
from hazelcast.protocol.builtin import DataCodec
4+
from hazelcast.protocol.builtin import ListMultiFrameCodec
5+
6+
# hex: 0x013C00
7+
_REQUEST_MESSAGE_TYPE = 80896
8+
# hex: 0x013C01
9+
_RESPONSE_MESSAGE_TYPE = 80897
10+
11+
_REQUEST_INITIAL_FRAME_SIZE = REQUEST_HEADER_SIZE
12+
13+
14+
def encode_request(name, projection, predicate):
15+
buf = create_initial_buffer(_REQUEST_INITIAL_FRAME_SIZE, _REQUEST_MESSAGE_TYPE)
16+
StringCodec.encode(buf, name)
17+
DataCodec.encode(buf, projection)
18+
DataCodec.encode(buf, predicate, True)
19+
return OutboundMessage(buf, True)
20+
21+
22+
def decode_response(msg):
23+
msg.next_frame()
24+
return ListMultiFrameCodec.decode_contains_nullable(msg, DataCodec.decode)

hazelcast/proxy/map.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@
4949
map_add_interceptor_codec,
5050
map_aggregate_codec,
5151
map_aggregate_with_predicate_codec,
52+
map_project_codec,
53+
map_project_with_predicate_codec,
5254
map_execute_on_all_keys_codec,
5355
map_execute_on_key_codec,
5456
map_execute_on_keys_codec,
@@ -345,6 +347,44 @@ def handler(message):
345347
request = map_aggregate_codec.encode_request(self.name, aggregator_data)
346348
return self._invoke(request, handler)
347349

350+
def project(self, projection, predicate=None):
351+
"""Applies the projection logic on map entries and filter the result with the
352+
predicate, if given.
353+
354+
Args:
355+
projection (hazelcast.projection.Projection): Projection to project the
356+
entries with.
357+
predicate (hazelcast.predicate.Predicate): Predicate to filter the entries
358+
with.
359+
360+
Returns:
361+
hazelcast.future.Future: The result of the projection.
362+
"""
363+
check_not_none(projection, "Projection can't be none")
364+
projection_data = self._to_data(projection)
365+
if predicate:
366+
if isinstance(predicate, PagingPredicate):
367+
raise AssertionError("Paging predicate is not supported.")
368+
369+
def handler(message):
370+
return ImmutableLazyDataList(
371+
map_project_with_predicate_codec.decode_response(message), self._to_object
372+
)
373+
374+
predicate_data = self._to_data(predicate)
375+
request = map_project_with_predicate_codec.encode_request(
376+
self.name, projection_data, predicate_data
377+
)
378+
return self._invoke(request, handler)
379+
380+
def handler(message):
381+
return ImmutableLazyDataList(
382+
map_project_codec.decode_response(message), self._to_object
383+
)
384+
385+
request = map_project_codec.encode_request(self.name, projection_data)
386+
return self._invoke(request, handler)
387+
348388
def clear(self):
349389
"""Clears the map.
350390

hazelcast/serialization/serialization_const.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
JAVA_DEFAULT_TYPE_CLASS = -24
3232
JAVA_DEFAULT_TYPE_DATE = -25
3333
JAVA_DEFAULT_TYPE_BIG_INTEGER = -26
34+
JAVA_DEFAULT_TYPE_ARRAY = -28
3435
JAVA_DEFAULT_TYPE_ARRAY_LIST = -29
3536
JAVA_DEFAULT_TYPE_LINKED_LIST = -30
3637
JAVASCRIPT_JSON_SERIALIZATION_TYPE = -130

hazelcast/serialization/serializer.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -305,6 +305,17 @@ def get_type_id(self):
305305
return JAVA_DEFAULT_TYPE_CLASS
306306

307307

308+
class ArraySerializer(BaseSerializer):
309+
def read(self, inp):
310+
size = inp.read_int()
311+
return [inp.read_object() for _ in range(size)]
312+
313+
# "write(self, out, obj)" is never called so not implemented here
314+
315+
def get_type_id(self):
316+
return JAVA_DEFAULT_TYPE_ARRAY
317+
318+
308319
class ArrayListSerializer(BaseSerializer):
309320
def read(self, inp):
310321
size = inp.read_int()

hazelcast/serialization/service.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ def _register_constant_serializers(self):
104104
self._registry.register_constant_serializer(DateTimeSerializer(), datetime)
105105
self._registry.register_constant_serializer(BigIntegerSerializer())
106106
self._registry.register_constant_serializer(JavaClassSerializer())
107+
self._registry.register_constant_serializer(ArraySerializer())
107108
self._registry.register_constant_serializer(ArrayListSerializer(), list)
108109
self._registry.register_constant_serializer(LinkedListSerializer())
109110
self._registry.register_constant_serializer(

0 commit comments

Comments
 (0)