Skip to content

Commit

Permalink
feat(core): add support for persistent recursive watches
Browse files Browse the repository at this point in the history
ZooKeeper 3.6.0 added support for persistent, and persistent
recursive watches.  This adds the corresponding support to the
Kazoo client class.
  • Loading branch information
jeblair authored and StephenSorriaux committed Mar 16, 2024
1 parent 6540c93 commit 45cef47
Show file tree
Hide file tree
Showing 5 changed files with 354 additions and 8 deletions.
117 changes: 117 additions & 0 deletions kazoo/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from kazoo.protocol.connection import ConnectionHandler
from kazoo.protocol.paths import _prefix_root, normpath
from kazoo.protocol.serialization import (
AddWatch,
Auth,
CheckVersion,
CloseInstance,
Expand All @@ -38,6 +39,7 @@
SetACL,
GetData,
Reconfig,
RemoveWatches,
SetData,
Sync,
Transaction,
Expand All @@ -48,6 +50,8 @@
KazooState,
KeeperState,
WatchedEvent,
AddWatchMode,
WatcherType,
)
from kazoo.retry import KazooRetry
from kazoo.security import ACL, OPEN_ACL_UNSAFE
Expand Down Expand Up @@ -248,6 +252,8 @@ def __init__(
self.state_listeners = set()
self._child_watchers = defaultdict(set)
self._data_watchers = defaultdict(set)
self._persistent_watchers = defaultdict(set)
self._persistent_recursive_watchers = defaultdict(set)
self._reset()
self.read_only = read_only

Expand Down Expand Up @@ -416,8 +422,16 @@ def _reset_watchers(self):
for data_watchers in self._data_watchers.values():
watchers.extend(data_watchers)

for persistent_watchers in self._persistent_watchers.values():
watchers.extend(persistent_watchers)

Check warning on line 426 in kazoo/client.py

View check run for this annotation

Codecov / codecov/patch

kazoo/client.py#L426

Added line #L426 was not covered by tests

for pr_watchers in self._persistent_recursive_watchers.values():
watchers.extend(pr_watchers)

Check warning on line 429 in kazoo/client.py

View check run for this annotation

Codecov / codecov/patch

kazoo/client.py#L429

Added line #L429 was not covered by tests

self._child_watchers = defaultdict(set)
self._data_watchers = defaultdict(set)
self._persistent_watchers = defaultdict(set)
self._persistent_recursive_watchers = defaultdict(set)

ev = WatchedEvent(EventType.NONE, self._state, None)
for watch in watchers:
Expand Down Expand Up @@ -1644,8 +1658,111 @@ def reconfig_async(self, joining, leaving, new_members, from_config):

return async_result

def add_watch(self, path, watch, mode):
"""Add a watch.
This method adds persistent watches. Unlike the data and
child watches which may be set by calls to
:meth:`KazooClient.exists`, :meth:`KazooClient.get`, and
:meth:`KazooClient.get_children`, persistent watches are not
removed after being triggered.
To remove a persistent watch, use
:meth:`KazooClient.remove_all_watches` with an argument of
:attr:`~kazoo.protocol.states.WatcherType.ANY`.
The `mode` argument determines whether or not the watch is
recursive. To set a persistent watch, use
:class:`~kazoo.protocol.states.AddWatchMode.PERSISTENT`. To set a
persistent recursive watch, use
:class:`~kazoo.protocol.states.AddWatchMode.PERSISTENT_RECURSIVE`.
:param path: Path of node to watch.
:param watch: Watch callback to set for future changes
to this path.
:param mode: The mode to use.
:type mode: int
:raises:
:exc:`~kazoo.exceptions.MarshallingError` if mode is
unknown.
:exc:`~kazoo.exceptions.ZookeeperError` if the server
returns a non-zero error code.
"""
return self.add_watch_async(path, watch, mode).get()

def add_watch_async(self, path, watch, mode):
"""Asynchronously add a watch. Takes the same arguments as
:meth:`add_watch`.
"""
if not isinstance(path, str):
raise TypeError("Invalid type for 'path' (string expected)")

Check warning on line 1700 in kazoo/client.py

View check run for this annotation

Codecov / codecov/patch

kazoo/client.py#L1700

Added line #L1700 was not covered by tests
if not callable(watch):
raise TypeError("Invalid type for 'watch' (must be a callable)")

Check warning on line 1702 in kazoo/client.py

View check run for this annotation

Codecov / codecov/patch

kazoo/client.py#L1702

Added line #L1702 was not covered by tests
if not isinstance(mode, int):
raise TypeError("Invalid type for 'mode' (int expected)")

Check warning on line 1704 in kazoo/client.py

View check run for this annotation

Codecov / codecov/patch

kazoo/client.py#L1704

Added line #L1704 was not covered by tests
if mode not in (
AddWatchMode.PERSISTENT,
AddWatchMode.PERSISTENT_RECURSIVE,
):
raise ValueError("Invalid value for 'mode'")

Check warning on line 1709 in kazoo/client.py

View check run for this annotation

Codecov / codecov/patch

kazoo/client.py#L1709

Added line #L1709 was not covered by tests

async_result = self.handler.async_result()
self._call(
AddWatch(_prefix_root(self.chroot, path), watch, mode),
async_result,
)
return async_result

def remove_all_watches(self, path, watcher_type):
"""Remove watches from a path.
This removes all watches of a specified type (data, child,
any) from a given path.
The `watcher_type` argument specifies which type to use. It
may be one of:
* :attr:`~kazoo.protocol.states.WatcherType.DATA`
* :attr:`~kazoo.protocol.states.WatcherType.CHILDREN`
* :attr:`~kazoo.protocol.states.WatcherType.ANY`
To remove persistent watches, specify a watcher type of
:attr:`~kazoo.protocol.states.WatcherType.ANY`.
:param path: Path of watch to remove.
:param watcher_type: The type of watch to remove.
:type watcher_type: int
"""

return self.remove_all_watches_async(path, watcher_type).get()

def remove_all_watches_async(self, path, watcher_type):
"""Asynchronously remove watches. Takes the same arguments as
:meth:`remove_all_watches`.
"""
if not isinstance(path, str):
raise TypeError("Invalid type for 'path' (string expected)")

Check warning on line 1746 in kazoo/client.py

View check run for this annotation

Codecov / codecov/patch

kazoo/client.py#L1746

Added line #L1746 was not covered by tests
if not isinstance(watcher_type, int):
raise TypeError("Invalid type for 'watcher_type' (int expected)")

Check warning on line 1748 in kazoo/client.py

View check run for this annotation

Codecov / codecov/patch

kazoo/client.py#L1748

Added line #L1748 was not covered by tests
if watcher_type not in (
WatcherType.ANY,
WatcherType.CHILDREN,
WatcherType.DATA,
):
raise ValueError("Invalid value for 'watcher_type'")

Check warning on line 1754 in kazoo/client.py

View check run for this annotation

Codecov / codecov/patch

kazoo/client.py#L1754

Added line #L1754 was not covered by tests

async_result = self.handler.async_result()
self._call(
RemoveWatches(_prefix_root(self.chroot, path), watcher_type),
async_result,
)
return async_result


class TransactionRequest(object):

"""A Zookeeper Transaction Request
A Transaction provides a builder object that can be used to
Expand Down
56 changes: 49 additions & 7 deletions kazoo/protocol/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
)
from kazoo.loggingsupport import BLATHER
from kazoo.protocol.serialization import (
AddWatch,
Auth,
Close,
Connect,
Expand All @@ -28,17 +29,20 @@
GetChildren2,
Ping,
PingInstance,
RemoveWatches,
ReplyHeader,
SASL,
Transaction,
Watch,
int_struct,
)
from kazoo.protocol.states import (
AddWatchMode,
Callback,
KeeperState,
WatchedEvent,
EVENT_TYPE_MAP,
WatcherType,
)
from kazoo.retry import (
ForceRetryError,
Expand Down Expand Up @@ -363,6 +367,18 @@ def _write(self, msg, timeout):
raise ConnectionDropped("socket connection broken")
sent += bytes_sent

def _find_persistent_recursive_watchers(self, path):
parts = path.split("/")
watchers = []
for count in range(len(parts)):
candidate = "/".join(parts[: count + 1])
if not candidate:
continue
watchers.extend(
self.client._persistent_recursive_watchers.get(candidate, [])
)
return watchers

def _read_watch_event(self, buffer, offset):
client = self.client
watch, offset = Watch.deserialize(buffer, offset)
Expand All @@ -374,9 +390,13 @@ def _read_watch_event(self, buffer, offset):

if watch.type in (CREATED_EVENT, CHANGED_EVENT):
watchers.extend(client._data_watchers.pop(path, []))
watchers.extend(client._persistent_watchers.get(path, []))
watchers.extend(self._find_persistent_recursive_watchers(path))
elif watch.type == DELETED_EVENT:
watchers.extend(client._data_watchers.pop(path, []))
watchers.extend(client._child_watchers.pop(path, []))
watchers.extend(client._persistent_watchers.get(path, []))
watchers.extend(self._find_persistent_recursive_watchers(path))
elif watch.type == CHILD_EVENT:
watchers.extend(client._child_watchers.pop(path, []))
else:
Expand Down Expand Up @@ -448,13 +468,35 @@ def _read_response(self, header, buffer, offset):

async_object.set(response)

# Determine if watchers should be registered
watcher = getattr(request, "watcher", None)
if not client._stopped.is_set() and watcher:
if isinstance(request, (GetChildren, GetChildren2)):
client._child_watchers[request.path].add(watcher)
else:
client._data_watchers[request.path].add(watcher)
# Determine if watchers should be registered or unregistered
if not client._stopped.is_set():
watcher = getattr(request, "watcher", None)
if watcher:
if isinstance(request, AddWatch):
if request.mode == AddWatchMode.PERSISTENT:
client._persistent_watchers[request.path].add(
watcher
)
elif request.mode == AddWatchMode.PERSISTENT_RECURSIVE:
client._persistent_recursive_watchers[
request.path
].add(watcher)
elif isinstance(request, (GetChildren, GetChildren2)):
client._child_watchers[request.path].add(watcher)
else:
client._data_watchers[request.path].add(watcher)
if isinstance(request, RemoveWatches):
if request.watcher_type == WatcherType.CHILDREN:
client._child_watchers.pop(request.path, None)
elif request.watcher_type == WatcherType.DATA:
client._data_watchers.pop(request.path, None)
elif request.watcher_type == WatcherType.ANY:
client._child_watchers.pop(request.path, None)
client._data_watchers.pop(request.path, None)
client._persistent_watchers.pop(request.path, None)
client._persistent_recursive_watchers.pop(
request.path, None
)

if isinstance(request, Close):
self.logger.log(BLATHER, "Read close response")
Expand Down
28 changes: 28 additions & 0 deletions kazoo/protocol/serialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,20 @@ def deserialize(cls, bytes, offset):
return data, stat


class RemoveWatches(namedtuple("RemoveWatches", "path watcher_type")):
type = 18

def serialize(self):
b = bytearray()
b.extend(write_string(self.path))
b.extend(int_struct.pack(self.watcher_type))
return b

@classmethod
def deserialize(cls, bytes, offset):
return None


class Auth(namedtuple("Auth", "auth_type scheme auth")):
type = 100

Expand All @@ -441,6 +455,20 @@ def deserialize(cls, bytes, offset):
return challenge, offset


class AddWatch(namedtuple("AddWatch", "path watcher mode")):
type = 106

def serialize(self):
b = bytearray()
b.extend(write_string(self.path))
b.extend(int_struct.pack(self.mode))
return b

@classmethod
def deserialize(cls, bytes, offset):
return None


class Watch(namedtuple("Watch", "type state path")):
@classmethod
def deserialize(cls, bytes, offset):
Expand Down
41 changes: 41 additions & 0 deletions kazoo/protocol/states.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,3 +251,44 @@ def data_length(self):
@property
def children_count(self):
return self.numChildren


class AddWatchMode(object):
"""Modes for use with :meth:`~kazoo.client.KazooClient.add_watch`
.. attribute:: PERSISTENT
The watch is not removed when trigged.
.. attribute:: PERSISTENT_RECURSIVE
The watch is not removed when trigged, and applies to all
paths underneath the supplied path as well.
"""

PERSISTENT = 0
PERSISTENT_RECURSIVE = 1


class WatcherType(object):
"""Watcher types for use with
:meth:`~kazoo.client.KazooClient.remove_all_watches`
.. attribute:: CHILDREN
Child watches.
.. attribute:: DATA
Data watches.
.. attribute:: ANY
Any type of watch (child, data, persistent, or persistent
recursive).
"""

CHILDREN = 1
DATA = 2
ANY = 3
Loading

0 comments on commit 45cef47

Please sign in to comment.