Skip to content

Commit

Permalink
feat(core): Support additionaal lock contenter patterns
Browse files Browse the repository at this point in the history
Allows configurable multi-implementations cooperations in locks (e.g.
Zookeeper python & go clients contending for the same lock).
  • Loading branch information
ceache committed Apr 25, 2020
1 parent a4efaac commit 225eeec
Show file tree
Hide file tree
Showing 2 changed files with 125 additions and 67 deletions.
84 changes: 56 additions & 28 deletions kazoo/recipe/lock.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
"""
import sys

try:
from time import monotonic as now
except ImportError:
Expand All @@ -27,13 +28,13 @@
CancelledError,
KazooException,
LockTimeout,
NoNodeError
NoNodeError,
)
from kazoo.protocol.states import KazooState
from kazoo.retry import (
ForceRetryError,
KazooRetry,
RetryFailedError
RetryFailedError,
)


Expand Down Expand Up @@ -80,20 +81,33 @@ class Lock(object):

# Node names which exclude this contender when present at a lower
# sequence number. Involved in read/write locks.
_EXCLUDE_NAMES = ["__lock__", "-lock-"]
_EXCLUDE_NAMES = ["__lock__"]

def __init__(self, client, path, identifier=None):
def __init__(
self, client, path, identifier=None, additional_lock_patterns=()
):
"""Create a Kazoo lock.
:param client: A :class:`~kazoo.client.KazooClient` instance.
:param path: The lock path to use.
:param identifier: Name to use for this lock contender. This
can be useful for querying to see who the
current lock contenders are.
:param identifier: Name to use for this lock contender. This can be
useful for querying to see who the current lock
contenders are.
:param additional_lock_patterns: Strings that will be used to
identify other znode in the path
that should be considered contenders
for this lock.
Use this for cross-implementation
compatibility.
.. versionadded:: 2.7.1
The additional_lock_patterns option.
"""
self.client = client
self.path = path
self._exclude_names = set(
self._EXCLUDE_NAMES + list(additional_lock_patterns)
)

# some data is written to the node. this can be queried via
# contenders() to see who is contending for the lock
Expand All @@ -113,8 +127,9 @@ def __init__(self, client, path, identifier=None):
self.is_acquired = False
self.assured_path = False
self.cancelled = False
self._retry = KazooRetry(max_tries=None,
sleep_func=client.handler.sleep_func)
self._retry = KazooRetry(
max_tries=None, sleep_func=client.handler.sleep_func
)
self._lock = client.handler.lock_object()

def _ensure_path(self):
Expand Down Expand Up @@ -179,9 +194,12 @@ def _acquire_lock():
try:
gotten = False
try:
gotten = retry(self._inner_acquire,
blocking=blocking, timeout=timeout,
ephemeral=ephemeral)
gotten = retry(
self._inner_acquire,
blocking=blocking,
timeout=timeout,
ephemeral=ephemeral,
)
except RetryFailedError:
pass
except KazooException:
Expand Down Expand Up @@ -222,8 +240,9 @@ def _inner_acquire(self, blocking, timeout, ephemeral=True):
self.create_tried = True

if not node:
node = self.client.create(self.create_path, self.data,
ephemeral=ephemeral, sequence=True)
node = self.client.create(
self.create_path, self.data, ephemeral=ephemeral, sequence=True
)
# strip off path to node
node = node[len(self.path) + 1:]

Expand Down Expand Up @@ -263,14 +282,16 @@ def _inner_acquire(self, blocking, timeout, ephemeral=True):
else:
self.wake_event.wait(timeout)
if not self.wake_event.isSet():
raise LockTimeout("Failed to acquire lock on %s after "
"%s seconds" % (self.path, timeout))
raise LockTimeout(
"Failed to acquire lock on %s after %s seconds"
% (self.path, timeout)
)
finally:
self.client.remove_listener(self._watch_session)

def predecessor(self, children, index):
for c in reversed(children[:index]):
if any(n in c for n in self._EXCLUDE_NAMES):
if any(n in c for n in self._exclude_names):
return c
return None

Expand All @@ -289,12 +310,13 @@ def _get_sorted_children(self):
# (eg. in case of a lease), just sort them last ('~' sorts after all
# ASCII digits).
def _seq(c):
for name in ["__lock__", "-lock-", "__rlock__"]:
for name in self._exclude_names:
idx = c.find(name)
if idx != -1:
return c[idx + len(name):]
# Sort unknown node names eg. "lease_holder" last.
return '~'

children.sort(key=_seq)
return children

Expand Down Expand Up @@ -391,8 +413,9 @@ class WriteLock(Lock):
shared lock.
"""

_NODE_NAME = "__lock__"
_EXCLUDE_NAMES = ["__lock__", "-lock-", "__rlock__"]
_EXCLUDE_NAMES = ["__lock__", "__rlock__"]


class ReadLock(Lock):
Expand Down Expand Up @@ -420,8 +443,9 @@ class ReadLock(Lock):
shared lock.
"""

_NODE_NAME = "__rlock__"
_EXCLUDE_NAMES = ["__lock__", "-lock-"]
_EXCLUDE_NAMES = ["__lock__"]


class Semaphore(object):
Expand Down Expand Up @@ -458,6 +482,7 @@ class Semaphore(object):
The max_leases check.
"""

def __init__(self, client, path, identifier=None, max_leases=1):
"""Create a Kazoo Lock
Expand Down Expand Up @@ -509,8 +534,8 @@ def _ensure_path(self):
else:
if leases != self.max_leases:
raise ValueError(
"Inconsistent max leases: %s, expected: %s" %
(leases, self.max_leases)
"Inconsistent max leases: %s, expected: %s"
% (leases, self.max_leases)
)
else:
self.client.set(self.path, str(self.max_leases).encode('utf-8'))
Expand Down Expand Up @@ -548,7 +573,8 @@ def acquire(self, blocking=True, timeout=None):

try:
self.is_acquired = self.client.retry(
self._inner_acquire, blocking=blocking, timeout=timeout)
self._inner_acquire, blocking=blocking, timeout=timeout
)
except KazooException:
# if we did ultimately fail, attempt to clean up
self._best_effort_cleanup()
Expand Down Expand Up @@ -590,8 +616,9 @@ def _inner_acquire(self, blocking, timeout=None):
self.wake_event.wait(w.leftover())
if not self.wake_event.isSet():
raise LockTimeout(
"Failed to acquire semaphore on %s "
"after %s seconds" % (self.path, timeout))
"Failed to acquire semaphore on %s"
" after %s seconds" % (self.path, timeout)
)
else:
return False
finally:
Expand All @@ -612,8 +639,9 @@ def _get_lease(self, data=None):
# Get a list of the current potential lock holders. If they change,
# notify our wake_event object. This is used to unblock a blocking
# self._inner_acquire call.
children = self.client.get_children(self.path,
self._watch_lease_change)
children = self.client.get_children(
self.path, self._watch_lease_change
)

# If there are leases available, acquire one
if len(children) < self.max_leases:
Expand Down
Loading

0 comments on commit 225eeec

Please sign in to comment.