Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
165 changes: 135 additions & 30 deletions asab/library/providers/zookeeper.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from .abc import LibraryProviderABC
from ..item import LibraryItem
from ...zookeeper import ZooKeeperContainer
from ...contextvars import Tenant
from ...contextvars import Tenant, Authz

#

Expand Down Expand Up @@ -241,8 +241,6 @@ async def _get_version_counter(self, event_name=None):
self._check_version_counter(version)




def _check_version_counter(self, version):
# If version is `None` aka `/.version.yaml` doesn't exists, then assume version -1
if version is not None:
Expand All @@ -264,25 +262,88 @@ def _check_version_counter(self, version):

self.App.TaskService.schedule(self._on_library_changed())

async def read(self, path: str) -> typing.IO:
# inside class ZooKeeperLibraryProvider

def _current_tenant_id(self):
try:
return Tenant.get()
except LookupError:
return None

def _current_credentials_id(self):
"""
Return current CredentialsId from auth context, or None if not present.
"""
try:
authz = Authz.get()
return getattr(authz, "CredentialsId", None)
except LookupError:
return None

def _personal_node_path(self, path: str, cred_id: typing.Optional[str] = None) -> typing.Optional[str]:
"""
Resolve the absolute znode path for the personal target.

Args:
path (str): Logical library path (must start with '/').
cred_id (str | None): CredentialsId; if None, it is looked up.

Returns:
str | None: '/<base>/.personal/<CredentialsId><path>' or None if not available.
"""
assert path[:1] == '/'
cred_id = self._current_credentials_id()
if not cred_id:
return None

base = "{}/.personal/{}{}".format(self.BasePath, cred_id, path)
return base.rstrip("/")
Comment on lines +283 to +300
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Fix parameter shadowing bug in _personal_node_path.

Line 295 overwrites the cred_id parameter by calling self._current_credentials_id(), making the parameter completely ignored. Callers passing cred_id will have their value discarded.

Apply this diff:

-	def _personal_node_path(self, path: str, cred_id: typing.Optional[str] = None) -> typing.Optional[str]:
-		"""
-		Resolve the absolute znode path for the personal target.
-
-		Args:
-			path (str): Logical library path (must start with '/').
-			cred_id (str | None): CredentialsId; if None, it is looked up.
-
-		Returns:
-			str | None: '/<base>/.personal/<CredentialsId><path>' or None if not available.
-		"""
-		assert path[:1] == '/'
-		cred_id = self._current_credentials_id()
-		if not cred_id:
-			return None
-
-		base = "{}/.personal/{}{}".format(self.BasePath, cred_id, path)
-		return base.rstrip("/")
+	def _personal_node_path(self, path: str, cred_id: typing.Optional[str] = None) -> typing.Optional[str]:
+		"""
+		Resolve the absolute znode path for the personal target.
+
+		Args:
+			path (str): Logical library path (must start with '/').
+			cred_id (str | None): CredentialsId; if None, it is looked up.
+
+		Returns:
+			str | None: '/<base>/.personal/<CredentialsId><path>' or None if not available.
+		"""
+		assert path[:1] == '/'
+		if cred_id is None:
+			cred_id = self._current_credentials_id()
+		if not cred_id:
+			return None
+
+		base = "{}/.personal/{}{}".format(self.BasePath, cred_id, path)
+		return base.rstrip("/")
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
def _personal_node_path(self, path: str, cred_id: typing.Optional[str] = None) -> typing.Optional[str]:
"""
Resolve the absolute znode path for the personal target.
Args:
path (str): Logical library path (must start with '/').
cred_id (str | None): CredentialsId; if None, it is looked up.
Returns:
str | None: '/<base>/.personal/<CredentialsId><path>' or None if not available.
"""
assert path[:1] == '/'
cred_id = self._current_credentials_id()
if not cred_id:
return None
base = "{}/.personal/{}{}".format(self.BasePath, cred_id, path)
return base.rstrip("/")
def _personal_node_path(self, path: str, cred_id: typing.Optional[str] = None) -> typing.Optional[str]:
"""
Resolve the absolute znode path for the personal target.
Args:
path (str): Logical library path (must start with '/').
cred_id (str | None): CredentialsId; if None, it is looked up.
Returns:
str | None: '/<base>/.personal/<CredentialsId><path>' or None if not available.
"""
assert path[:1] == '/'
if cred_id is None:
cred_id = self._current_credentials_id()
if not cred_id:
return None
base = "{}/.personal/{}{}".format(self.BasePath, cred_id, path)
return base.rstrip("/")
🤖 Prompt for AI Agents
In asab/library/providers/zookeeper.py around lines 283 to 300, the method
_personal_node_path currently overwrites the cred_id parameter by
unconditionally calling self._current_credentials_id(), which discards any
caller-provided cred_id; change the logic to only call
self._current_credentials_id() when the incoming cred_id is None (i.e., use the
passed-in cred_id if truthy, otherwise set cred_id =
self._current_credentials_id()); keep the existing early return if cred_id is
still falsy, and retain building base =
"{}/.personal/{}{}".format(self.BasePath, cred_id, path) and returning
base.rstrip("/") .


