Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@
* [7.4.10. Using PN Counter](#7410-using-pn-counter)
* [7.4.11. Using Flake ID Generator](#7411-using-flake-id-generator)
* [7.4.11.1. Configuring Flake ID Generator](#74111-configuring-flake-id-generator)
* [7.4.12. CP Subsystem](#7412-cp-subsystem)
* [7.4.12.1. Using Atomic Long](#74121-using-atomic-long)
* [7.5. Distributed Events](#75-distributed-events)
* [7.5.1. Cluster Events](#751-cluster-events)
* [7.5.1.1. Listening for Member Events](#7511-listening-for-member-events)
Expand Down Expand Up @@ -1559,6 +1561,54 @@ The following are the descriptions of configuration elements and attributes:
* `prefetch_count`: Count of IDs which are pre-fetched on the background when one call to `generator.newId()` is made. Its value must be in the range `1` - `100,000`. Its default value is `100`.
* `prefetch_validity`: Specifies for how long the pre-fetched IDs can be used. After this time elapses, a new batch of IDs are fetched. Time unit is seconds. Its default value is `600` seconds (`10` minutes). The IDs contain a timestamp component, which ensures a rough global ordering of them. If an ID is assigned to an object that was created later, it will be out of order. If ordering is not important, set this value to `0`.

### 7.4.12. CP Subsystem

Hazelcast IMDG 4.0 introduces CP concurrency primitives with respect to the [CAP principle](http://awoc.wolski.fi/dlib/big-data/Brewer_podc_keynote_2000.pdf),
i.e., they always maintain [linearizability](https://aphyr.com/posts/313-strong-consistency-models) and prefer consistency to
availability during network partitions and client or server failures.

All data structures within CP Subsystem are available through `client.cp_subsystem` component of the client.

Before using Atomic Long, Lock, and Semaphore, CP Subsystem has to be enabled on cluster-side.
Refer to [CP Subsystem](https://docs.hazelcast.org/docs/latest/manual/html-single/#cp-subsystem) documentation for more information.

Data structures in CP Subsystem run in CP groups. Each CP group elects its own Raft leader and runs the Raft consensus algorithm independently.
The CP data structures differ from the other Hazelcast data structures in two aspects.
First, an internal commit is performed on the METADATA CP group every time you fetch a proxy from this interface.
Hence, callers should cache returned proxy objects. Second, if you call `DistributedObject.destroy()` on a CP data structure proxy,
that data structure is terminated on the underlying CP group and cannot be reinitialized until the CP group is force-destroyed.
For this reason, please make sure that you are completely done with a CP data structure before destroying its proxy.

#### 7.4.12.1. Using Atomic Long

Hazelcast `AtomicLong` is the distributed implementation of atomic 64-bit integer counter.
It offers various atomic operations such as `get`, `set`, `get_and_set`, `compare_and_set` and `increment_and_get`.
This data structure is a part of CP Subsystem.

An Atomic Long usage example is shown below.

```python
# Get an AtomicLong called "my-atomic-long"
atomic_long = client.cp_subsystem.get_atomic_long("my-atomic-long").blocking()
# Get current value
value = atomic_long.get()
print("Value:", value)
# Prints:
# Value: 0

# Increment by 42
atomic_long.add_and_get(42)
# Set to 0 atomically if the current value is 42
result = atomic_long.compare_and_set(42, 0)
print ('CAS operation result:', result)
# Prints:
# CAS operation result: True
```

AtomicLong implementation does not offer exactly-once / effectively-once execution semantics. It goes with at-least-once execution semantics by default and can cause an API call to be committed multiple times in case of CP member failures.
It can be tuned to offer at-most-once execution semantics. Please see [`fail-on-indeterminate-operation-state`](https://docs.hazelcast.org/docs/latest/manual/html-single/index.html#cp-subsystem-configuration) server-side setting.


## 7.5. Distributed Events

This chapter explains when various events are fired and describes how you can add event listeners on a Hazelcast Python client.
Expand Down
6 changes: 6 additions & 0 deletions docs/hazelcast.cp.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
CP Subsystem
============

.. py:currentmodule:: hazelcast.cp

.. autoclass:: CPSubsystem
4 changes: 4 additions & 0 deletions docs/hazelcast.proxy.cp.atomic_long.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
AtomicLong
==========

.. automodule:: hazelcast.proxy.cp.atomic_long
5 changes: 5 additions & 0 deletions docs/hazelcast.proxy.cp.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
CP Proxies
====================

.. toctree::
hazelcast.proxy.cp.atomic_long
1 change: 1 addition & 0 deletions docs/hazelcast.proxy.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ Hazelcast Proxies
.. toctree::

hazelcast.proxy.base
hazelcast.proxy.cp
hazelcast.proxy.executor
hazelcast.proxy.flake_id_generator
hazelcast.proxy.list
Expand Down
1 change: 1 addition & 0 deletions docs/hazelcast.rst
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ Hazelcast Python Client
hazelcast.cluster
hazelcast.config
hazelcast.core
hazelcast.cp
hazelcast.errors
hazelcast.future
hazelcast.lifecycle
Expand Down
11 changes: 11 additions & 0 deletions examples/cp/atomic_long_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import hazelcast

client = hazelcast.HazelcastClient()

view_counter = client.cp_subsystem.get_atomic_long("views").blocking()
value = view_counter.get()
print("Value:", value)
new_value = view_counter.add_and_get(42)
print("New value:", new_value)

client.shutdown()
20 changes: 9 additions & 11 deletions examples/org-website/atomic_long_sample.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
# TODO Fix this when we add CP Atomic Long

import hazelcast

# Start the Hazelcast Client and connect to an already running Hazelcast Cluster on 127.0.0.1
# Start the Hazelcast Client and connect to an already running
# Hazelcast Cluster on 127.0.0.1
# Note: CP Subsystem has to be enabled on the cluster
hz = hazelcast.HazelcastClient()
# Get an Atomic Counter, we'll call it "counter"
counter = hz.get_atomic_long("counter").blocking()
# Add and Get the "counter"
counter.add_and_get(3)
# value is 3
# Display the "counter" value
print("counter: {}".format(counter.get()))
# Shutdown this Hazelcast Client
# Get the AtomicLong counter from Cluster
counter = hz.cp_subsystem.get_atomic_long("counter").blocking()
# Add and get the counter
value = counter.add_and_get(3)
print("Counter value is", value)
# Shutdown this Hazelcast client
hz.shutdown()

2 changes: 2 additions & 0 deletions hazelcast/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from hazelcast.config import _Config
from hazelcast.connection import ConnectionManager, DefaultAddressProvider
from hazelcast.core import DistributedObjectInfo, DistributedObjectEvent
from hazelcast.cp import CPSubsystem
from hazelcast.invocation import InvocationService, Invocation
from hazelcast.listener import ListenerService, ClusterViewListenerService
from hazelcast.lifecycle import LifecycleService, LifecycleState, _InternalLifecycleService
Expand Down Expand Up @@ -329,6 +330,7 @@ def __init__(self, **kwargs):
self._invocation_service,
self._logger_extras)
self._proxy_manager = ProxyManager(self._context)
self.cp_subsystem = CPSubsystem(self._context)
self._transaction_manager = TransactionManager(self._context, self._logger_extras)
self._lock_reference_id_generator = AtomicInteger(1)
self._statistics = Statistics(self, self._reactor, self._connection_manager,
Expand Down
106 changes: 106 additions & 0 deletions hazelcast/cp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
from hazelcast.invocation import Invocation
from hazelcast.protocol.codec import cp_group_create_cp_group_codec
from hazelcast.proxy.cp.atomic_long import AtomicLong
from hazelcast.util import check_true


class CPSubsystem(object):
"""CP Subsystem is a component of Hazelcast that builds a strongly consistent
layer for a set of distributed data structures.

Its APIs can be used for implementing distributed coordination use cases,
such as leader election, distributed locking, synchronization, and metadata
management.

Its data structures are CP with respect to the CAP principle, i.e., they
always maintain linearizability and prefer consistency over availability
during network partitions. Besides network partitions, CP Subsystem
withstands server and client failures.

Data structures in CP Subsystem run in CP groups. Each CP group elects
its own Raft leader and runs the Raft consensus algorithm independently.

The CP data structures differ from the other Hazelcast data structures
in two aspects. First, an internal commit is performed on the METADATA CP
group every time you fetch a proxy from this interface. Hence, callers
should cache returned proxy objects. Second, if you call ``destroy()``
on a CP data structure proxy, that data structure is terminated on the
underlying CP group and cannot be reinitialized until the CP group is
force-destroyed. For this reason, please make sure that you are completely
done with a CP data structure before destroying its proxy.
"""

def __init__(self, context):
self._proxy_manager = CPProxyManager(context)

def get_atomic_long(self, name):
"""Returns the distributed AtomicLong instance with given name.

The instance is created on CP Subsystem.

If no group name is given within the ``name`` argument, then the
AtomicLong instance will be created on the default CP group.
If a group name is given, like ``.get_atomic_long("myLong@group1")``,
the given group will be initialized first, if not initialized
already, and then the instance will be created on this group.

Args:
name (str): Name of the AtomicLong.

Returns:
hazelcast.proxy.cp.atomic_long.AtomicLong: The AtomicLong proxy
for the given name.
"""
return self._proxy_manager.get_or_create(ATOMIC_LONG_SERVICE, name)


_DEFAULT_GROUP_NAME = "default"


def _without_default_group_name(name):
name = name.strip()
idx = name.find("@")
if idx == -1:
return name

check_true(name.find("@", idx + 1) == -1, "Custom group name must be specified at most once")
group_name = name[idx + 1:].strip()
if group_name == _DEFAULT_GROUP_NAME:
return name[:idx]
return name


def _get_object_name_for_proxy(name):
idx = name.find("@")
if idx == -1:
return name

group_name = name[idx + 1:].strip()
check_true(len(group_name) > 0, "Custom CP group name cannot be empty string")
object_name = name[:idx].strip()
check_true(len(object_name) > 0, "Object name cannot be empty string")
return object_name


ATOMIC_LONG_SERVICE = "hz:raft:atomicLongService"


class CPProxyManager(object):
def __init__(self, context):
self._context = context

def get_or_create(self, service_name, proxy_name):
proxy_name = _without_default_group_name(proxy_name)
object_name = _get_object_name_for_proxy(proxy_name)

group_id = self._get_group_id(proxy_name)
if service_name == ATOMIC_LONG_SERVICE:
return AtomicLong(self._context, group_id, service_name, proxy_name, object_name)

def _get_group_id(self, proxy_name):
codec = cp_group_create_cp_group_codec
request = codec.encode_request(proxy_name)
invocation = Invocation(request, response_handler=codec.decode_response)
invocation_service = self._context.invocation_service
invocation_service.invoke(invocation)
return invocation.future.result()
27 changes: 24 additions & 3 deletions hazelcast/future.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,10 @@ def continue_with(self, continuation_func, *args):
"""Create a continuation that executes when the Future is completed.

Args:
continuation_func (function): A function which takes the future as the only parameter.
continuation_func (function): A function which takes the Future as the only parameter.
Return value of the function will be set as the result of the continuation future.
If the return value of the function is another Future, it will be chained
to the returned Future.
*args: Arguments to be passed into ``continuation_function``.

Returns:
Expand All @@ -140,13 +142,32 @@ def continue_with(self, continuation_func, *args):

def callback(f):
try:
future.set_result(continuation_func(f, *args))
result = continuation_func(f, *args)
if isinstance(result, Future):
future._chain(result)
else:
future.set_result(result)
except:
future.set_exception(sys.exc_info()[1], sys.exc_info()[2])
exception, traceback = sys.exc_info()[1:]
future.set_exception(exception, traceback)

self.add_done_callback(callback)
return future

def _chain(self, chained_future):
def callback(f):
try:
result = f.result()
if isinstance(result, Future):
self._chain(result)
else:
self.set_result(result)
except:
exception, traceback = sys.exc_info()[1:]
self.set_exception(exception, traceback)

chained_future.add_done_callback(callback)


class _Event(object):
_flag = False
Expand Down
23 changes: 22 additions & 1 deletion hazelcast/protocol/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,5 +37,26 @@ def __ne__(self, other):
class EndpointQualifier(object):
__slots__ = ()

def __init__(self, type, identifier):
def __init__(self, _, __):
pass


class RaftGroupId(object):
__slots__ = ("name", "seed", "id")

def __init__(self, name, seed, group_id):
self.name = name
self.seed = seed
self.id = group_id

def __eq__(self, other):
return isinstance(other, RaftGroupId) \
and self.name == other.name \
and self.seed == other.seed \
and self.id == other.id

def __ne__(self, other):
return not self.__eq__(other)

def __hash__(self):
return hash((self.name, self.seed, self.id))
27 changes: 27 additions & 0 deletions hazelcast/protocol/codec/atomic_long_add_and_get_codec.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
from hazelcast.serialization.bits import *
from hazelcast.protocol.builtin import FixSizedTypesCodec
from hazelcast.protocol.client_message import OutboundMessage, REQUEST_HEADER_SIZE, create_initial_buffer, RESPONSE_HEADER_SIZE
from hazelcast.protocol.codec.custom.raft_group_id_codec import RaftGroupIdCodec
from hazelcast.protocol.builtin import StringCodec

# hex: 0x090300
_REQUEST_MESSAGE_TYPE = 590592
# hex: 0x090301
_RESPONSE_MESSAGE_TYPE = 590593

_REQUEST_DELTA_OFFSET = REQUEST_HEADER_SIZE
_REQUEST_INITIAL_FRAME_SIZE = _REQUEST_DELTA_OFFSET + LONG_SIZE_IN_BYTES
_RESPONSE_RESPONSE_OFFSET = RESPONSE_HEADER_SIZE


def encode_request(group_id, name, delta):
buf = create_initial_buffer(_REQUEST_INITIAL_FRAME_SIZE, _REQUEST_MESSAGE_TYPE)
FixSizedTypesCodec.encode_long(buf, _REQUEST_DELTA_OFFSET, delta)
RaftGroupIdCodec.encode(buf, group_id)
StringCodec.encode(buf, name, True)
return OutboundMessage(buf, False)


def decode_response(msg):
initial_frame = msg.next_frame()
return FixSizedTypesCodec.decode_long(initial_frame.buf, _RESPONSE_RESPONSE_OFFSET)
29 changes: 29 additions & 0 deletions hazelcast/protocol/codec/atomic_long_alter_codec.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
from hazelcast.serialization.bits import *
from hazelcast.protocol.builtin import FixSizedTypesCodec
from hazelcast.protocol.client_message import OutboundMessage, REQUEST_HEADER_SIZE, create_initial_buffer, RESPONSE_HEADER_SIZE
from hazelcast.protocol.codec.custom.raft_group_id_codec import RaftGroupIdCodec
from hazelcast.protocol.builtin import StringCodec
from hazelcast.protocol.builtin import DataCodec

# hex: 0x090200
_REQUEST_MESSAGE_TYPE = 590336
# hex: 0x090201
_RESPONSE_MESSAGE_TYPE = 590337

_REQUEST_RETURN_VALUE_TYPE_OFFSET = REQUEST_HEADER_SIZE
_REQUEST_INITIAL_FRAME_SIZE = _REQUEST_RETURN_VALUE_TYPE_OFFSET + INT_SIZE_IN_BYTES
_RESPONSE_RESPONSE_OFFSET = RESPONSE_HEADER_SIZE


def encode_request(group_id, name, function, return_value_type):
buf = create_initial_buffer(_REQUEST_INITIAL_FRAME_SIZE, _REQUEST_MESSAGE_TYPE)
FixSizedTypesCodec.encode_int(buf, _REQUEST_RETURN_VALUE_TYPE_OFFSET, return_value_type)
RaftGroupIdCodec.encode(buf, group_id)
StringCodec.encode(buf, name)
DataCodec.encode(buf, function, True)
return OutboundMessage(buf, False)


def decode_response(msg):
initial_frame = msg.next_frame()
return FixSizedTypesCodec.decode_long(initial_frame.buf, _RESPONSE_RESPONSE_OFFSET)
Loading