Skip to content

Commit 6980627

Browse files
author
Mithun Shivashankar
committed
More updates
1 parent c0fd2a1 commit 6980627

File tree

1 file changed

+123
-51
lines changed

1 file changed

+123
-51
lines changed

asab/library/providers/zookeeper.py

Lines changed: 123 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,16 @@
55
import functools
66
import os.path
77
import urllib.parse
8+
from io import BytesIO
9+
from typing import Optional
810

911
import kazoo.exceptions
1012
import kazoo.recipe.watchers
1113

1214
from .abc import LibraryProviderABC
1315
from ..item import LibraryItem
1416
from ...zookeeper import ZooKeeperContainer
15-
from ...contextvars import Tenant
17+
from ...contextvars import Tenant, Authz
1618

1719
#
1820

@@ -264,78 +266,91 @@ def _check_version_counter(self, version):
264266

265267
self.App.TaskService.schedule(self._on_library_changed())
266268

267-
async def read(self, path: str) -> typing.IO:
269+
async def read(self, path: str) -> Optional[BytesIO]:
270+
"""
271+
Read a node with precedence personal → tenant → global.
272+
273+
Args:
274+
path: Logical library path starting with '/'.
275+
276+
Returns:
277+
BytesIO or None if not found in any scope.
278+
279+
Raises:
280+
RuntimeError: If ZooKeeper is not ready.
281+
"""
268282
if self.Zookeeper is None:
269283
L.warning("Zookeeper Client has not been established (yet). Cannot read {}".format(path))
270284
raise RuntimeError("Zookeeper Client has not been established (yet). Not ready.")
271285

272286
try:
273-
# Try tenant-specific path first
274-
node_path = self.build_path(path, tenant_specific=True)
275-
node_data = await self.Zookeeper.get_data(node_path)
276-
277-
# If not found, try the normal path
278-
if node_data is None:
279-
node_path = self.build_path(path, tenant_specific=False)
287+
for target in ("personal", "tenant", "global"):
288+
node_path = self.build_path(path, target=target)
280289
node_data = await self.Zookeeper.get_data(node_path)
281-
282-
if node_data is not None:
283-
return io.BytesIO(initial_bytes=node_data)
284-
else:
285-
return None
290+
if node_data is not None:
291+
return io.BytesIO(initial_bytes=node_data)
292+
return None
286293

287294
except kazoo.exceptions.ConnectionClosedError:
288295
L.warning("Zookeeper library provider is not ready")
289296
raise RuntimeError("Zookeeper library provider is not ready")
290297

291298
async def list(self, path: str) -> list:
299+
"""
300+
List nodes under `path` across all scopes in precedence order:
301+
personal, then tenant, then global. Results are concatenated
302+
(no dedup), matching current behavior while adding 'personal'.
303+
304+
Args:
305+
path: Directory path starting with '/'.
306+
307+
Returns:
308+
List[LibraryItem]
309+
"""
292310
if self.Zookeeper is None:
293311
L.warning("Zookeeper Client has not been established (yet). Cannot list {}".format(path))
294312
raise RuntimeError("Zookeeper Client has not been established (yet). Not ready.")
295313

296314
items = []
297315

298-
# Personal nodes
316+
# Personal scope
299317
personal_node_path = self.build_path(path, target="personal")
300318
personal_nodes = await self.Zookeeper.get_children(personal_node_path) or []
301319
items += await self.process_nodes(personal_nodes, path, target="personal")
302320

303-
# Tenant nodes
321+
# Tenant scope
304322
tenant_node_path = self.build_path(path, target="tenant")
305323
tenant_nodes = await self.Zookeeper.get_children(tenant_node_path) or []
306324
items += await self.process_nodes(tenant_nodes, path, target="tenant")
307325

308-
# Global nodes
326+
# Global scope
309327
global_node_path = self.build_path(path, target="global")
310328
global_nodes = await self.Zookeeper.get_children(global_node_path) or []
311329
items += await self.process_nodes(global_nodes, path, target="global")
312330

313331
return items
314332

315-
async def process_nodes(self, nodes, base_path, target="global"):
333+
async def process_nodes(self, nodes: list, base_path: str, target: str = "global") -> list:
316334
"""
317-
Processes a list of nodes and creates corresponding LibraryItem objects with their size.
335+
Convert child node names under `base_path` to LibraryItem objects.
318336
319337
Args:
320-
nodes (list): List of node names to process.
321-
base_path (str): The base path for the nodes.
322-
target (str): Specifies the target context, e.g., "tenant", "global", or "personal".
338+
nodes: Children names returned by ZooKeeper.
339+
base_path: Logical library base path (starts with '/').
340+
target: 'personal' | 'tenant' | 'global'.
323341
324342
Returns:
325-
list: A list of LibraryItem objects with size information.
343+
List[LibraryItem] with size set for files.
326344
"""
327345
items = []
328346
for node in nodes:
329-
# Remove any component that starts with '.'
330347
startswithdot = functools.reduce(lambda x, y: x or y.startswith('.'), node.split(os.path.sep), False)
331348
if startswithdot:
332349
continue
333350