async def read(self, path: str) -> typing.Optional[typing.IO]:
"""
Read a node from ZooKeeper honoring target precedence:

1) personal: <BasePath>/.personal/{CredentialsId}{path}
2) tenant: <BasePath>/.tenants/{Tenant}{path}
3) global: <BasePath>{path}

- "personal" is resolved only if a CredentialsId is available (eg. from Authz).
- "tenant" is resolved only if a tenant is present in contextvars.
- If none of the targets contain the node, returns None.
"""
if self.Zookeeper is None:
L.warning("Zookeeper Client has not been established (yet). Cannot read {}".format(path))
raise RuntimeError("Zookeeper Client has not been established (yet). Not ready.")

# Build candidates in precedence order
candidate_paths: typing.List[str] = []

# personal
cred_id = None
try:
# Try tenant-specific path first
node_path = self.build_path(path, tenant_specific=True)
node_data = await self.Zookeeper.get_data(node_path)
cred_id = self._current_credentials_id()
except Exception:
cred_id = None
personal_candidate = self._personal_node_path(path, cred_id)
if personal_candidate:
candidate_paths.append(personal_candidate)

# tenant (uses build_path tenant_specific=True)
tenant_candidate = self.build_path(path, tenant_specific=True)
if tenant_candidate not in candidate_paths:
candidate_paths.append(tenant_candidate)

# global
global_candidate = self.build_path(path, tenant_specific=False)
if global_candidate not in candidate_paths:
candidate_paths.append(global_candidate)

# If not found, try the normal path
if node_data is None:
node_path = self.build_path(path, tenant_specific=False)
try:
for node_path in candidate_paths:
node_data = await self.Zookeeper.get_data(node_path)

if node_data is not None:
return io.BytesIO(initial_bytes=node_data)
else:
return None
if node_data is not None:
return io.BytesIO(initial_bytes=node_data)
return None

except kazoo.exceptions.ConnectionClosedError:
L.warning("Zookeeper library provider is not ready")
Expand All @@ -296,7 +357,7 @@ async def list(self, path: str) -> list:
# Process global nodes
global_node_path = self.build_path(path, tenant_specific=False)
global_nodes = await self.Zookeeper.get_children(global_node_path) or []
global_items = await self.process_nodes(global_nodes, path)
global_items = await self.process_nodes(global_nodes, path, target="global")

# Process tenant-specific nodes
tenant_node_path = self.build_path(path, tenant_specific=True)
Expand All @@ -306,8 +367,16 @@ async def list(self, path: str) -> list:
else:
tenant_items = []

# Instead of merging by item.name, simply concatenate the two lists.
return tenant_items + global_items
# Process personal nodes for current credentials (if any)
cred_id = self._current_credentials_id()
personal_node_path = self._personal_node_path(path, cred_id)
if personal_node_path:
personal_nodes = await self.Zookeeper.get_children(personal_node_path) or []
personal_items = await self.process_nodes(personal_nodes, path, target="personal")
else:
personal_items = []
# Legacy behavior was tenant + global; now extend with personal first (no merging by name).
return personal_items + tenant_items + global_items


