-
Notifications
You must be signed in to change notification settings - Fork 73
CP Subsystem - Atomic Long #223
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
7 commits
Select commit
Hold shift + click to select a range
88ed9c2
Make futures chainable
mdumandag f19c51d
add cp related codecs
mdumandag 1f183e5
add cp atomic long implementation
mdumandag af600c7
add ifunction based methods
mdumandag 2a322b2
get labels from the correct client in the label tests
mdumandag 12de999
add atomic long to API documentation
mdumandag 6cbf991
address review comments
mdumandag File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,6 @@ | ||
| CP Subsystem | ||
| ============ | ||
|
|
||
| .. py:currentmodule:: hazelcast.cp | ||
|
|
||
| .. autoclass:: CPSubsystem |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,4 @@ | ||
| AtomicLong | ||
| ========== | ||
|
|
||
| .. automodule:: hazelcast.proxy.cp.atomic_long |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,5 @@ | ||
| CP Proxies | ||
| ==================== | ||
|
|
||
| .. toctree:: | ||
| hazelcast.proxy.cp.atomic_long |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,11 @@ | ||
| import hazelcast | ||
|
|
||
| client = hazelcast.HazelcastClient() | ||
|
|
||
| view_counter = client.cp_subsystem.get_atomic_long("views").blocking() | ||
| value = view_counter.get() | ||
| print("Value:", value) | ||
| new_value = view_counter.add_and_get(42) | ||
| print("New value:", new_value) | ||
|
|
||
| client.shutdown() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,16 +1,14 @@ | ||
| # TODO Fix this when we add CP Atomic Long | ||
|
|
||
| import hazelcast | ||
|
|
||
| # Start the Hazelcast Client and connect to an already running Hazelcast Cluster on 127.0.0.1 | ||
| # Start the Hazelcast Client and connect to an already running | ||
| # Hazelcast Cluster on 127.0.0.1 | ||
| # Note: CP Subsystem has to be enabled on the cluster | ||
| hz = hazelcast.HazelcastClient() | ||
| # Get an Atomic Counter, we'll call it "counter" | ||
| counter = hz.get_atomic_long("counter").blocking() | ||
| # Add and Get the "counter" | ||
| counter.add_and_get(3) | ||
| # value is 3 | ||
| # Display the "counter" value | ||
| print("counter: {}".format(counter.get())) | ||
| # Shutdown this Hazelcast Client | ||
| # Get the AtomicLong counter from Cluster | ||
| counter = hz.cp_subsystem.get_atomic_long("counter").blocking() | ||
| # Add and get the counter | ||
| value = counter.add_and_get(3) | ||
| print("Counter value is", value) | ||
| # Shutdown this Hazelcast client | ||
| hz.shutdown() | ||
|
|
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,106 @@ | ||
| from hazelcast.invocation import Invocation | ||
| from hazelcast.protocol.codec import cp_group_create_cp_group_codec | ||
| from hazelcast.proxy.cp.atomic_long import AtomicLong | ||
| from hazelcast.util import check_true | ||
|
|
||
|
|
||
| class CPSubsystem(object): | ||
| """CP Subsystem is a component of Hazelcast that builds a strongly consistent | ||
| layer for a set of distributed data structures. | ||
|
|
||
| Its APIs can be used for implementing distributed coordination use cases, | ||
| such as leader election, distributed locking, synchronization, and metadata | ||
| management. | ||
|
|
||
| Its data structures are CP with respect to the CAP principle, i.e., they | ||
| always maintain linearizability and prefer consistency over availability | ||
| during network partitions. Besides network partitions, CP Subsystem | ||
| withstands server and client failures. | ||
|
|
||
| Data structures in CP Subsystem run in CP groups. Each CP group elects | ||
| its own Raft leader and runs the Raft consensus algorithm independently. | ||
|
|
||
| The CP data structures differ from the other Hazelcast data structures | ||
| in two aspects. First, an internal commit is performed on the METADATA CP | ||
| group every time you fetch a proxy from this interface. Hence, callers | ||
| should cache returned proxy objects. Second, if you call ``destroy()`` | ||
| on a CP data structure proxy, that data structure is terminated on the | ||
| underlying CP group and cannot be reinitialized until the CP group is | ||
| force-destroyed. For this reason, please make sure that you are completely | ||
| done with a CP data structure before destroying its proxy. | ||
| """ | ||
|
|
||
| def __init__(self, context): | ||
| self._proxy_manager = CPProxyManager(context) | ||
|
|
||
| def get_atomic_long(self, name): | ||
| """Returns the distributed AtomicLong instance with given name. | ||
|
|
||
| The instance is created on CP Subsystem. | ||
|
|
||
| If no group name is given within the ``name`` argument, then the | ||
| AtomicLong instance will be created on the default CP group. | ||
| If a group name is given, like ``.get_atomic_long("myLong@group1")``, | ||
| the given group will be initialized first, if not initialized | ||
| already, and then the instance will be created on this group. | ||
|
|
||
| Args: | ||
| name (str): Name of the AtomicLong. | ||
|
|
||
| Returns: | ||
| hazelcast.proxy.cp.atomic_long.AtomicLong: The AtomicLong proxy | ||
| for the given name. | ||
| """ | ||
| return self._proxy_manager.get_or_create(ATOMIC_LONG_SERVICE, name) | ||
|
|
||
|
|
||
| _DEFAULT_GROUP_NAME = "default" | ||
|
|
||
|
|
||
| def _without_default_group_name(name): | ||
| name = name.strip() | ||
| idx = name.find("@") | ||
| if idx == -1: | ||
| return name | ||
|
|
||
| check_true(name.find("@", idx + 1) == -1, "Custom group name must be specified at most once") | ||
| group_name = name[idx + 1:].strip() | ||
| if group_name == _DEFAULT_GROUP_NAME: | ||
| return name[:idx] | ||
| return name | ||
|
|
||
|
|
||
| def _get_object_name_for_proxy(name): | ||
| idx = name.find("@") | ||
| if idx == -1: | ||
| return name | ||
|
|
||
| group_name = name[idx + 1:].strip() | ||
| check_true(len(group_name) > 0, "Custom CP group name cannot be empty string") | ||
| object_name = name[:idx].strip() | ||
| check_true(len(object_name) > 0, "Object name cannot be empty string") | ||
| return object_name | ||
|
|
||
|
|
||
| ATOMIC_LONG_SERVICE = "hz:raft:atomicLongService" | ||
|
|
||
|
|
||
| class CPProxyManager(object): | ||
| def __init__(self, context): | ||
| self._context = context | ||
|
|
||
| def get_or_create(self, service_name, proxy_name): | ||
| proxy_name = _without_default_group_name(proxy_name) | ||
| object_name = _get_object_name_for_proxy(proxy_name) | ||
|
|
||
| group_id = self._get_group_id(proxy_name) | ||
| if service_name == ATOMIC_LONG_SERVICE: | ||
| return AtomicLong(self._context, group_id, service_name, proxy_name, object_name) | ||
|
|
||
| def _get_group_id(self, proxy_name): | ||
| codec = cp_group_create_cp_group_codec | ||
| request = codec.encode_request(proxy_name) | ||
| invocation = Invocation(request, response_handler=codec.decode_response) | ||
| invocation_service = self._context.invocation_service | ||
| invocation_service.invoke(invocation) | ||
| return invocation.future.result() | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,27 @@ | ||
| from hazelcast.serialization.bits import * | ||
| from hazelcast.protocol.builtin import FixSizedTypesCodec | ||
| from hazelcast.protocol.client_message import OutboundMessage, REQUEST_HEADER_SIZE, create_initial_buffer, RESPONSE_HEADER_SIZE | ||
| from hazelcast.protocol.codec.custom.raft_group_id_codec import RaftGroupIdCodec | ||
| from hazelcast.protocol.builtin import StringCodec | ||
|
|
||
| # hex: 0x090300 | ||
| _REQUEST_MESSAGE_TYPE = 590592 | ||
| # hex: 0x090301 | ||
| _RESPONSE_MESSAGE_TYPE = 590593 | ||
|
|
||
| _REQUEST_DELTA_OFFSET = REQUEST_HEADER_SIZE | ||
| _REQUEST_INITIAL_FRAME_SIZE = _REQUEST_DELTA_OFFSET + LONG_SIZE_IN_BYTES | ||
| _RESPONSE_RESPONSE_OFFSET = RESPONSE_HEADER_SIZE | ||
|
|
||
|
|
||
| def encode_request(group_id, name, delta): | ||
| buf = create_initial_buffer(_REQUEST_INITIAL_FRAME_SIZE, _REQUEST_MESSAGE_TYPE) | ||
| FixSizedTypesCodec.encode_long(buf, _REQUEST_DELTA_OFFSET, delta) | ||
| RaftGroupIdCodec.encode(buf, group_id) | ||
| StringCodec.encode(buf, name, True) | ||
| return OutboundMessage(buf, False) | ||
|
|
||
|
|
||
| def decode_response(msg): | ||
| initial_frame = msg.next_frame() | ||
| return FixSizedTypesCodec.decode_long(initial_frame.buf, _RESPONSE_RESPONSE_OFFSET) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,29 @@ | ||
| from hazelcast.serialization.bits import * | ||
| from hazelcast.protocol.builtin import FixSizedTypesCodec | ||
| from hazelcast.protocol.client_message import OutboundMessage, REQUEST_HEADER_SIZE, create_initial_buffer, RESPONSE_HEADER_SIZE | ||
| from hazelcast.protocol.codec.custom.raft_group_id_codec import RaftGroupIdCodec | ||
| from hazelcast.protocol.builtin import StringCodec | ||
| from hazelcast.protocol.builtin import DataCodec | ||
|
|
||
| # hex: 0x090200 | ||
| _REQUEST_MESSAGE_TYPE = 590336 | ||
| # hex: 0x090201 | ||
| _RESPONSE_MESSAGE_TYPE = 590337 | ||
|
|
||
| _REQUEST_RETURN_VALUE_TYPE_OFFSET = REQUEST_HEADER_SIZE | ||
| _REQUEST_INITIAL_FRAME_SIZE = _REQUEST_RETURN_VALUE_TYPE_OFFSET + INT_SIZE_IN_BYTES | ||
| _RESPONSE_RESPONSE_OFFSET = RESPONSE_HEADER_SIZE | ||
|
|
||
|
|
||
| def encode_request(group_id, name, function, return_value_type): | ||
| buf = create_initial_buffer(_REQUEST_INITIAL_FRAME_SIZE, _REQUEST_MESSAGE_TYPE) | ||
| FixSizedTypesCodec.encode_int(buf, _REQUEST_RETURN_VALUE_TYPE_OFFSET, return_value_type) | ||
| RaftGroupIdCodec.encode(buf, group_id) | ||
| StringCodec.encode(buf, name) | ||
| DataCodec.encode(buf, function, True) | ||
| return OutboundMessage(buf, False) | ||
|
|
||
|
|
||
| def decode_response(msg): | ||
| initial_frame = msg.next_frame() | ||
| return FixSizedTypesCodec.decode_long(initial_frame.buf, _RESPONSE_RESPONSE_OFFSET) |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.