Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 39 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@
* [7.4.11. Using Flake ID Generator](#7411-using-flake-id-generator)
* [7.4.11.1. Configuring Flake ID Generator](#74111-configuring-flake-id-generator)
* [7.4.12. CP Subsystem](#7412-cp-subsystem)
* [7.4.12.1. Using Atomic Long](#74121-using-atomic-long)
* [7.4.12.1. Using AtomicLong](#74121-using-atomiclong)
* [7.4.12.2. Using AtomicReference](#74122-using-atomicreference)
* [7.5. Distributed Events](#75-distributed-events)
* [7.5.1. Cluster Events](#751-cluster-events)
* [7.5.1.1. Listening for Member Events](#7511-listening-for-member-events)
Expand Down Expand Up @@ -1579,7 +1580,7 @@ Hence, callers should cache returned proxy objects. Second, if you call `Distrib
that data structure is terminated on the underlying CP group and cannot be reinitialized until the CP group is force-destroyed.
For this reason, please make sure that you are completely done with a CP data structure before destroying its proxy.

#### 7.4.12.1. Using Atomic Long
#### 7.4.12.1. Using AtomicLong

Hazelcast `AtomicLong` is the distributed implementation of atomic 64-bit integer counter.
It offers various atomic operations such as `get`, `set`, `get_and_set`, `compare_and_set` and `increment_and_get`.
Expand Down Expand Up @@ -1608,6 +1609,42 @@ print ('CAS operation result:', result)
AtomicLong implementation does not offer exactly-once / effectively-once execution semantics. It goes with at-least-once execution semantics by default and can cause an API call to be committed multiple times in case of CP member failures.
It can be tuned to offer at-most-once execution semantics. Please see [`fail-on-indeterminate-operation-state`](https://docs.hazelcast.org/docs/latest/manual/html-single/index.html#cp-subsystem-configuration) server-side setting.

#### 7.4.12.2. Using AtomicReference

Hazelcast `AtomicReference` is the distributed implementation of a linearizable object reference.
It provides a set of atomic operations allowing to modify the value behind the reference.
This data structure is a part of CP Subsystem.

A basic AtomicReference usage example is shown below.

```python
# Get a AtomicReference called "my-ref"
my_ref = client.cp_subsystem.get_atomic_reference("my-ref").blocking()
# Set the value atomically
my_ref.set(42)
# Read the value
value = my_ref.get()
print("Value:", value)
# Prints:
# Value: 42

# Try to replace the value with "value"
# with a compare-and-set atomic operation
result = my_ref.compare_and_set(42, "value")
print("CAS result:", result)
# Prints:
# CAS result: True
```

The following are some considerations you need to know when you use AtomicReference:

* AtomicReference works based on the byte-content and not on the object-reference. If you use the `compare_and_set()` method, do not change to the original value because its serialized content will then be different.
* All methods returning an object return a private copy. You can modify the private copy, but the rest of the world is shielded from your changes. If you want these changes to be visible to the rest of the world, you need to write the change back to the AtomicReference; but be careful about introducing a data-race.
* The in-memory format of an AtomicReference is `binary`. The receiving side does not need to have the class definition available unless it needs to be deserialized on the other side., e.g., because a method like `alter()` is executed. This deserialization is done for every call that needs to have the object instead of the binary content, so be careful with expensive object graphs that need to be deserialized.
* If you have an object with many fields or an object graph and you only need to calculate some information or need a subset of fields, you can use the `apply()` method. With the `apply()` method, the whole object does not need to be sent over the line; only the information that is relevant is sent.

AtomicReference does not offer exactly-once / effectively-once execution semantics. It goes with at-least-once execution semantics by default and can cause an API call to be committed multiple times in case of CP member failures. It can be tuned to offer at-most-once execution semantics. Please see [`fail-on-indeterminate-operation-state`](https://docs.hazelcast.org/docs/latest/manual/html-single/index.html#cp-subsystem-configuration) server-side setting.


## 7.5. Distributed Events

Expand Down
17 changes: 17 additions & 0 deletions examples/cp/atomic_reference_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import hazelcast

client = hazelcast.HazelcastClient()

my_ref = client.cp_subsystem.get_atomic_reference("my-ref").blocking()
my_ref.set(42)

value = my_ref.get()
print("Value:", value)

result = my_ref.compare_and_set(42, "value")
print("CAS result:", result)

final_value = my_ref.get()
print("Final value:", final_value)

client.shutdown()
24 changes: 24 additions & 0 deletions hazelcast/cp.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from hazelcast.invocation import Invocation
from hazelcast.protocol.codec import cp_group_create_cp_group_codec
from hazelcast.proxy.cp.atomic_long import AtomicLong
from hazelcast.proxy.cp.atomic_reference import AtomicReference
from hazelcast.util import check_true


Expand Down Expand Up @@ -53,6 +54,26 @@ def get_atomic_long(self, name):
"""
return self._proxy_manager.get_or_create(ATOMIC_LONG_SERVICE, name)

def get_atomic_reference(self, name):
"""Returns the distributed AtomicReference instance with given name.

The instance is created on CP Subsystem.

If no group name is given within the ``name`` argument, then the
AtomicLong instance will be created on the DEFAULT CP group.
If a group name is given, like ``.get_atomic_reference("myRef@group1")``,
the given group will be initialized first, if not initialized
already, and then the instance will be created on this group.

Args:
name (str): Name of the AtomicReference.

Returns:
hazelcast.proxy.cp.atomic_reference.AtomicReference: The AtomicReference
proxy for the given name.
"""
return self._proxy_manager.get_or_create(ATOMIC_REFERENCE_SERVICE, name)


_DEFAULT_GROUP_NAME = "default"

Expand Down Expand Up @@ -83,6 +104,7 @@ def _get_object_name_for_proxy(name):


ATOMIC_LONG_SERVICE = "hz:raft:atomicLongService"
ATOMIC_REFERENCE_SERVICE = "hz:raft:atomicRefService"


class CPProxyManager(object):
Expand All @@ -96,6 +118,8 @@ def get_or_create(self, service_name, proxy_name):
group_id = self._get_group_id(proxy_name)
if service_name == ATOMIC_LONG_SERVICE:
return AtomicLong(self._context, group_id, service_name, proxy_name, object_name)
elif service_name == ATOMIC_REFERENCE_SERVICE:
return AtomicReference(self._context, group_id, service_name, proxy_name, object_name)

def _get_group_id(self, proxy_name):
codec = cp_group_create_cp_group_codec
Expand Down
23 changes: 20 additions & 3 deletions hazelcast/proxy/cp/atomic_long.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
atomic_long_get_codec, atomic_long_get_and_add_codec, atomic_long_get_and_set_codec, atomic_long_alter_codec, \
atomic_long_apply_codec
from hazelcast.proxy.cp import BaseCPProxy
from hazelcast.util import check_not_none, check_is_int


class AtomicLong(BaseCPProxy):
Expand Down Expand Up @@ -31,6 +32,7 @@ def add_and_get(self, delta):
hazelcast.future.Future[int]: The updated value, the given value added
to the current value
"""
check_is_int(delta)
codec = atomic_long_add_and_get_codec
request = codec.encode_request(self._group_id, self._object_name, delta)
return self._invoke(request, codec.decode_response)
Expand All @@ -47,6 +49,8 @@ def compare_and_set(self, expect, update):
hazelcast.future.Future[bool]: ``True`` if successful; or ``False`` if
the actual value was not equal to the expected value.
"""
check_is_int(expect)
check_is_int(update)
codec = atomic_long_compare_and_set_codec
request = codec.encode_request(self._group_id, self._object_name, expect, update)
return self._invoke(request, codec.decode_response)
Expand Down Expand Up @@ -87,6 +91,7 @@ def get_and_add(self, delta):
Returns:
hazelcast.future.Future[int]: The old value before the add.
"""
check_is_int(delta)
codec = atomic_long_get_and_add_codec
request = codec.encode_request(self._group_id, self._object_name, delta)
return self._invoke(request, codec.decode_response)
Expand All @@ -100,6 +105,7 @@ def get_and_set(self, new_value):
Returns:
hazelcast.future.Future[int]: The old value.
"""
check_is_int(new_value)
codec = atomic_long_get_and_set_codec
request = codec.encode_request(self._group_id, self._object_name, new_value)
return self._invoke(request, codec.decode_response)
Expand Down Expand Up @@ -130,6 +136,7 @@ def set(self, new_value):
Returns:
hazelcast.future.Future[None]:
"""
check_is_int(new_value)
codec = atomic_long_get_and_set_codec
request = codec.encode_request(self._group_id, self._object_name, new_value)
return self._invoke(request)
Expand All @@ -145,13 +152,18 @@ def alter(self, function):

Args:
function (hazelcast.serialization.api.Portable or hazelcast.serialization.api.IdentifiedDataSerializable):
The function applied to the currently stored value.
The function that alters the currently stored value.

Returns:
hazelcast.future.Future[None]:
"""
check_not_none(function, "Function cannot be None")
function_data = self._to_data(function)
codec = atomic_long_alter_codec
# 1 means return the new value.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a nice addition. I had to look at the codec and server-side code to understand what 1/0 means. 👍

# There is no way to tell server to return nothing as of now (30.09.2020)
# The new value is `long` (comes with the initial frame) and we
# don't try to decode it. So, this shouldn't cause any problems.
request = codec.encode_request(self._group_id, self._object_name, function_data, 1)
return self._invoke(request)

Expand All @@ -166,13 +178,15 @@ def alter_and_get(self, function):

Args:
function (hazelcast.serialization.api.Portable or hazelcast.serialization.api.IdentifiedDataSerializable):
The function applied to the currently stored value.
The function that alters the currently stored value.

Returns:
hazelcast.future.Future[int]: The new value.
"""
check_not_none(function, "Function cannot be None")
function_data = self._to_data(function)
codec = atomic_long_alter_codec
# 1 means return the new value.
request = codec.encode_request(self._group_id, self._object_name, function_data, 1)
return self._invoke(request, codec.decode_response)

Expand All @@ -187,13 +201,15 @@ def get_and_alter(self, function):

Args:
function (hazelcast.serialization.api.Portable or hazelcast.serialization.api.IdentifiedDataSerializable):
The function applied to the currently stored value.
The function that alters the currently stored value.

Returns:
hazelcast.future.Future[int]: The old value.
"""
check_not_none(function, "Function cannot be None")
function_data = self._to_data(function)
codec = atomic_long_alter_codec
# 0 means return the old value.
request = codec.encode_request(self._group_id, self._object_name, function_data, 0)
return self._invoke(request, codec.decode_response)

Expand All @@ -213,6 +229,7 @@ def apply(self, function):
Returns:
hazelcast.future.Future[any]: The result of the function application.
"""
check_not_none(function, "Function cannot be None")
function_data = self._to_data(function)
codec = atomic_long_apply_codec
request = codec.encode_request(self._group_id, self._object_name, function_data)
Expand Down
Loading