async def process_nodes(self, nodes, base_path, target="global"):
Expand All @@ -317,7 +386,7 @@ async def process_nodes(self, nodes, base_path, target="global"):
Args:
nodes (list): List of node names to process.
base_path (str): The base path for the nodes.
target (str): Specifies the target context, e.g., "tenant" or "global".
target (str): Specifies the target context, e.g., "personal", "tenant", or "global".

Returns:
list: A list of LibraryItem objects with size information.
Expand All @@ -336,26 +405,35 @@ async def process_nodes(self, nodes, base_path, target="global"):

# Retrieve node size for items only
try:
node_path = self.build_path(fname, tenant_specific=(target == "tenant"))
if target == "personal":
node_path = self._personal_node_path(fname, self._current_credentials_id())
else:
node_path = self.build_path(fname, tenant_specific=(target == "tenant"))
zstat = self.Zookeeper.Client.exists(node_path)
size = zstat.dataLength if zstat else 0
except kazoo.exceptions.NoNodeError:
size = None
except Exception as e:
L.warning("Failed to retrieve size for node {}: {}".format(node_path, e))
L.warning("Failed to retrieve size for node {}: {}".format(fname, e))
size = None
Comment on lines +408 to 418
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Avoid blocking the loop when computing item size

self.Zookeeper.Client.exists(...) runs synchronously, so every node blocks the event loop while hitting ZooKeeper. On top of that, a missing node is reported as size 0, which is indistinguishable from empty content. Switch to the async wrapper and surface “missing” as None.

-				try:
-					if target == "personal":
-						node_path = self._personal_node_path(fname, self._current_credentials_id())
-					else:
-						node_path = self.build_path(fname, tenant_specific=(target == "tenant"))
-					zstat = self.Zookeeper.Client.exists(node_path)
-					size = zstat.dataLength if zstat else 0
-				except kazoo.exceptions.NoNodeError:
-					size = None
-				except Exception as e:
-					L.warning("Failed to retrieve size for node {}: {}".format(fname, e))
-					size = None
+				try:
+					if target == "personal":
+						node_path = self._personal_node_path(fname, self._current_credentials_id())
+					else:
+						node_path = self.build_path(fname, tenant_specific=(target == "tenant"))
+					if node_path is None:
+						size = None
+					else:
+						zstat = await self.Zookeeper.exists(node_path)
+						size = zstat.dataLength if zstat else None
+				except kazoo.exceptions.NoNodeError:
+					size = None
+				except kazoo.exceptions.KazooException as e:
+					L.warning("Failed to retrieve size for node {}: {}".format(fname, e))
+					size = None
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if target == "personal":
node_path = self._personal_node_path(fname, self._current_credentials_id())
else:
node_path = self.build_path(fname, tenant_specific=(target == "tenant"))
zstat = self.Zookeeper.Client.exists(node_path)
size = zstat.dataLength if zstat else 0
except kazoo.exceptions.NoNodeError:
size = None
except Exception as e:
L.warning("Failed to retrieve size for node {}: {}".format(node_path, e))
L.warning("Failed to retrieve size for node {}: {}".format(fname, e))
size = None
try:
if target == "personal":
node_path = self._personal_node_path(fname, self._current_credentials_id())
else:
node_path = self.build_path(fname, tenant_specific=(target == "tenant"))
if node_path is None:
size = None
else:
zstat = await self.Zookeeper.exists(node_path)
size = zstat.dataLength if zstat else None
except kazoo.exceptions.NoNodeError:
size = None
except kazoo.exceptions.KazooException as e:
L.warning("Failed to retrieve size for node {}: {}".format(fname, e))
size = None
🧰 Tools
🪛 Ruff (0.13.2)

422-422: Do not catch blind exception: Exception

(BLE001)

🤖 Prompt for AI Agents
In asab/library/providers/zookeeper.py around lines 414 to 424, the code uses
the synchronous self.Zookeeper.Client.exists(...) which blocks the event loop
and treats a missing node the same as an empty node (size 0); replace the sync
call with the async exists wrapper (await the async exists method on the
Zookeeper client), set size = None if the async exists returns no zstat or
raises kazoo.exceptions.NoNodeError, otherwise set size = zstat.dataLength, and
keep the existing generic Exception handling but ensure it does not call the
synchronous API so the loop is non-blocking.

else: # Directory check
fname = "{}/{}/".format(base_path.rstrip("/"), node)
ftype = "dir"
size = None

