
Commit 7379a14

Implement CP Fenced Lock
Implementation, tests, documentation, and code samples for the Fenced Lock are added. Also, the session manager for session-aware CP proxies is fully implemented, along with its test suite. The unused parts of the session manager will be used with the Semaphore proxy.
1 parent ef87776 commit 7379a14

16 files changed (+2320 -25 lines)


README.md

Lines changed: 61 additions & 5 deletions
@@ -57,8 +57,9 @@
 * [7.4.11.1. Configuring Flake ID Generator](#74111-configuring-flake-id-generator)
 * [7.4.12. CP Subsystem](#7412-cp-subsystem)
 * [7.4.12.1. Using AtomicLong](#74121-using-atomiclong)
-* [7.4.12.2. Using CountDownLatch](#74122-using-countdownlatch)
-* [7.4.12.3. Using AtomicReference](#74123-using-atomicreference)
+* [7.4.12.2. Using Lock](#74122-using-lock)
+* [7.4.12.3. Using CountDownLatch](#74123-using-countdownlatch)
+* [7.4.12.4. Using AtomicReference](#74124-using-atomicreference)
 * [7.5. Distributed Events](#75-distributed-events)
 * [7.5.1. Cluster Events](#751-cluster-events)
 * [7.5.1.1. Listening for Member Events](#7511-listening-for-member-events)
@@ -1577,7 +1578,7 @@ Refer to [CP Subsystem](https://docs.hazelcast.org/docs/latest/manual/html-singl
 Data structures in CP Subsystem run in CP groups. Each CP group elects its own Raft leader and runs the Raft consensus algorithm independently.
 The CP data structures differ from the other Hazelcast data structures in two aspects.
 First, an internal commit is performed on the METADATA CP group every time you fetch a proxy from this interface.
-Hence, callers should cache returned proxy objects. Second, if you call `DistributedObject.destroy()` on a CP data structure proxy,
+Hence, callers should cache returned proxy objects. Second, if you call `distributed_object.destroy()` on a CP data structure proxy,
 that data structure is terminated on the underlying CP group and cannot be reinitialized until the CP group is force-destroyed.
 For this reason, please make sure that you are completely done with a CP data structure before destroying its proxy.
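A minimal sketch of the proxy-caching and destroy semantics changed above; the structure name is made up, and it assumes `destroy()` is exposed on the blocking CP proxy like on other distributed object proxies:

```python
import hazelcast

client = hazelcast.HazelcastClient()

# Fetch the proxy once and cache it; every get_atomic_long() call
# triggers a commit on the METADATA CP group.
counter = client.cp_subsystem.get_atomic_long("my-counter").blocking()
counter.add_and_get(1)

# destroy() terminates the structure on its CP group; a structure with the
# same name cannot be reinitialized until the CP group is force-destroyed.
counter.destroy()

client.shutdown()
```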
@@ -1610,7 +1611,62 @@ print ('CAS operation result:', result)
 AtomicLong implementation does not offer exactly-once / effectively-once execution semantics. It goes with at-least-once execution semantics by default and can cause an API call to be committed multiple times in case of CP member failures.
 It can be tuned to offer at-most-once execution semantics. Please see [`fail-on-indeterminate-operation-state`](https://docs.hazelcast.org/docs/latest/manual/html-single/index.html#cp-subsystem-configuration) server-side setting.

-#### 7.4.12.2. Using CountDownLatch
+#### 7.4.12.2. Using Lock
+
+Hazelcast `FencedLock` is the distributed and reentrant implementation of a linearizable lock.
+It is CP with respect to the CAP principle. It works on top of the Raft consensus algorithm.
+It offers linearizability during crash-stop failures and network partitions.
+If a network partition occurs, it remains available on at most one side of the partition.
+
+A basic Lock usage example is shown below.
+
+```python
+# Get a FencedLock called "my-lock"
+lock = client.cp_subsystem.get_lock("my-lock").blocking()
+# Acquire the lock
+lock.lock()
+try:
+    # Your guarded code goes here
+    pass
+finally:
+    # Make sure to release the lock
+    lock.unlock()
+```
+
+FencedLock works on top of CP sessions. It keeps a CP session open while the lock is acquired. Please refer to [CP Session](https://docs.hazelcast.org/docs/latest/manual/html-single/index.html#cp-sessions) documentation for more information.
+
+By default, FencedLock is reentrant. Once a caller acquires the lock, it can acquire the lock reentrantly as many times as it wants in a linearizable manner.
+You can configure the reentrancy behavior on the member side. For instance, reentrancy can be disabled and FencedLock can work as a non-reentrant mutex.
+You can also set a custom reentrancy limit. When the reentrancy limit is already reached, FencedLock does not block a lock call.
+Instead, it fails with ``LockAcquireLimitReachedError`` or a specified return value.
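As a rough sketch of the reentrancy-limit behavior described above, assuming a member-side FencedLock configuration that caps acquisitions of a lock named `limited-lock` at two (both the name and the limit are made up for illustration):

```python
import hazelcast

client = hazelcast.HazelcastClient()

# Assumes the member config limits "limited-lock" to 2 acquisitions per holder.
lock = client.cp_subsystem.get_lock("limited-lock").blocking()

lock.lock()  # first acquisition
lock.lock()  # reentrant acquisition, still within the assumed limit

# Once the limit is reached, a lock call does not block:
# try_lock() returns False, and lock() would raise LockAcquireLimitReachedError.
acquired = lock.try_lock()
print("Acquired beyond the limit:", acquired)  # False

lock.unlock()
lock.unlock()

client.shutdown()
```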
+
+Distributed locks are unfortunately *not equivalent* to single-node mutexes because of the complexities in distributed systems, such as uncertain communication patterns, and independent and partial failures.
+In an asynchronous network, no lock service can guarantee mutual exclusion, because there is no way to distinguish between a slow and a crashed process.
+Consider the following scenario, where a Hazelcast client acquires a FencedLock, then hits a long pause.
+Since it will not be able to commit session heartbeats while paused, its CP session will eventually be closed.
+After this moment, another Hazelcast client can acquire this lock.
+If the first client wakes up again, it may not immediately notice that it has lost ownership of the lock.
+In this case, multiple clients think they hold the lock. If they attempt to perform an operation on a shared resource, they can break the system.
+To prevent such situations, you can choose to use an infinite session timeout, but then you will probably have to deal with liveness issues.
+For the scenario above, even if the first client actually crashes, requests sent by the two clients can be re-ordered in the network and hit the external resource in reverse order.
+
+There is a simple solution for this problem. Lock holders are ordered by a monotonic fencing token, which increments each time the lock is assigned to a new owner.
+This fencing token can be passed to external services or resources to ensure sequential execution of side effects performed by lock holders.
+
+The following diagram illustrates the idea. Client-1 acquires the lock first and receives ``1`` as its fencing token.
+Then, it passes this token to the external service, which is our shared resource in this scenario.
+Just after that, Client-1 hits a long GC pause and eventually loses ownership of the lock because it fails to commit CP session heartbeats.
+Then, Client-2 chimes in and acquires the lock. Similar to Client-1, Client-2 passes its fencing token to the external service.
+After that, once Client-1 comes back alive, its write request will be rejected by the external service, and only Client-2 will be able to safely talk to it.
+
+![CP Fenced Lock diagram](https://docs.hazelcast.org/docs/latest/manual/html-single/images/FencedLock.png)
+
+You can read more about the fencing token idea in Martin Kleppmann's "How to do distributed locking" blog post and Google's Chubby paper.
+
+To get the fencing token, one may use the ``lock.lock_and_get_fence()`` or ``lock.try_lock_and_get_fence()`` utility methods, or the ``lock.get_fence()`` method
+while holding the lock.
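The fencing idea above can be sketched as follows; `FencedService` is a hypothetical in-process stand-in for the external resource and is not part of the client API:

```python
import hazelcast


class FencedService:
    """Hypothetical shared resource that rejects writes from stale lock holders."""

    def __init__(self):
        self._highest_fence = 0

    def write(self, value, fence):
        if fence < self._highest_fence:
            raise RuntimeError("Stale fence %s, rejecting the write" % fence)
        self._highest_fence = fence
        print("Accepted", value, "with fence", fence)


client = hazelcast.HazelcastClient()
service = FencedService()

lock = client.cp_subsystem.get_lock("my-lock").blocking()
fence = lock.lock_and_get_fence()  # the fence grows monotonically across lock owners
try:
    service.write("some update", fence)
finally:
    lock.unlock()

client.shutdown()
```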
+
+#### 7.4.12.3. Using CountDownLatch

 Hazelcast `CountDownLatch` is the distributed implementation of a linearizable and distributed countdown latch.
 This data structure is a cluster-wide synchronization aid that allows one or more callers to wait until a set of operations being performed in other callers completes.
@@ -1646,7 +1702,7 @@ print("Count is zero:", count_is_zero)

 > **NOTE: CountDownLatch count can be reset with `try_set_count()` after a countdown has finished, but not during an active count.**

-#### 7.4.12.3. Using AtomicReference
+#### 7.4.12.4. Using AtomicReference

 Hazelcast `AtomicReference` is the distributed implementation of a linearizable object reference.
 It provides a set of atomic operations allowing to modify the value behind the reference.

examples/cp/fenced_lock_example.py

Lines changed: 25 additions & 0 deletions
@@ -0,0 +1,25 @@
+import hazelcast
+
+client = hazelcast.HazelcastClient()
+
+lock = client.cp_subsystem.get_lock("my-lock").blocking()
+
+locked = lock.is_locked()
+print("Locked initially:", locked)
+
+fence = lock.lock_and_get_fence()
+print("Fence token:", fence)
+try:
+    locked = lock.is_locked()
+    print("Locked after lock:", locked)
+
+    locked = lock.try_lock()
+    print("Locked reentrantly:", locked)
+
+    # more guarded code
+finally:
+    # unlock must be called for each successful lock request
+    lock.unlock()
+    lock.unlock()
+
+client.shutdown()

hazelcast/client.py

Lines changed: 13 additions & 3 deletions
@@ -7,7 +7,7 @@
 from hazelcast.config import _Config
 from hazelcast.connection import ConnectionManager, DefaultAddressProvider
 from hazelcast.core import DistributedObjectInfo, DistributedObjectEvent
-from hazelcast.cp import CPSubsystem
+from hazelcast.cp import CPSubsystem, ProxySessionManager
 from hazelcast.invocation import InvocationService, Invocation
 from hazelcast.listener import ListenerService, ClusterViewListenerService
 from hazelcast.lifecycle import LifecycleService, LifecycleState, _InternalLifecycleService
@@ -331,6 +331,7 @@ def __init__(self, **kwargs):
                                                  self._logger_extras)
         self._proxy_manager = ProxyManager(self._context)
         self.cp_subsystem = CPSubsystem(self._context)
+        self._proxy_session_manager = ProxySessionManager(self._context)
         self._transaction_manager = TransactionManager(self._context, self._logger_extras)
         self._lock_reference_id_generator = AtomicInteger(1)
         self._statistics = Statistics(self, self._reactor, self._connection_manager,
@@ -348,7 +349,8 @@ def _init_context(self):
         self._context.init_context(self.config, self._invocation_service, self._internal_partition_service,
                                    self._internal_cluster_service, self._connection_manager,
                                    self._serialization_service, self._listener_service, self._proxy_manager,
-                                   self._near_cache_manager, self._lock_reference_id_generator, self._logger_extras)
+                                   self._near_cache_manager, self._lock_reference_id_generator, self._logger_extras,
+                                   self.name, self._proxy_session_manager, self._reactor)

     def _start(self):
         self._reactor.start()
@@ -597,6 +599,7 @@ def shutdown(self):
         if self._internal_lifecycle_service.running:
             self._internal_lifecycle_service.fire_lifecycle_event(LifecycleState.SHUTTING_DOWN)
             self._internal_lifecycle_service.shutdown()
+            self._proxy_session_manager.shutdown().result()
             self._near_cache_manager.destroy_near_caches()
             self._connection_manager.shutdown()
             self._invocation_service.shutdown()
@@ -666,11 +669,15 @@ def __init__(self):
         self.near_cache_manager = None
         self.lock_reference_id_generator = None
         self.logger_extras = None
+        self.name = None
+        self.proxy_session_manager = None
+        self.reactor = None

     def init_context(self, config, invocation_service, partition_service,
                      cluster_service, connection_manager, serialization_service,
                      listener_service, proxy_manager, near_cache_manager,
-                     lock_reference_id_generator, logger_extras):
+                     lock_reference_id_generator, logger_extras, name,
+                     proxy_session_manager, reactor):
         self.config = config
         self.invocation_service = invocation_service
         self.partition_service = partition_service
@@ -682,3 +689,6 @@ def init_context(self, config, invocation_service, partition_service,
         self.near_cache_manager = near_cache_manager
         self.lock_reference_id_generator = lock_reference_id_generator
         self.logger_extras = logger_extras
+        self.name = name
+        self.proxy_session_manager = proxy_session_manager
+        self.reactor = reactor