334-
# Determine if this is a file or directory
335-
if '.' in node and not node.endswith(('.io', '.d')): # File check
351+
if '.' in node and not node.endswith(('.io', '.d')):
336352
fname = "{}/{}".format(base_path.rstrip("/"), node)
337353
ftype = "item"
338-
339354
try:
340355
node_path = self.build_path(fname, target=target)
341356
zstat = self.Zookeeper.Client.exists(node_path)
@@ -345,12 +360,11 @@ async def process_nodes(self, nodes, base_path, target="global"):
345360
except Exception as e:
346361
L.warning("Failed to retrieve size for node {}: {}".format(node_path, e))
347362
size = None
348-
else: # Directory check
363+
else:
349364
fname = "{}/{}/".format(base_path.rstrip("/"), node)
350365
ftype = "dir"
351366
size = None
352367

353-
# 👇 Add your block here
354368
if self.Layer == 0:
355369
if target == "global":
356370
layer_label = "0:global"
@@ -361,28 +375,40 @@ async def process_nodes(self, nodes, base_path, target="global"):
361375
else:
362376
layer_label = self.Layer
363377

364-
# Build LibraryItem
365378
items.append(LibraryItem(
366379
name=fname,
367380
type=ftype,
368381
layers=[layer_label],
369382
providers=[self],
370383
size=size
371384
))
372-
373385
return items
374386

375387

376-
def build_path(self, path, target="global"):
388+
def build_path(self, path: str, target: str = "global") -> str:
389+
"""
390+
Build an absolute ZooKeeper node path for the given logical library `path`
391+
within the specified target scope ('personal' | 'tenant' | 'global').
392+
393+
Args:
394+
path: The logical library path starting with '/'.
395+
target: Scope selector. Defaults to 'global'.
396+
397+
Returns:
398+
The fully qualified ZooKeeper node path.
399+
400+
Raises:
401+
AssertionError: If the resulting node path is malformed.
402+
"""
377403
assert path[:1] == '/'
378404
if path != '/':
379405
node_path = "{}{}".format(self.BasePath, path)
380406
else:
381407
node_path = "{}".format(self.BasePath)
382408

383409
if target == "personal":
410+
# /.personal/{CredentialsId}/...
384411
try:
385-
from ...contextvars import Authz
386412
authz = Authz.get()
387413
cred_id = getattr(authz, "CredentialsId", None)
388414
except LookupError:
@@ -391,40 +417,57 @@ def build_path(self, path, target="global"):
391417
node_path = "{}/.personal/{}{}".format(self.BasePath, cred_id, path)
392418

393419
elif target == "tenant":
420+
# /.tenants/{tenant}/...
394421
try:
395422
tenant = Tenant.get()
396423
except LookupError:
397424
tenant = None
398425
if tenant:
399426
node_path = "{}/.tenants/{}{}".format(self.BasePath, tenant, path)
400427

428+
# global target keeps node_path as is
429+
401430
node_path = node_path.rstrip("/")
402431

403432
assert '//' not in node_path, "Directory path cannot contain double slashes (//). Example format: /library/Templates/"
404433
assert node_path[0] == '/', "Directory path must start with a forward slash (/). For example: /library/Templates/"
405434

406435
return node_path
407436

408-
async def subscribe(self, path, target: typing.Union[str, tuple, None] = None):
437+
async def subscribe(self, path: str, target: typing.Union[str, tuple, None] = None):
438+
"""
439+
Subscribe to changes under `path` for the given target:
440+
- None / 'global' : watch global path
441+
- 'tenant' : watch all tenants
442+
- ('tenant', id) : watch a specific tenant
443+
- 'personal' : watch all personal credentials
444+
- ('personal', id): watch a specific credentials id
445+
"""
409446
self.Subscriptions.add((target, path))
410447

