-
Notifications
You must be signed in to change notification settings - Fork 7
New 'personal' targets #712
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
c0fd2a1
6980627
2f5ab56
b253850
87793c2
334911e
7d516c9
a7f6548
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -12,7 +12,7 @@ | |||||||||||||||||||||||||||||||||||||||||||||||||||||||
from .abc import LibraryProviderABC | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
from ..item import LibraryItem | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
from ...zookeeper import ZooKeeperContainer | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
from ...contextvars import Tenant | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
from ...contextvars import Tenant, Authz | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||
# | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -241,8 +241,6 @@ async def _get_version_counter(self, event_name=None): | |||||||||||||||||||||||||||||||||||||||||||||||||||||||
self._check_version_counter(version) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||
def _check_version_counter(self, version): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
# If version is `None` aka `/.version.yaml` doesn't exists, then assume version -1 | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if version is not None: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -264,25 +262,88 @@ def _check_version_counter(self, version): | |||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||
self.App.TaskService.schedule(self._on_library_changed()) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||
async def read(self, path: str) -> typing.IO: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
# inside class ZooKeeperLibraryProvider | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||
def _current_tenant_id(self): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
try: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return Tenant.get() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
except LookupError: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return None | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||
def _current_credentials_id(self): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
""" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Return current CredentialsId from auth context, or None if not present. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
""" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
try: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
authz = Authz.get() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return getattr(authz, "CredentialsId", None) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
except LookupError: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return None | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||
def _personal_node_path(self, path: str, cred_id: typing.Optional[str] = None) -> typing.Optional[str]: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
""" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Resolve the absolute znode path for the personal target. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Args: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
path (str): Logical library path (must start with '/'). | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
cred_id (str | None): CredentialsId; if None, it is looked up. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Returns: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
str | None: '/<base>/.personal/<CredentialsId><path>' or None if not available. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
""" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
assert path[:1] == '/' | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
cred_id = self._current_credentials_id() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if not cred_id: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return None | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||
base = "{}/.personal/{}{}".format(self.BasePath, cred_id, path) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return base.rstrip("/") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||
async def read(self, path: str) -> typing.Optional[typing.IO]: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
""" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Read a node from ZooKeeper honoring target precedence: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||
1) personal: <BasePath>/.personal/{CredentialsId}{path} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
2) tenant: <BasePath>/.tenants/{Tenant}{path} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
3) global: <BasePath>{path} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||
- "personal" is resolved only if a CredentialsId is available (eg. from Authz). | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
- "tenant" is resolved only if a tenant is present in contextvars. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
- If none of the targets contain the node, returns None. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
""" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if self.Zookeeper is None: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
L.warning("Zookeeper Client has not been established (yet). Cannot read {}".format(path)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
raise RuntimeError("Zookeeper Client has not been established (yet). Not ready.") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||
# Build candidates in precedence order | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
candidate_paths: typing.List[str] = [] | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||
# personal | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
cred_id = None | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
try: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
# Try tenant-specific path first | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
node_path = self.build_path(path, tenant_specific=True) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
node_data = await self.Zookeeper.get_data(node_path) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
cred_id = self._current_credentials_id() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
except Exception: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
cred_id = None | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
personal_candidate = self._personal_node_path(path, cred_id) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if personal_candidate: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
candidate_paths.append(personal_candidate) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||
# tenant (uses build_path tenant_specific=True) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
tenant_candidate = self.build_path(path, tenant_specific=True) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if tenant_candidate not in candidate_paths: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
candidate_paths.append(tenant_candidate) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||
# global | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
global_candidate = self.build_path(path, tenant_specific=False) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if global_candidate not in candidate_paths: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
candidate_paths.append(global_candidate) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||
# If not found, try the normal path | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if node_data is None: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
node_path = self.build_path(path, tenant_specific=False) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
try: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
for node_path in candidate_paths: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
node_data = await self.Zookeeper.get_data(node_path) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if node_data is not None: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return io.BytesIO(initial_bytes=node_data) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
else: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return None | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if node_data is not None: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return io.BytesIO(initial_bytes=node_data) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return None | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||
except kazoo.exceptions.ConnectionClosedError: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
L.warning("Zookeeper library provider is not ready") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -296,7 +357,7 @@ async def list(self, path: str) -> list: | |||||||||||||||||||||||||||||||||||||||||||||||||||||||
# Process global nodes | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
global_node_path = self.build_path(path, tenant_specific=False) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
global_nodes = await self.Zookeeper.get_children(global_node_path) or [] | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
global_items = await self.process_nodes(global_nodes, path) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
global_items = await self.process_nodes(global_nodes, path, target="global") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||
# Process tenant-specific nodes | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
tenant_node_path = self.build_path(path, tenant_specific=True) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -306,8 +367,16 @@ async def list(self, path: str) -> list: | |||||||||||||||||||||||||||||||||||||||||||||||||||||||
else: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
tenant_items = [] | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||
# Instead of merging by item.name, simply concatenate the two lists. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return tenant_items + global_items | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
# Process personal nodes for current credentials (if any) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
cred_id = self._current_credentials_id() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
personal_node_path = self._personal_node_path(path, cred_id) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if personal_node_path: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
personal_nodes = await self.Zookeeper.get_children(personal_node_path) or [] | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
personal_items = await self.process_nodes(personal_nodes, path, target="personal") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
else: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
personal_items = [] | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
# Legacy behavior was tenant + global; now extend with personal first (no merging by name). | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return personal_items + tenant_items + global_items | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||
async def process_nodes(self, nodes, base_path, target="global"): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -317,7 +386,7 @@ async def process_nodes(self, nodes, base_path, target="global"): | |||||||||||||||||||||||||||||||||||||||||||||||||||||||
Args: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
nodes (list): List of node names to process. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
base_path (str): The base path for the nodes. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
target (str): Specifies the target context, e.g., "tenant" or "global". | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
target (str): Specifies the target context, e.g., "personal", "tenant", or "global". | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Returns: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
list: A list of LibraryItem objects with size information. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -336,26 +405,35 @@ async def process_nodes(self, nodes, base_path, target="global"): | |||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||
# Retrieve node size for items only | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
try: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
node_path = self.build_path(fname, tenant_specific=(target == "tenant")) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if target == "personal": | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
node_path = self._personal_node_path(fname, self._current_credentials_id()) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
else: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
node_path = self.build_path(fname, tenant_specific=(target == "tenant")) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
zstat = self.Zookeeper.Client.exists(node_path) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
size = zstat.dataLength if zstat else 0 | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
except kazoo.exceptions.NoNodeError: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
size = None | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
except Exception as e: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
L.warning("Failed to retrieve size for node {}: {}".format(node_path, e)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
L.warning("Failed to retrieve size for node {}: {}".format(fname, e)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
size = None | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+408
to
418
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Avoid blocking the loop when computing item size
- try:
- if target == "personal":
- node_path = self._personal_node_path(fname, self._current_credentials_id())
- else:
- node_path = self.build_path(fname, tenant_specific=(target == "tenant"))
- zstat = self.Zookeeper.Client.exists(node_path)
- size = zstat.dataLength if zstat else 0
- except kazoo.exceptions.NoNodeError:
- size = None
- except Exception as e:
- L.warning("Failed to retrieve size for node {}: {}".format(fname, e))
- size = None
+ try:
+ if target == "personal":
+ node_path = self._personal_node_path(fname, self._current_credentials_id())
+ else:
+ node_path = self.build_path(fname, tenant_specific=(target == "tenant"))
+ if node_path is None:
+ size = None
+ else:
+ zstat = await self.Zookeeper.exists(node_path)
+ size = zstat.dataLength if zstat else None
+ except kazoo.exceptions.NoNodeError:
+ size = None
+ except kazoo.exceptions.KazooException as e:
+ L.warning("Failed to retrieve size for node {}: {}".format(fname, e))
+ size = None 📝 Committable suggestion
Suggested change
🧰 Tools🪛 Ruff (0.13.2)422-422: Do not catch blind exception: (BLE001) 🤖 Prompt for AI Agents
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||
else: # Directory check | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
fname = "{}/{}/".format(base_path.rstrip("/"), node) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
ftype = "dir" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
size = None | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||
# Assign correct layer | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if self.Layer == 0: # Only apply this logic to layer `0` | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
layer_label = "0:global" if target == "global" else "0:tenant" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
# Assign correct layer label for top layer 0 (presentational only) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if self.Layer == 0: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if target == "global": | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
layer_label = "0:global" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
elif target == "tenant": | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
layer_label = "0:tenant" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
elif target == "personal": | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
layer_label = "0:personal" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
else: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
layer_label = "0:{}".format(target) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
else: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
layer_label = self.Layer # Keep normal numbering for other layers | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
layer_label = self.Layer | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||
# Add the item with the specified target and size | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
items.append(LibraryItem( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
name=fname, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
type=ftype, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -390,7 +468,6 @@ def build_path(self, path, tenant_specific=False): | |||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return node_path | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||
async def subscribe(self, path, target: typing.Union[str, tuple, None] = None): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
self.Subscriptions.add((target, path)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -411,10 +488,15 @@ async def subscribe(self, path, target: typing.Union[str, tuple, None] = None): | |||||||||||||||||||||||||||||||||||||||||||||||||||||||
_, tenant = target | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
actual_path = "/.tenants/{}{}".format(tenant, path) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
self.NodeDigests[actual_path] = await self._get_directory_hash(actual_path) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
elif target == "personal": | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
# Watch current user's personal path (if credentials id present) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
cred_id = self._current_credentials_id() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if cred_id: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
actual_path = "/.personal/{}{}".format(cred_id, path) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
self.NodeDigests[actual_path] = await self._get_directory_hash(actual_path) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
else: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
raise ValueError("Unexpected target: {!r}".format(target)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||
async def _get_directory_hash(self, path): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
path = self.BasePath + path | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -436,7 +518,6 @@ def recursive_traversal(path, digest): | |||||||||||||||||||||||||||||||||||||||||||||||||||||||
await self.Zookeeper.ProactorService.execute(recursive_traversal, path, digest) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return digest.digest() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||
async def _on_library_changed(self, event_name=None): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
""" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Check watched paths and publish a pubsub message for every one that has changed. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -482,10 +563,35 @@ async def do_check_path(actual_path): | |||||||||||||||||||||||||||||||||||||||||||||||||||||||
"Failed to process library changes: '{}'".format(e), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
struct_data={"path": path, "tenant": tenant}, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||
elif target == "personal": | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
cred_id = self._current_credentials_id() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if cred_id: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
try: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
await do_check_path(actual_path="/.personal/{}{}".format(cred_id, path)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
except Exception as e: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
L.exception( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
"Failed to process library changes: '{}'".format(e), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
struct_data={"path": path, "credentials_id": cred_id}, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
else: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
raise ValueError("Unexpected target: {!r}".format((target, path))) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||
async def _get_personals(self) -> typing.List[str]: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
""" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
List CredentialsIds that have custom content (i.e., directories) under /.personal. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
""" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
try: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
cred_ids = [ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
c for c in await self.Zookeeper.get_children("{}/.personal".format(self.BasePath)) or [] | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if not c.startswith(".") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
] | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
except kazoo.exceptions.NoNodeError: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
cred_ids = [] | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return cred_ids | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||
async def _get_tenants(self) -> typing.List[str]: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
""" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
List tenants that have custom content in the library (in the /.tenants directory). | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -500,7 +606,6 @@ async def _get_tenants(self) -> typing.List[str]: | |||||||||||||||||||||||||||||||||||||||||||||||||||||||
return tenants | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||
async def find(self, filename: str) -> list: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
""" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Recursively search for files ending with a specific name in ZooKeeper nodes, starting from the base path. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fix parameter shadowing bug in
_personal_node_path
.Line 295 overwrites the
cred_id
parameter by callingself._current_credentials_id()
, making the parameter completely ignored. Callers passingcred_id
will have their value discarded.Apply this diff:
📝 Committable suggestion
🤖 Prompt for AI Agents