From bf29a6f4d4bcb60af5a1d43af52d359e98cd425a Mon Sep 17 00:00:00 2001 From: Charles-Henri de Boysson Date: Thu, 2 Jun 2022 13:39:14 -0400 Subject: [PATCH] feat(core): Add support for Container and TTL nodes Also add support through transations. Closes #334, #496 --- .flake8 | 2 +- kazoo/client.py | 205 +++++++++++++++++++------------- kazoo/protocol/serialization.py | 83 +++++++++++-- 3 files changed, 197 insertions(+), 93 deletions(-) diff --git a/.flake8 b/.flake8 index 20d6db4d..ff587792 100644 --- a/.flake8 +++ b/.flake8 @@ -1,2 +1,2 @@ [flake8] -ignore = BLK100 +ignore = BLK100,W503 diff --git a/kazoo/client.py b/kazoo/client.py index 25baa683..133c2ad5 100644 --- a/kazoo/client.py +++ b/kazoo/client.py @@ -32,6 +32,8 @@ CloseInstance, Create, Create2, + CreateContainer, + CreateTTL, Delete, Exists, GetChildren, @@ -873,7 +875,8 @@ def sync(self, path): return self.sync_async(path).get() def create(self, path, value=b"", acl=None, ephemeral=False, - sequence=False, makepath=False, include_data=False): + sequence=False, makepath=False, include_data=False, + container=False, ttl=0): """Create a node with the given value as its data. Optionally set an ACL on the node. @@ -950,15 +953,19 @@ def create(self, path, value=b"", acl=None, ephemeral=False, The `makepath` option. .. versionadded:: 2.7 The `include_data` option. + .. versionadded:: 2.8 + The container and ttl options. + """ acl = acl or self.default_acl return self.create_async( path, value, acl=acl, ephemeral=ephemeral, - sequence=sequence, makepath=makepath, include_data=include_data - ).get() + sequence=sequence, makepath=makepath, include_data=include_data, + container=container, ttl=ttl).get() def create_async(self, path, value=b"", acl=None, ephemeral=False, - sequence=False, makepath=False, include_data=False): + sequence=False, makepath=False, include_data=False, + container=False, ttl=0): """Asynchronously create a ZNode. Takes the same arguments as :meth:`create`. @@ -967,45 +974,35 @@ def create_async(self, path, value=b"", acl=None, ephemeral=False, .. versionadded:: 1.1 The makepath option. .. versionadded:: 2.7 - The `include_data` option. + The include_data option. + .. versionadded:: 2.8 + The container and ttl options. """ if acl is None and self.default_acl: acl = self.default_acl - - if not isinstance(path, string_types): - raise TypeError("Invalid type for 'path' (string expected)") - if acl and (isinstance(acl, ACL) or - not isinstance(acl, (tuple, list))): - raise TypeError("Invalid type for 'acl' (acl must be a tuple/list" - " of ACL's") - if value is not None and not isinstance(value, bytes_types): - raise TypeError("Invalid type for 'value' (must be a byte string)") - if not isinstance(ephemeral, bool): - raise TypeError("Invalid type for 'ephemeral' (bool expected)") - if not isinstance(sequence, bool): - raise TypeError("Invalid type for 'sequence' (bool expected)") if not isinstance(makepath, bool): raise TypeError("Invalid type for 'makepath' (bool expected)") - if not isinstance(include_data, bool): - raise TypeError("Invalid type for 'include_data' (bool expected)") - - flags = 0 - if ephemeral: - flags |= 1 - if sequence: - flags |= 2 - if acl is None: - acl = OPEN_ACL_UNSAFE + opcode = _create_opcode( + path, value, acl, self.chroot, + ephemeral, sequence, include_data, container, ttl + ) async_result = self.handler.async_result() @capture_exceptions(async_result) def do_create(): - result = self._create_async_inner( - path, value, acl, flags, - trailing=sequence, include_data=include_data - ) - result.rawlink(create_completion) + inner_async_result = self.handler.async_result() + + call_result = self._call(opcode, inner_async_result) + if call_result is False: + # We hit a short-circuit exit on the _call. Because we are + # not using the original async_result here, we bubble the + # exception upwards to the do_create function in + # KazooClient.create so that it gets set on the correct + # async_result object + raise inner_async_result.exception + + inner_async_result.rawlink(create_completion) @capture_exceptions(async_result) def retry_completion(result): @@ -1015,11 +1012,11 @@ def retry_completion(result): @wrap(async_result) def create_completion(result): try: - if include_data: + if opcode.type == Create.type: + return self.unchroot(result.get()) + else: new_path, stat = result.get() return self.unchroot(new_path), stat - else: - return self.unchroot(result.get()) except NoNodeError: if not makepath: raise @@ -1032,26 +1029,6 @@ def create_completion(result): do_create() return async_result - def _create_async_inner(self, path, value, acl, flags, - trailing=False, include_data=False): - async_result = self.handler.async_result() - if include_data: - opcode = Create2 - else: - opcode = Create - - call_result = self._call( - opcode(_prefix_root(self.chroot, path, trailing=trailing), - value, acl, flags), async_result) - if call_result is False: - # We hit a short-circuit exit on the _call. Because we are - # not using the original async_result here, we bubble the - # exception upwards to the do_create function in - # KazooClient.create so that it gets set on the correct - # async_result object - raise async_result.exception - return async_result - def ensure_path(self, path, acl=None): """Recursively create a path if it doesn't exist. @@ -1312,7 +1289,7 @@ def set_acls_async(self, path, acls, version=-1): """ if not isinstance(path, string_types): raise TypeError("Invalid type for 'path' (string expected)") - if isinstance(acls, ACL) or not isinstance(acls, (tuple, list)): + if not isinstance(acls, (tuple, list)) or isinstance(acls, ACL): raise TypeError("Invalid type for 'acl' (acl must be a tuple/list" " of ACL's)") if not isinstance(version, int): @@ -1590,39 +1567,24 @@ def __init__(self, client): self.committed = False def create(self, path, value=b"", acl=None, ephemeral=False, - sequence=False): + sequence=False, include_data=False, container=False, ttl=0): """Add a create ZNode to the transaction. Takes the same arguments as :meth:`KazooClient.create`, with the exception of `makepath`. :returns: None + .. versionadded:: 2.8 + The include_data, container and ttl options. """ if acl is None and self.client.default_acl: acl = self.client.default_acl - if not isinstance(path, string_types): - raise TypeError("Invalid type for 'path' (string expected)") - if acl and not isinstance(acl, (tuple, list)): - raise TypeError("Invalid type for 'acl' (acl must be a tuple/list" - " of ACL's") - if not isinstance(value, bytes_types): - raise TypeError("Invalid type for 'value' (must be a byte string)") - if not isinstance(ephemeral, bool): - raise TypeError("Invalid type for 'ephemeral' (bool expected)") - if not isinstance(sequence, bool): - raise TypeError("Invalid type for 'sequence' (bool expected)") - - flags = 0 - if ephemeral: - flags |= 1 - if sequence: - flags |= 2 - if acl is None: - acl = OPEN_ACL_UNSAFE - - self._add(Create(_prefix_root(self.client.chroot, path), value, acl, - flags), None) + opcode = _create_opcode( + path, value, acl, self.client.chroot, + ephemeral, sequence, include_data, container, ttl + ) + self._add(opcode, None) def delete(self, path, version=-1): """Add a delete ZNode to the transaction. Takes the same @@ -1701,3 +1663,86 @@ def _add(self, request, post_processor=None): self._check_tx_state() self.client.logger.log(BLATHER, 'Added %r to %r', request, self) self.operations.append(request) + + +def _create_opcode(path, value, acl, chroot, + ephemeral, sequence, include_data, container, ttl): + """Helper function. + Creates the create OpCode for regular `client.create()` operations as + well as in a `client.transaction()` context. + """ + if not isinstance(path, string_types): + raise TypeError("Invalid type for 'path' (string expected)") + if (acl and ( + not isinstance(acl, (tuple, list)) + or isinstance(acl, ACL))): + raise TypeError("Invalid type for 'acl' (acl must be a tuple/list" + " of ACL's") + if value is not None and not isinstance(value, bytes_types): + raise TypeError("Invalid type for 'value' (must be a byte string)") + if not isinstance(ephemeral, bool): + raise TypeError("Invalid type for 'ephemeral' (bool expected)") + if not isinstance(sequence, bool): + raise TypeError("Invalid type for 'sequence' (bool expected)") + if not isinstance(include_data, bool): + raise TypeError("Invalid type for 'include_data' (bool expected)") + if not isinstance(container, bool): + raise TypeError("Invalid type for 'container' (bool expected)") + if not isinstance(ttl, int) or ttl < 0: + raise TypeError("Invalid 'ttl' (integer >= 0 expected)") + if ttl and ephemeral: + raise TypeError("Invalid node creation: ephemeral & ttl") + if container and (ephemeral or sequence or ttl): + raise TypeError( + "Invalid node creation: container & ephemeral/sequence/ttl" + ) + + # Should match Zookeeper's CreateMode fromFlag + # https://github.com/apache/zookeeper/blob/master/zookeeper-server/ + # src/main/java/org/apache/zookeeper/CreateMode.java#L112 + flags = 0 + if ephemeral: + flags |= 1 + if sequence: + flags |= 2 + if container: + flags = 4 + if ttl: + if sequence: + flags = 6 + else: + flags = 5 + + if acl is None: + acl = OPEN_ACL_UNSAFE + + # Figure out the OpCode we are going to send + if include_data: + return Create2( + _prefix_root(chroot, path, trailing=sequence), + value, + acl, + flags + ) + elif container: + return CreateContainer( + _prefix_root(chroot, path, trailing=False), + value, + acl, + flags + ) + elif ttl: + return CreateTTL( + _prefix_root(chroot, path, trailing=sequence), + value, + acl, + flags, + ttl + ) + else: + return Create( + _prefix_root(chroot, path, trailing=sequence), + value, + acl, + flags + ) diff --git a/kazoo/protocol/serialization.py b/kazoo/protocol/serialization.py index 80fa4d10..465d2c37 100644 --- a/kazoo/protocol/serialization.py +++ b/kazoo/protocol/serialization.py @@ -1,4 +1,5 @@ """Zookeeper Serializers, Deserializers, and NamedTuple objects""" + from collections import namedtuple import struct @@ -15,7 +16,6 @@ int_struct = struct.Struct('!i') int_int_struct = struct.Struct('!ii') int_int_long_struct = struct.Struct('!iiq') - int_long_int_long_struct = struct.Struct('!iqiq') long_struct = struct.Struct('!q') multiheader_struct = struct.Struct('!iBi') @@ -128,8 +128,9 @@ def serialize(self): b.extend(write_buffer(self.data)) b.extend(int_struct.pack(len(self.acl))) for acl in self.acl: - b.extend(int_struct.pack(acl.perms) + - write_string(acl.id.scheme) + write_string(acl.id.id)) + b.extend(int_struct.pack(acl.perms) + + write_string(acl.id.scheme) + + write_string(acl.id.id)) b.extend(int_struct.pack(self.flags)) return b @@ -227,8 +228,9 @@ def serialize(self): b.extend(write_string(self.path)) b.extend(int_struct.pack(len(self.acls))) for acl in self.acls: - b.extend(int_struct.pack(acl.perms) + - write_string(acl.id.scheme) + write_string(acl.id.id)) + b.extend(int_struct.pack(acl.perms) + + write_string(acl.id.scheme) + + write_string(acl.id.id)) b.extend(int_struct.pack(self.version)) return b @@ -311,8 +313,8 @@ class Transaction(namedtuple('Transaction', 'operations')): def serialize(self): b = bytearray() for op in self.operations: - b.extend(MultiHeader(op.type, False, -1).serialize() + - op.serialize()) + b.extend(MultiHeader(op.type, False, -1).serialize() + + op.serialize()) return b + multiheader_struct.pack(-1, True, -1) @classmethod @@ -323,11 +325,17 @@ def deserialize(cls, bytes, offset): while not header.done: if header.type == Create.type: response, offset = read_string(bytes, offset) + elif header.type == Create2.type: + path, offset = read_string(bytes, offset) + stat = ZnodeStat._make(stat_struct.unpack_from(bytes, offset)) + offset += stat_struct.size + response = (path, stat) elif header.type == Delete.type: response = True elif header.type == SetData.type: response = ZnodeStat._make( - stat_struct.unpack_from(bytes, offset)) + stat_struct.unpack_from(bytes, offset) + ) offset += stat_struct.size elif header.type == CheckVersion.type: response = True @@ -346,6 +354,10 @@ def unchroot(client, response): for result in response: if isinstance(result, six.string_types): resp.append(client.unchroot(result)) + elif isinstance(result, ZnodeStat): # Need to test before tuple + resp.append(result) + elif isinstance(result, tuple): + resp.append((client.unchroot(result[0]), result[1])) else: resp.append(result) return resp @@ -360,8 +372,9 @@ def serialize(self): b.extend(write_buffer(self.data)) b.extend(int_struct.pack(len(self.acl))) for acl in self.acl: - b.extend(int_struct.pack(acl.perms) + - write_string(acl.id.scheme) + write_string(acl.id.id)) + b.extend(int_struct.pack(acl.perms) + + write_string(acl.id.scheme) + + write_string(acl.id.id)) b.extend(int_struct.pack(self.flags)) return b @@ -391,12 +404,58 @@ def deserialize(cls, bytes, offset): return data, stat +class CreateContainer(namedtuple('CreateContainer', 'path data acl flags')): + type = 19 + + def serialize(self): + b = bytearray() + b.extend(write_string(self.path)) + b.extend(write_buffer(self.data)) + b.extend(int_struct.pack(len(self.acl))) + for acl in self.acl: + b.extend(int_struct.pack(acl.perms) + + write_string(acl.id.scheme) + + write_string(acl.id.id)) + b.extend(int_struct.pack(self.flags)) + return b + + @classmethod + def deserialize(cls, bytes, offset): + path, offset = read_string(bytes, offset) + stat = ZnodeStat._make(stat_struct.unpack_from(bytes, offset)) + return path, stat + + +class CreateTTL(namedtuple('CreateTTL', 'path data acl flags ttl')): + type = 21 + + def serialize(self): + b = bytearray() + b.extend(write_string(self.path)) + b.extend(write_buffer(self.data)) + b.extend(int_struct.pack(len(self.acl))) + for acl in self.acl: + b.extend(int_struct.pack(acl.perms) + + write_string(acl.id.scheme) + + write_string(acl.id.id)) + b.extend(int_struct.pack(self.flags)) + b.extend(long_struct.pack(self.ttl)) + return b + + @classmethod + def deserialize(cls, bytes, offset): + path, offset = read_string(bytes, offset) + stat = ZnodeStat._make(stat_struct.unpack_from(bytes, offset)) + return path, stat + + class Auth(namedtuple('Auth', 'auth_type scheme auth')): type = 100 def serialize(self): - return (int_struct.pack(self.auth_type) + write_string(self.scheme) + - write_string(self.auth)) + return (int_struct.pack(self.auth_type) + + write_string(self.scheme) + + write_string(self.auth)) class SASL(namedtuple('SASL', 'challenge')):