411-
if target is None:
412-
# Back-compat (pubsub callback must be called without `target` argument)
413-
# Watch path globally
414-
self.NodeDigests[path] = await self._get_directory_hash(path)
415-
elif target == "global":
416-
# Watch path globally
448+
if target is None or target == "global":
417449
self.NodeDigests[path] = await self._get_directory_hash(path)
450+
418451
elif target == "tenant":
419-
# Watch path in all tenants
420452
for tenant in await self._get_tenants():
421453
actual_path = "/.tenants/{}{}".format(tenant, path)
422454
self.NodeDigests[actual_path] = await self._get_directory_hash(actual_path)
455+
423456
elif isinstance(target, tuple) and len(target) == 2 and target[0] == "tenant":
424-
# Watch path in a specific tenant
425457
_, tenant = target
426458
actual_path = "/.tenants/{}{}".format(tenant, path)
427459
self.NodeDigests[actual_path] = await self._get_directory_hash(actual_path)
460+
461+
elif target == "personal":
462+
for cred_id in await self._get_personals():
463+
actual_path = "/.personal/{}{}".format(cred_id, path)
464+
self.NodeDigests[actual_path] = await self._get_directory_hash(actual_path)
465+
466+
elif isinstance(target, tuple) and len(target) == 2 and target[0] == "personal":
467+
_, cred_id = target
468+
actual_path = "/.personal/{}{}".format(cred_id, path)
469+
self.NodeDigests[actual_path] = await self._get_directory_hash(actual_path)
470+
428471
else:
429472
raise ValueError("Unexpected target: {!r}".format(target))
430473

@@ -450,18 +493,17 @@ def recursive_traversal(path, digest):
450493
await self.Zookeeper.ProactorService.execute(recursive_traversal, path, digest)
451494
return digest.digest()
452495

453-
454496
async def _on_library_changed(self, event_name=None):
455497
"""
456-
Check watched paths and publish a pubsub message for every one that has changed.
498+
Recompute hashes for subscribed paths and publish "Library.change!" when changed.
499+
Supports global, tenant, and personal scopes.
457500
"""
458501
for (target, path) in list(self.Subscriptions):
459502

460-
async def do_check_path(actual_path):
503+
async def do_check_path(actual_path: str):
461504
try:
462505
newdigest = await self._get_directory_hash(actual_path)
463506
except kazoo.exceptions.NoNodeError:
464-
# This node is either deleted or has never existed.
465507
newdigest = None
466508

467509
if newdigest != self.NodeDigests.get(actual_path):
@@ -472,10 +514,7 @@ async def do_check_path(actual_path):
472514
try:
473515
await do_check_path(actual_path=path)
474516
except Exception as e:
475-
L.exception(
476-
"Failed to process library changes: '{}'".format(e),
477-
struct_data={"path": path},
478-
)
517+
L.exception("Failed to process library changes: '{}'".format(e), struct_data={"path": path})
479518

480519
elif target == "tenant":
481520
for tenant in await self._get_tenants():
@@ -496,9 +535,43 @@ async def do_check_path(actual_path):
496535
"Failed to process library changes: '{}'".format(e),
497536
struct_data={"path": path, "tenant": tenant},
498537
)
538+
539+
elif target == "personal":
540+
for cred_id in await self._get_personals():
541+
try:
542+
await do_check_path(actual_path="/.personal/{}{}".format(cred_id, path))
543+
except Exception as e:
544+
L.exception(
545+
"Failed to process library changes: '{}'".format(e),
546+
struct_data={"path": path, "cred_id": cred_id},
547+
)
548+
549+
elif isinstance(target, tuple) and len(target) == 2 and target[0] == "personal":
550+
cred_id = target[1]
551+
try:
552+
await do_check_path(actual_path="/.personal/{}{}".format(cred_id, path))
553+
except Exception as e:
554+
L.exception(
555+
"Failed to process library changes: '{}'".format(e),
556+
struct_data={"path": path, "cred_id": cred_id},
557+
)
558+
499559
else:
500560
raise ValueError("Unexpected target: {!r}".format((target, path)))
501561

562+
async def _get_personals(self) -> typing.List[str]:
563+
"""
564+
List CredentialsIds that have custom content (i.e., directories) under /.personal.
565+
"""
566+
try:
567+
cred_ids = [
568+
c for c in await self.Zookeeper.get_children("{}/.personal".format(self.BasePath)) or []
569+
if not c.startswith(".")
570+
]
571+
except kazoo.exceptions.NoNodeError:
572+
cred_ids = []
573+
return cred_ids
574+
502575

503576
async def _get_tenants(self) -> typing.List[str]:
504577
"""
@@ -514,7 +587,6 @@ async def _get_tenants(self) -> typing.List[str]:
514587
return tenants
515588

516589

517-
518590
async def find(self, filename: str) -> list:
519591
"""
520592
Recursively search for files ending with a specific name in ZooKeeper nodes, starting from the base path.

0 commit comments

Comments
 (0)