Clean up: warn when functionality is not available #638

Merged · 8 commits · Apr 26, 2025

9 changes: 2 additions & 7 deletions executorlib/__init__.py
@@ -8,20 +8,15 @@
    SlurmClusterExecutor,
    SlurmJobExecutor,
)
from executorlib.standalone.cache import get_cache_data

__all__: list[str] = [
    "get_cache_data",
    "FluxJobExecutor",
    "FluxClusterExecutor",
    "SingleNodeExecutor",
    "SlurmJobExecutor",
    "SlurmClusterExecutor",
]

try:
    from executorlib.standalone.hdf import get_cache_data
except ImportError:
    pass
else:
    __all__ += ["get_cache_data"]

__version__ = _get_versions()["version"]
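
With the try/except block removed, get_cache_data is always importable from the top-level package; the h5py dependency is only resolved inside the function body when it is actually called. A minimal usage sketch (the ./cache path is illustrative):

from executorlib import get_cache_data

try:
    results = get_cache_data(cache_directory="./cache")
except ImportError:
    # h5py is optional: importing executorlib succeeds without it, but the
    # deferred `import h5py` inside get_cache_data() raises at call time.
    results = []
print(f"{len(results)} cached tasks found")
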
12 changes: 5 additions & 7 deletions executorlib/executor/flux.py
@@ -1,4 +1,3 @@
import contextlib
from typing import Callable, Optional, Union

from executorlib.executor.base import ExecutorBase
@@ -17,12 +16,6 @@
from executorlib.task_scheduler.interactive.dependency import DependencyTaskScheduler
from executorlib.task_scheduler.interactive.onetoone import OneProcessTaskScheduler

with contextlib.suppress(ImportError):
    from executorlib.task_scheduler.interactive.fluxspawner import (
        FluxPythonSpawner,
        validate_max_workers,
    )


class FluxJobExecutor(ExecutorBase):
    """
@@ -440,6 +433,11 @@ def create_flux_executor(
    Returns:
        InteractiveStepExecutor/ InteractiveExecutor
    """
    from executorlib.task_scheduler.interactive.fluxspawner import (
        FluxPythonSpawner,
        validate_max_workers,
    )

    if resource_dict is None:
        resource_dict = {}
    cores_per_worker = resource_dict.get("cores", 1)
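
Moving the fluxspawner import from a module-level contextlib.suppress(ImportError) into create_flux_executor() changes how a missing flux backend surfaces: the suppressed module-level import previously led to a NameError on FluxPythonSpawner deep inside the function, whereas the deferred import now raises a plain ImportError at call time. A hedged sketch of how calling code might handle this (constructor arguments are elided; the exact signature is not part of this diff):

from executorlib import FluxJobExecutor

try:
    with FluxJobExecutor() as exe:
        future = exe.submit(sum, [1, 2, 3])
        print(future.result())
except ImportError as err:
    # Without the flux packages installed, the deferred import inside
    # create_flux_executor() reports the missing dependency directly.
    print(f"Flux backend not available: {err}")
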
42 changes: 42 additions & 0 deletions executorlib/standalone/cache.py
@@ -0,0 +1,42 @@
import os

import cloudpickle
import numpy as np

group_dict = {
    "fn": "function",
    "args": "input_args",
    "kwargs": "input_kwargs",
    "output": "output",
    "error": "error",
    "runtime": "runtime",
    "queue_id": "queue_id",
}


def get_cache_data(cache_directory: str) -> list[dict]:
    """
    Collect all HDF5 files in the cache directory

    Args:
        cache_directory (str): The directory to store cache files.

    Returns:
        list[dict]: List of dictionaries, each representing one of the HDF5 files in the cache directory.
    """
    import h5py

    file_lst = []
    for task_key in os.listdir(cache_directory):
        file_name = os.path.join(cache_directory, task_key, "cache.h5out")
        os.makedirs(os.path.join(cache_directory, task_key), exist_ok=True)
        if os.path.exists(file_name):
            with h5py.File(file_name, "r") as hdf:
                file_content_dict = {
                    key: cloudpickle.loads(np.void(hdf["/" + key]))
                    for key in group_dict.values()
                    if key in hdf
                }
            file_content_dict["filename"] = file_name
            file_lst.append(file_content_dict)
    return file_lst
Contributor review comment on lines +29 to +42:

🛠️ Refactor suggestion

Missing error handling for file operations.

The function doesn't include error handling for scenarios like corrupted HDF5 files, permission issues, or other I/O errors that might occur when reading files.

Consider adding try-except blocks to handle potential exceptions:

 for task_key in os.listdir(cache_directory):
     file_name = os.path.join(cache_directory, task_key, "cache.h5out")
     os.makedirs(os.path.join(cache_directory, task_key), exist_ok=True)
     if os.path.exists(file_name):
-        with h5py.File(file_name, "r") as hdf:
-            file_content_dict = {
-                key: cloudpickle.loads(np.void(hdf["/" + key]))
-                for key in group_dict.values()
-                if key in hdf
-            }
-        file_content_dict["filename"] = file_name
-        file_lst.append(file_content_dict)
+        try:
+            with h5py.File(file_name, "r") as hdf:
+                file_content_dict = {
+                    key: cloudpickle.loads(np.void(hdf["/" + key]))
+                    for key in group_dict.values()
+                    if key in hdf
+                }
+                file_content_dict["filename"] = file_name
+                file_lst.append(file_content_dict)
+        except (OSError, KeyError, ValueError) as e:
+            # Log or handle the error as appropriate
+            print(f"Error reading cache file {file_name}: {e}")
+            continue
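
A variant of the reviewer's suggestion, assuming the project would rather warn than print, in line with the PR's "warn when functionality is not available" theme; get_cache_data_tolerant is a hypothetical name used here for illustration:

import os
import warnings

import cloudpickle
import h5py
import numpy as np

from executorlib.standalone.cache import group_dict


def get_cache_data_tolerant(cache_directory: str) -> list[dict]:
    # Hypothetical variant of get_cache_data() that skips unreadable cache
    # files with a warning instead of aborting the whole collection pass.
    file_lst = []
    for task_key in os.listdir(cache_directory):
        file_name = os.path.join(cache_directory, task_key, "cache.h5out")
        if not os.path.exists(file_name):
            continue
        try:
            with h5py.File(file_name, "r") as hdf:
                file_content_dict = {
                    key: cloudpickle.loads(np.void(hdf["/" + key]))
                    for key in group_dict.values()
                    if key in hdf
                }
        except (OSError, KeyError, ValueError) as err:
            warnings.warn(f"Skipping unreadable cache file {file_name}: {err}")
            continue
        file_content_dict["filename"] = file_name
        file_lst.append(file_content_dict)
    return file_lst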
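For context, a small usage sketch of the new helper; the ./cache path is illustrative, and which keys appear in each entry depends on what the task wrote:

from executorlib.standalone.cache import get_cache_data

for entry in get_cache_data(cache_directory="./cache"):
    # Each entry maps names from group_dict ("function", "input_args",
    # "input_kwargs", "output", "error", "runtime", "queue_id") to the
    # unpickled values of one cache.h5out file, plus its "filename".
    print(entry["filename"], entry.get("runtime"))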

2 changes: 1 addition & 1 deletion executorlib/task_scheduler/file/backend.py
@@ -2,7 +2,7 @@
import time
from typing import Any

from executorlib.standalone.hdf import dump, load
from executorlib.task_scheduler.file.hdf import dump, load
from executorlib.task_scheduler.file.shared import FutureItem


executorlib/standalone/hdf.py → executorlib/task_scheduler/file/hdf.py
@@ -1,19 +1,10 @@
import os
from typing import Any, Optional

import cloudpickle
import h5py
import numpy as np

group_dict = {
    "fn": "function",
    "args": "input_args",
    "kwargs": "input_kwargs",
    "output": "output",
    "error": "error",
    "runtime": "runtime",
    "queue_id": "queue_id",
}
from executorlib.standalone.cache import group_dict


def dump(file_name: Optional[str], data_dict: dict) -> None:
@@ -98,25 +89,17 @@ def get_runtime(file_name: str) -> float:


def get_queue_id(file_name: Optional[str]) -> Optional[int]:
    """
    Get queuing system id from HDF5 file

    Args:
        file_name (str): file name of the HDF5 file as absolute path

    Returns:
        int: queuing system id from the execution of the python function
    """
    if file_name is not None:
        with h5py.File(file_name, "r") as hdf:
            if "queue_id" in hdf:
                return cloudpickle.loads(np.void(hdf["/queue_id"]))
    return None


def get_cache_data(cache_directory: str) -> list[dict]:
    file_lst = []
    for task_key in os.listdir(cache_directory):
        file_name = os.path.join(cache_directory, task_key, "cache.h5out")
        os.makedirs(os.path.join(cache_directory, task_key), exist_ok=True)
        if os.path.exists(file_name):
            with h5py.File(file_name, "r") as hdf:
                file_content_dict = {
                    key: cloudpickle.loads(np.void(hdf["/" + key]))
                    for key in group_dict.values()
                    if key in hdf
                }
            file_content_dict["filename"] = file_name
            file_lst.append(file_content_dict)
    return file_lst
2 changes: 1 addition & 1 deletion executorlib/task_scheduler/file/queue_spawner.py
@@ -4,8 +4,8 @@

from pysqa import QueueAdapter

from executorlib.standalone.hdf import dump, get_queue_id
from executorlib.standalone.inputcheck import check_file_exists
from executorlib.task_scheduler.file.hdf import dump, get_queue_id


def execute_with_pysqa(
2 changes: 1 addition & 1 deletion executorlib/task_scheduler/file/shared.py
@@ -7,8 +7,8 @@
from typing import Any, Callable, Optional

from executorlib.standalone.command import get_command_path
from executorlib.standalone.hdf import dump, get_output
from executorlib.standalone.serialize import serialize_funct_h5
from executorlib.task_scheduler.file.hdf import dump, get_output


class FutureItem:
2 changes: 1 addition & 1 deletion executorlib/task_scheduler/interactive/shared.py
@@ -140,7 +140,7 @@ def _execute_task_with_cache(
        future_queue (Queue): Queue for receiving new tasks.
        cache_directory (str): The directory to store cache files.
    """
    from executorlib.standalone.hdf import dump, get_output
    from executorlib.task_scheduler.file.hdf import dump, get_output

    task_key, data_dict = serialize_funct_h5(
        fn=task_dict["fn"],
2 changes: 1 addition & 1 deletion tests/test_cache_backend_execute.py
@@ -7,7 +7,7 @@
try:
    from executorlib.task_scheduler.file.backend import backend_execute_task_in_file
    from executorlib.task_scheduler.file.shared import _check_task_output, FutureItem
    from executorlib.standalone.hdf import dump, get_runtime
    from executorlib.task_scheduler.file.hdf import dump, get_runtime
    from executorlib.standalone.serialize import serialize_funct_h5

    skip_h5io_test = False
4 changes: 2 additions & 2 deletions tests/test_singlenodeexecutor_cache.py
@@ -2,11 +2,11 @@
import shutil
import unittest

from executorlib import SingleNodeExecutor
from executorlib import SingleNodeExecutor, get_cache_data
from executorlib.standalone.serialize import cloudpickle_register

try:
    from executorlib import get_cache_data
    import h5py

    skip_h5py_test = False
except ImportError:
2 changes: 1 addition & 1 deletion tests/test_standalone_hdf.py
@@ -4,7 +4,7 @@


try:
    from executorlib.standalone.hdf import (
    from executorlib.task_scheduler.file.hdf import (
        dump,
        load,
        get_output,