# Assign correct layer
if self.Layer == 0: # Only apply this logic to layer `0`
layer_label = "0:global" if target == "global" else "0:tenant"
# Assign correct layer label for top layer 0 (presentational only)
if self.Layer == 0:
if target == "global":
layer_label = "0:global"
elif target == "tenant":
layer_label = "0:tenant"
elif target == "personal":
layer_label = "0:personal"
else:
layer_label = "0:{}".format(target)
else:
layer_label = self.Layer # Keep normal numbering for other layers
layer_label = self.Layer

# Add the item with the specified target and size
items.append(LibraryItem(
name=fname,
type=ftype,
Expand Down Expand Up @@ -390,7 +468,6 @@ def build_path(self, path, tenant_specific=False):

return node_path


async def subscribe(self, path, target: typing.Union[str, tuple, None] = None):
self.Subscriptions.add((target, path))

Expand All @@ -411,10 +488,15 @@ async def subscribe(self, path, target: typing.Union[str, tuple, None] = None):
_, tenant = target
actual_path = "/.tenants/{}{}".format(tenant, path)
self.NodeDigests[actual_path] = await self._get_directory_hash(actual_path)
elif target == "personal":
# Watch current user's personal path (if credentials id present)
cred_id = self._current_credentials_id()
if cred_id:
actual_path = "/.personal/{}{}".format(cred_id, path)
self.NodeDigests[actual_path] = await self._get_directory_hash(actual_path)
else:
raise ValueError("Unexpected target: {!r}".format(target))


async def _get_directory_hash(self, path):
path = self.BasePath + path

Expand All @@ -436,7 +518,6 @@ def recursive_traversal(path, digest):
await self.Zookeeper.ProactorService.execute(recursive_traversal, path, digest)
return digest.digest()


async def _on_library_changed(self, event_name=None):
"""
Check watched paths and publish a pubsub message for every one that has changed.
Expand Down Expand Up @@ -482,10 +563,35 @@ async def do_check_path(actual_path):
"Failed to process library changes: '{}'".format(e),
struct_data={"path": path, "tenant": tenant},
)

elif target == "personal":
cred_id = self._current_credentials_id()
if cred_id:
try:
await do_check_path(actual_path="/.personal/{}{}".format(cred_id, path))
except Exception as e:
L.exception(
"Failed to process library changes: '{}'".format(e),
struct_data={"path": path, "credentials_id": cred_id},
)
else:
raise ValueError("Unexpected target: {!r}".format((target, path)))


async def _get_personals(self) -> typing.List[str]:
"""
List CredentialsIds that have custom content (i.e., directories) under /.personal.
"""
try:
cred_ids = [
c for c in await self.Zookeeper.get_children("{}/.personal".format(self.BasePath)) or []
if not c.startswith(".")
]
except kazoo.exceptions.NoNodeError:
cred_ids = []
return cred_ids


async def _get_tenants(self) -> typing.List[str]:
"""
List tenants that have custom content in the library (in the /.tenants directory).
Expand All @@ -500,7 +606,6 @@ async def _get_tenants(self) -> typing.List[str]:
return tenants



async def find(self, filename: str) -> list:
"""
Recursively search for files ending with a specific name in ZooKeeper nodes, starting from the base path.
Expand Down
6 changes: 4 additions & 2 deletions asab/library/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -278,8 +278,8 @@ async def open(self, path: str):

async def list(self, path: str = "/", recursive: bool = False) -> typing.List[LibraryItem]:
"""
List the directory of the library specified by the path that are enabled for the specified tenant.
This method can be used only after the Library is ready.
List the directory of the library specified by the path that are enabled for the
specified target (global, tenant, or personal).

**WARNING:** Tenant must be set in the context variable!
If it is not set automatically (e.g. from web request), it must be set manually.
Expand Down Expand Up @@ -715,6 +715,8 @@ async def subscribe(
- "global" to watch global path changes
- "tenant" to watch path changes in tenants
- ("tenant", TENANT_ID) to watch path changes in one specified tenant TENANT_ID
- "personal" to watch path changes for all personal credential IDs
- ("personal", CREDENTIALS_ID) to watch in one specific personal scope

Examples:
```python
Expand Down