Skip to content

Commit

Permalink
feat: add support for persistent recursive watches
Browse files Browse the repository at this point in the history
ZooKeeper 3.6.0 added support for persistent, and persistent
recursive watches.  This adds the corresponding support to the
Kazoo client class.
  • Loading branch information
jeblair committed Mar 22, 2023
1 parent 33c348b commit 34adddf
Show file tree
Hide file tree
Showing 6 changed files with 344 additions and 8 deletions.
104 changes: 104 additions & 0 deletions kazoo/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from kazoo.protocol.connection import ConnectionHandler
from kazoo.protocol.paths import _prefix_root, normpath
from kazoo.protocol.serialization import (
AddWatch,
Auth,
CheckVersion,
CloseInstance,
Expand All @@ -38,6 +39,7 @@
SetACL,
GetData,
Reconfig,
RemoveWatches,
SetData,
Sync,
Transaction,
Expand Down Expand Up @@ -248,6 +250,8 @@ def __init__(
self.state_listeners = set()
self._child_watchers = defaultdict(set)
self._data_watchers = defaultdict(set)
self._persistent_watchers = defaultdict(set)
self._persistent_recursive_watchers = defaultdict(set)
self._reset()
self.read_only = read_only

Expand Down Expand Up @@ -416,8 +420,16 @@ def _reset_watchers(self):
for data_watchers in self._data_watchers.values():
watchers.extend(data_watchers)

for persistent_watchers in self._persistent_watchers.values():
watchers.extend(persistent_watchers)

Check warning on line 424 in kazoo/client.py

View check run for this annotation

Codecov / codecov/patch

kazoo/client.py#L424

Added line #L424 was not covered by tests

for pr_watchers in self._persistent_recursive_watchers.values():
watchers.extend(pr_watchers)

Check warning on line 427 in kazoo/client.py

View check run for this annotation

Codecov / codecov/patch

kazoo/client.py#L427

Added line #L427 was not covered by tests

self._child_watchers = defaultdict(set)
self._data_watchers = defaultdict(set)
self._persistent_watchers = defaultdict(set)
self._persistent_recursive_watchers = defaultdict(set)

ev = WatchedEvent(EventType.NONE, self._state, None)
for watch in watchers:
Expand Down Expand Up @@ -1644,8 +1656,100 @@ def reconfig_async(self, joining, leaving, new_members, from_config):

return async_result

def add_watch(self, path, watch, mode):
"""Add a watch.
This method adds persistent watches. Unlike the data and
child watches which may be set by calls to
:meth:`KazooClient.exists`, :meth:`KazooClient.get`, and
:meth:`KazooClient.get_children`, persistent watches are not
removed after being triggered.
To remove a persistent watch, use
:meth:`KazooClient.remove_all_watches` with an argument of
:attr:`~kazoo.states.WatcherType.ANY`.
The `mode` argument determines whether or not the watch is
recursive. To set a persistent watch, use
:class:`~kazoo.states.AddWatchMode.PERSISTENT`. To set a
persistent recursive watch, use
:class:`~kazoo.states.AddWatchMode.PERSISTENT_RECURSIVE`.
:param path: Path of node to watch.
:param watch: Watch callback to set for future changes
to this path.
:param mode: The mode to use.
:type mode: int
:raises:
:exc:`~kazoo.exceptions.MarshallingError` if mode is
unknown.
:exc:`~kazoo.exceptions.ZookeeperError` if the server
returns a non-zero error code.
"""
return self.add_watch_async(path, watch, mode).get()

def add_watch_async(self, path, watch, mode):
"""Asynchronously add a watch. Takes the same arguments as
:meth:`add_watch`.
"""
if not isinstance(path, str):
raise TypeError("Invalid type for 'path' (string expected)")

Check warning on line 1698 in kazoo/client.py

View check run for this annotation

Codecov / codecov/patch

kazoo/client.py#L1698

Added line #L1698 was not covered by tests
if not callable(watch):
raise TypeError("Invalid type for 'watch' (must be a callable)")

Check warning on line 1700 in kazoo/client.py

View check run for this annotation

Codecov / codecov/patch

kazoo/client.py#L1700

Added line #L1700 was not covered by tests
if not isinstance(mode, int):
raise TypeError("Invalid type for 'mode' (int expected)")

Check warning on line 1702 in kazoo/client.py

View check run for this annotation

Codecov / codecov/patch

kazoo/client.py#L1702

Added line #L1702 was not covered by tests

async_result = self.handler.async_result()
self._call(
AddWatch(_prefix_root(self.chroot, path), watch, mode),
async_result,
)
return async_result

def remove_all_watches(self, path, watcher_type):
"""Remove watches from a path.
This removes all watches of a specified type (data, child,
any) from a given path.
The `watcher_type` argument specifies which type to use. It
may be one of:
* :attr:`~kazoo.states.WatcherType.DATA`
* :attr:`~kazoo.states.WatcherType.CHILD`
* :attr:`~kazoo.states.WatcherType.ANY`
To remove persistent watches, specify a watcher type of
:attr:`~kazoo.states.WatcherType.ANY`.
:param path: Path of watch to remove.
:param watcher_type: The type of watch to remove.
:type watcher_type: int
"""

return self.remove_all_watches_async(path, watcher_type).get()

def remove_all_watches_async(self, path, watcher_type):
"""Asynchronously remove watches. Takes the same arguments as
:meth:`remove_all_watches`.
"""
if not isinstance(path, str):
raise TypeError("Invalid type for 'path' (string expected)")

Check warning on line 1739 in kazoo/client.py

View check run for this annotation

Codecov / codecov/patch

kazoo/client.py#L1739

Added line #L1739 was not covered by tests
if not isinstance(watcher_type, int):
raise TypeError("Invalid type for 'watcher_type' (int expected)")

Check warning on line 1741 in kazoo/client.py

View check run for this annotation

Codecov / codecov/patch

kazoo/client.py#L1741

Added line #L1741 was not covered by tests

async_result = self.handler.async_result()
self._call(
RemoveWatches(_prefix_root(self.chroot, path), watcher_type),
async_result,
)
return async_result


class TransactionRequest(object):

"""A Zookeeper Transaction Request
A Transaction provides a builder object that can be used to
Expand Down
5 changes: 5 additions & 0 deletions kazoo/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,11 @@ class NotReadOnlyCallError(ZookeeperError):
a read-only server"""


@_zookeeper_exception(-121)
class NoWatcherError(ZookeeperError):
"""No watcher was found at the supplied path"""


class ConnectionClosedError(SessionExpiredError):
"""Connection is closed"""

Expand Down
55 changes: 48 additions & 7 deletions kazoo/protocol/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
)
from kazoo.loggingsupport import BLATHER
from kazoo.protocol.serialization import (
AddWatch,
Auth,
Close,
Connect,
Expand All @@ -28,13 +29,15 @@
GetChildren2,
Ping,
PingInstance,
RemoveWatches,
ReplyHeader,
SASL,
Transaction,
Watch,
int_struct,
)
from kazoo.protocol.states import (
AddWatchMode,
Callback,
KeeperState,
WatchedEvent,
Expand Down Expand Up @@ -363,6 +366,18 @@ def _write(self, msg, timeout):
raise ConnectionDropped("socket connection broken")
sent += bytes_sent

def _find_persistent_recursive_watchers(self, path):
parts = path.split("/")
watchers = []
for count in range(len(parts)):
candidate = "/".join(parts[: count + 1])
if not candidate:
continue
watchers.extend(
self.client._persistent_recursive_watchers.get(candidate, [])
)
return watchers

def _read_watch_event(self, buffer, offset):
client = self.client
watch, offset = Watch.deserialize(buffer, offset)
Expand All @@ -374,9 +389,13 @@ def _read_watch_event(self, buffer, offset):

if watch.type in (CREATED_EVENT, CHANGED_EVENT):
watchers.extend(client._data_watchers.pop(path, []))
watchers.extend(client._persistent_watchers.get(path, []))
watchers.extend(self._find_persistent_recursive_watchers(path))
elif watch.type == DELETED_EVENT:
watchers.extend(client._data_watchers.pop(path, []))
watchers.extend(client._child_watchers.pop(path, []))
watchers.extend(client._persistent_watchers.get(path, []))
watchers.extend(self._find_persistent_recursive_watchers(path))
elif watch.type == CHILD_EVENT:
watchers.extend(client._child_watchers.pop(path, []))
else:
Expand Down Expand Up @@ -448,13 +467,35 @@ def _read_response(self, header, buffer, offset):

async_object.set(response)

# Determine if watchers should be registered
watcher = getattr(request, "watcher", None)
if not client._stopped.is_set() and watcher:
if isinstance(request, (GetChildren, GetChildren2)):
client._child_watchers[request.path].add(watcher)
else:
client._data_watchers[request.path].add(watcher)
# Determine if watchers should be registered or unregistered
if not client._stopped.is_set():
watcher = getattr(request, "watcher", None)
if watcher:
if isinstance(request, AddWatch):
if request.mode == AddWatchMode.PERSISTENT:
client._persistent_watchers[request.path].add(
watcher
)
elif request.mode == AddWatchMode.PERSISTENT_RECURSIVE:
client._persistent_recursive_watchers[
request.path
].add(watcher)
elif isinstance(request, (GetChildren, GetChildren2)):
client._child_watchers[request.path].add(watcher)
else:
client._data_watchers[request.path].add(watcher)
if isinstance(request, RemoveWatches):
if request.watcher_type == 1:
client._child_watchers.pop(request.path, None)
elif request.watcher_type == 2:
client._data_watchers.pop(request.path, None)
elif request.watcher_type == 3:
client._child_watchers.pop(request.path, None)
client._data_watchers.pop(request.path, None)
client._persistent_watchers.pop(request.path, None)
client._persistent_recursive_watchers.pop(
request.path, None
)

if isinstance(request, Close):
self.logger.log(BLATHER, "Read close response")
Expand Down
28 changes: 28 additions & 0 deletions kazoo/protocol/serialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,20 @@ def deserialize(cls, bytes, offset):
return data, stat


class RemoveWatches(namedtuple("RemoveWatches", "path watcher_type")):
type = 18

def serialize(self):
b = bytearray()
b.extend(write_string(self.path))
b.extend(int_struct.pack(self.watcher_type))
return b

@classmethod
def deserialize(cls, bytes, offset):
return None


class Auth(namedtuple("Auth", "auth_type scheme auth")):
type = 100

Expand All @@ -441,6 +455,20 @@ def deserialize(cls, bytes, offset):
return challenge, offset


class AddWatch(namedtuple("AddWatch", "path watcher mode")):
type = 106

def serialize(self):
b = bytearray()
b.extend(write_string(self.path))
b.extend(int_struct.pack(self.mode))
return b

@classmethod
def deserialize(cls, bytes, offset):
return None


class Watch(namedtuple("Watch", "type state path")):
@classmethod
def deserialize(cls, bytes, offset):
Expand Down
41 changes: 41 additions & 0 deletions kazoo/protocol/states.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,3 +251,44 @@ def data_length(self):
@property
def children_count(self):
return self.numChildren


class AddWatchMode(object):
"""Modes for use with :meth:`~kazoo.client.KazooClient.add_watch`
.. attribute:: PERSISTENT
The watch is not removed when trigged.
.. attribute:: PERSISTENT_RECURSIVE
The watch is not removed when trigged, and applies to all
paths underneath the supplied path as well.
"""

PERSISTENT = 0
PERSISTENT_RECURSIVE = 1


class WatcherType(object):
"""Watcher types for use with
:meth:`~kazoo.client.KazooClient.remove_all_watches`
.. attribute:: CHILDREN
Child watches.
.. attribute:: DATA
Data watches.
.. attribute:: ANY
Any type of watch (child, data, persistent, or persistent
recursive).
"""

CHILDREN = 1
DATA = 2
ANY = 3
Loading

0 comments on commit 34adddf

Please sign in to comment.