Skip to content

Commit 53d086f

Browse files
committed
Implement AtomicReference
For the AtomicReference, implementation, code samples, documentation and tests are added. It also includes some minor docstring fixes regarding the lists in some places and some missing checks and documentation fixes for the AtomicLong.
1 parent dc00f73 commit 53d086f

File tree

12 files changed

+505
-27
lines changed

12 files changed

+505
-27
lines changed

README.md

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,8 @@
5656
* [7.4.11. Using Flake ID Generator](#7411-using-flake-id-generator)
5757
* [7.4.11.1. Configuring Flake ID Generator](#74111-configuring-flake-id-generator)
5858
* [7.4.12. CP Subsystem](#7412-cp-subsystem)
59-
* [7.4.12.1. Using Atomic Long](#74121-using-atomic-long)
59+
* [7.4.12.1. Using AtomicLong](#74121-using-atomiclong)
60+
* [7.4.12.2. Using AtomicReference](#74122-using-atomicreference)
6061
* [7.5. Distributed Events](#75-distributed-events)
6162
* [7.5.1. Cluster Events](#751-cluster-events)
6263
* [7.5.1.1. Listening for Member Events](#7511-listening-for-member-events)
@@ -1579,7 +1580,7 @@ Hence, callers should cache returned proxy objects. Second, if you call `Distrib
15791580
that data structure is terminated on the underlying CP group and cannot be reinitialized until the CP group is force-destroyed.
15801581
For this reason, please make sure that you are completely done with a CP data structure before destroying its proxy.
15811582

1582-
#### 7.4.12.1. Using Atomic Long
1583+
#### 7.4.12.1. Using AtomicLong
15831584

15841585
Hazelcast `AtomicLong` is the distributed implementation of atomic 64-bit integer counter.
15851586
It offers various atomic operations such as `get`, `set`, `get_and_set`, `compare_and_set` and `increment_and_get`.
@@ -1608,6 +1609,42 @@ print ('CAS operation result:', result)
16081609
AtomicLong implementation does not offer exactly-once / effectively-once execution semantics. It goes with at-least-once execution semantics by default and can cause an API call to be committed multiple times in case of CP member failures.
16091610
It can be tuned to offer at-most-once execution semantics. Please see [`fail-on-indeterminate-operation-state`](https://docs.hazelcast.org/docs/latest/manual/html-single/index.html#cp-subsystem-configuration) server-side setting.
16101611

1612+
#### 7.4.12.2. Using AtomicReference
1613+
1614+
Hazelcast `AtomicReference` is the distributed implementation of a linearizable object reference.
1615+
It provides a set of atomic operations allowing to modify the value behind the reference.
1616+
This data structure is a part of CP Subsystem.
1617+
1618+
A basic AtomicReference usage example is shown below.
1619+
1620+
```python
1621+
# Get a AtomicReference called "my-ref"
1622+
my_ref = client.cp_subsystem.get_atomic_reference("my-ref").blocking()
1623+
# Set the value atomically
1624+
my_ref.set(42)
1625+
# Read the value
1626+
value = my_ref.get()
1627+
print("Value:", value)
1628+
# Prints:
1629+
# Value: 42
1630+
1631+
# Try to replace the value with "value"
1632+
# with a compare-and-set atomic operation
1633+
result = my_ref.compare_and_set(42, "value")
1634+
print("CAS result:", result)
1635+
# Prints:
1636+
# CAS result: True
1637+
```
1638+
1639+
The following are some considerations you need to know when you use AtomicReference:
1640+
1641+
* AtomicReference works based on the byte-content and not on the object-reference. If you use the `compare_and_set()` method, do not change to the original value because its serialized content will then be different.
1642+
* All methods returning an object return a private copy. You can modify the private copy, but the rest of the world is shielded from your changes. If you want these changes to be visible to the rest of the world, you need to write the change back to the AtomicReference; but be careful about introducing a data-race.
1643+
* The in-memory format of an AtomicReference is `binary`. The receiving side does not need to have the class definition available unless it needs to be deserialized on the other side., e.g., because a method like `alter()` is executed. This deserialization is done for every call that needs to have the object instead of the binary content, so be careful with expensive object graphs that need to be deserialized.
1644+
* If you have an object with many fields or an object graph and you only need to calculate some information or need a subset of fields, you can use the `apply()` method. With the `apply()` method, the whole object does not need to be sent over the line; only the information that is relevant is sent.
1645+
1646+
AtomicReference does not offer exactly-once / effectively-once execution semantics. It goes with at-least-once execution semantics by default and can cause an API call to be committed multiple times in case of CP member failures. It can be tuned to offer at-most-once execution semantics. Please see [`fail-on-indeterminate-operation-state`](https://docs.hazelcast.org/docs/latest/manual/html-single/index.html#cp-subsystem-configuration) server-side setting.
1647+
16111648

16121649
## 7.5. Distributed Events
16131650

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
import hazelcast
2+
3+
client = hazelcast.HazelcastClient()
4+
5+
my_ref = client.cp_subsystem.get_atomic_reference("my-ref").blocking()
6+
my_ref.set(42)
7+
8+
value = my_ref.get()
9+
print("Value:", value)
10+
11+
result = my_ref.compare_and_set(42, "value")
12+
print("CAS result:", result)
13+
14+
final_value = my_ref.get()
15+
print("Final value:", final_value)
16+
17+
client.shutdown()

hazelcast/cp.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from hazelcast.invocation import Invocation
22
from hazelcast.protocol.codec import cp_group_create_cp_group_codec
33
from hazelcast.proxy.cp.atomic_long import AtomicLong
4+
from hazelcast.proxy.cp.atomic_reference import AtomicReference
45
from hazelcast.util import check_true
56

67

@@ -53,6 +54,26 @@ def get_atomic_long(self, name):
5354
"""
5455
return self._proxy_manager.get_or_create(ATOMIC_LONG_SERVICE, name)
5556

57+
def get_atomic_reference(self, name):
58+
"""Returns the distributed AtomicReference instance with given name.
59+
60+
The instance is created on CP Subsystem.
61+
62+
If no group name is given within the ``name`` argument, then the
63+
AtomicLong instance will be created on the DEFAULT CP group.
64+
If a group name is given, like ``.get_atomic_reference("myRef@group1")``,
65+
the given group will be initialized first, if not initialized
66+
already, and then the instance will be created on this group.
67+
68+
Args:
69+
name (str): Name of the AtomicReference.
70+
71+
Returns:
72+
hazelcast.proxy.cp.atomic_reference.AtomicReference: The AtomicReference
73+
proxy for the given name.
74+
"""
75+
return self._proxy_manager.get_or_create(ATOMIC_REFERENCE_SERVICE, name)
76+
5677

5778
_DEFAULT_GROUP_NAME = "default"
5879

@@ -83,6 +104,7 @@ def _get_object_name_for_proxy(name):
83104

84105

85106
ATOMIC_LONG_SERVICE = "hz:raft:atomicLongService"
107+
ATOMIC_REFERENCE_SERVICE = "hz:raft:atomicRefService"
86108

87109

88110
class CPProxyManager(object):
@@ -96,6 +118,8 @@ def get_or_create(self, service_name, proxy_name):
96118
group_id = self._get_group_id(proxy_name)
97119
if service_name == ATOMIC_LONG_SERVICE:
98120
return AtomicLong(self._context, group_id, service_name, proxy_name, object_name)
121+
elif service_name == ATOMIC_REFERENCE_SERVICE:
122+
return AtomicReference(self._context, group_id, service_name, proxy_name, object_name)
99123

100124
def _get_group_id(self, proxy_name):
101125
codec = cp_group_create_cp_group_codec

hazelcast/proxy/cp/atomic_long.py

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
atomic_long_get_codec, atomic_long_get_and_add_codec, atomic_long_get_and_set_codec, atomic_long_alter_codec, \
33
atomic_long_apply_codec
44
from hazelcast.proxy.cp import BaseCPProxy
5+
from hazelcast.util import check_not_none
56

67

78
class AtomicLong(BaseCPProxy):
@@ -145,13 +146,18 @@ def alter(self, function):
145146
146147
Args:
147148
function (hazelcast.serialization.api.Portable or hazelcast.serialization.api.IdentifiedDataSerializable):
148-
The function applied to the currently stored value.
149+
The function that alters the currently stored value.
149150
150151
Returns:
151152
hazelcast.future.Future[None]:
152153
"""
154+
check_not_none(function, "Function cannot be None")
153155
function_data = self._to_data(function)
154156
codec = atomic_long_alter_codec
157+
# 1 means return the new value.
158+
# There is no way to tell server to return nothing as of now (30.09.2020)
159+
# The new value is `long` (comes with the initial frame) and we
160+
# don't try to decode it. So, this shouldn't cause any problems.
155161
request = codec.encode_request(self._group_id, self._object_name, function_data, 1)
156162
return self._invoke(request)
157163

@@ -166,13 +172,14 @@ def alter_and_get(self, function):
166172
167173
Args:
168174
function (hazelcast.serialization.api.Portable or hazelcast.serialization.api.IdentifiedDataSerializable):
169-
The function applied to the currently stored value.
175+
The function that alters the currently stored value.
170176
171177
Returns:
172178
hazelcast.future.Future[int]: The new value.
173179
"""
174180
function_data = self._to_data(function)
175181
codec = atomic_long_alter_codec
182+
# 1 means return the new value.
176183
request = codec.encode_request(self._group_id, self._object_name, function_data, 1)
177184
return self._invoke(request, codec.decode_response)
178185

@@ -187,13 +194,15 @@ def get_and_alter(self, function):
187194
188195
Args:
189196
function (hazelcast.serialization.api.Portable or hazelcast.serialization.api.IdentifiedDataSerializable):
190-
The function applied to the currently stored value.
197+
The function that alters the currently stored value.
191198
192199
Returns:
193200
hazelcast.future.Future[int]: The old value.
194201
"""
202+
check_not_none(function, "Function cannot be None")
195203
function_data = self._to_data(function)
196204
codec = atomic_long_alter_codec
205+
# 0 means return the old value.
197206
request = codec.encode_request(self._group_id, self._object_name, function_data, 0)
198207
return self._invoke(request, codec.decode_response)
199208

@@ -213,6 +222,7 @@ def apply(self, function):
213222
Returns:
214223
hazelcast.future.Future[any]: The result of the function application.
215224
"""
225+
check_not_none(function, "Function cannot be None")
216226
function_data = self._to_data(function)
217227
codec = atomic_long_apply_codec
218228
request = codec.encode_request(self._group_id, self._object_name, function_data)

0 commit comments

Comments
 (0)