Skip to content

Create cache directory #601

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged 8 commits on Mar 23, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion executorlib/cache/queue_spawner.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,9 @@ def execute_with_pysqa(
if k in resource_dict:
del resource_dict[k]
if "job_name" not in resource_dict:
resource_dict["job_name"] = "pysqa"
resource_dict["job_name"] = os.path.basename(
os.path.dirname(os.path.abspath(cwd))
)
submit_kwargs.update(resource_dict)
queue_id = qa.submit_job(**submit_kwargs)
dump(file_name=file_name, data_dict={"queue_id": queue_id})
Expand Down
15 changes: 10 additions & 5 deletions executorlib/cache/shared.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,13 @@ def execute_tasks_h5(
resource_dict=task_resource_dict,
)
if task_key not in memory_dict:
if task_key + ".h5out" not in os.listdir(cache_directory):
file_name = os.path.join(cache_directory, task_key + ".h5in")
if not (
task_key in os.listdir(cache_directory)
and "cache.h5out"
in os.listdir(os.path.join(cache_directory, task_key))
):
os.makedirs(os.path.join(cache_directory, task_key), exist_ok=True)
file_name = os.path.join(cache_directory, task_key, "cache.h5in")
Comment on lines +114 to +115
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

The file structure change is inconsistent with the `_check_task_output` function

The code changes introduce a new directory structure for caching, where each task has its own subdirectory. While these lines correctly implement the new structure, the _check_task_output function (line 198) still looks for output files in the old location.

You need to update the _check_task_output function to work with the new directory structure:

def _check_task_output(
    task_key: str, future_obj: Future, cache_directory: str
) -> Future:
    """
    Check the output of a task and set the result of the future object if available.

    Args:
        task_key (str): The key of the task.
        future_obj (Future): The future object associated with the task.
        cache_directory (str): The directory where the HDF5 files are stored.

    Returns:
        Future: The updated future object.

    """
-    file_name = os.path.join(cache_directory, task_key + ".h5out")
+    file_name = os.path.join(cache_directory, task_key, "cache.h5out")
    if not os.path.exists(file_name):
        return future_obj
    exec_flag, result = get_output(file_name=file_name)
    if exec_flag:
        future_obj.set_result(result)
    return future_obj

Also applies to: lines 139 and 142

dump(file_name=file_name, data_dict=data_dict)
if not disable_dependencies:
task_dependent_lst = [
Expand All @@ -131,10 +136,10 @@ def execute_tasks_h5(
resource_dict=task_resource_dict,
config_directory=pysqa_config_directory,
backend=backend,
cache_directory=cache_directory,
cache_directory=os.path.join(cache_directory, task_key),
)
file_name_dict[task_key] = os.path.join(
cache_directory, task_key + ".h5out"
cache_directory, task_key, "cache.h5out"
)
memory_dict[task_key] = task_dict["future"]
future_queue.task_done()
Expand Down Expand Up @@ -190,7 +195,7 @@ def _check_task_output(
Future: The updated future object.

"""
file_name = os.path.join(cache_directory, task_key + ".h5out")
file_name = os.path.join(cache_directory, task_key, "cache.h5out")
if not os.path.exists(file_name):
return future_obj
exec_flag, result = get_output(file_name=file_name)
Expand Down
9 changes: 6 additions & 3 deletions executorlib/interactive/shared.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,9 +148,12 @@ def _execute_task_with_cache(
fn_kwargs=task_dict["kwargs"],
resource_dict=task_dict.get("resource_dict", {}),
)
os.makedirs(cache_directory, exist_ok=True)
file_name = os.path.join(cache_directory, task_key + ".h5out")
if task_key + ".h5out" not in os.listdir(cache_directory):
os.makedirs(os.path.join(cache_directory, task_key), exist_ok=True)
file_name = os.path.join(cache_directory, task_key, "cache.h5out")
if not (
task_key in os.listdir(cache_directory)
and "cache.h5out" in os.listdir(os.path.join(cache_directory, task_key))
):
f = task_dict.pop("future")
if f.set_running_or_notify_cancel():
try:
Expand Down
21 changes: 12 additions & 9 deletions executorlib/standalone/hdf.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,13 +104,16 @@ def get_queue_id(file_name: Optional[str]) -> Optional[int]:

def get_cache_data(cache_directory: str) -> list[dict]:
file_lst = []
for file_name in os.listdir(cache_directory):
with h5py.File(os.path.join(cache_directory, file_name), "r") as hdf:
file_content_dict = {
key: cloudpickle.loads(np.void(hdf["/" + key]))
for key in group_dict.values()
if key in hdf
}
file_content_dict["filename"] = file_name
file_lst.append(file_content_dict)
for task_key in os.listdir(cache_directory):
file_name = os.path.join(cache_directory, task_key, "cache.h5out")
os.makedirs(os.path.join(cache_directory, task_key), exist_ok=True)
if os.path.exists(file_name):
with h5py.File(file_name, "r") as hdf:
file_content_dict = {
key: cloudpickle.loads(np.void(hdf["/" + key]))
for key in group_dict.values()
if key in hdf
}
file_content_dict["filename"] = file_name
file_lst.append(file_content_dict)
return file_lst
21 changes: 12 additions & 9 deletions tests/test_cache_shared.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ def test_execute_function_mixed(self):
fn_args=[1],
fn_kwargs={"b": 2},
)
file_name = os.path.join(cache_directory, task_key + ".h5in")
file_name = os.path.join(cache_directory, task_key, "cache.h5in")
os.makedirs(os.path.join(cache_directory, task_key), exist_ok=True)
dump(file_name=file_name, data_dict=data_dict)
backend_execute_task_in_file(file_name=file_name)
future_obj = Future()
Expand All @@ -41,11 +42,11 @@ def test_execute_function_mixed(self):
self.assertTrue(future_obj.done())
self.assertEqual(future_obj.result(), 3)
self.assertTrue(
get_runtime(file_name=os.path.join(cache_directory, task_key + ".h5out"))
get_runtime(file_name=os.path.join(cache_directory, task_key, "cache.h5out"))
> 0.0
)
future_file_obj = FutureItem(
file_name=os.path.join(cache_directory, task_key + ".h5out")
file_name=os.path.join(cache_directory, task_key, "cache.h5out")
)
self.assertTrue(future_file_obj.done())
self.assertEqual(future_file_obj.result(), 3)
Expand All @@ -58,7 +59,8 @@ def test_execute_function_args(self):
fn_args=[1, 2],
fn_kwargs={},
)
file_name = os.path.join(cache_directory, task_key + ".h5in")
file_name = os.path.join(cache_directory, task_key, "cache.h5in")
os.makedirs(os.path.join(cache_directory, task_key), exist_ok=True)
dump(file_name=file_name, data_dict=data_dict)
backend_execute_task_in_file(file_name=file_name)
future_obj = Future()
Expand All @@ -68,11 +70,11 @@ def test_execute_function_args(self):
self.assertTrue(future_obj.done())
self.assertEqual(future_obj.result(), 3)
self.assertTrue(
get_runtime(file_name=os.path.join(cache_directory, task_key + ".h5out"))
get_runtime(file_name=os.path.join(cache_directory, task_key, "cache.h5out"))
> 0.0
)
future_file_obj = FutureItem(
file_name=os.path.join(cache_directory, task_key + ".h5out")
file_name=os.path.join(cache_directory, task_key, "cache.h5out")
)
self.assertTrue(future_file_obj.done())
self.assertEqual(future_file_obj.result(), 3)
Expand All @@ -85,7 +87,8 @@ def test_execute_function_kwargs(self):
fn_args=[],
fn_kwargs={"a": 1, "b": 2},
)
file_name = os.path.join(cache_directory, task_key + ".h5in")
file_name = os.path.join(cache_directory, task_key, "cache.h5in")
os.makedirs(os.path.join(cache_directory, task_key), exist_ok=True)
dump(file_name=file_name, data_dict=data_dict)
backend_execute_task_in_file(file_name=file_name)
future_obj = Future()
Expand All @@ -95,11 +98,11 @@ def test_execute_function_kwargs(self):
self.assertTrue(future_obj.done())
self.assertEqual(future_obj.result(), 3)
self.assertTrue(
get_runtime(file_name=os.path.join(cache_directory, task_key + ".h5out"))
get_runtime(file_name=os.path.join(cache_directory, task_key, "cache.h5out"))
> 0.0
)
future_file_obj = FutureItem(
file_name=os.path.join(cache_directory, task_key + ".h5out")
file_name=os.path.join(cache_directory, task_key, "cache.h5out")
)
self.assertTrue(future_file_obj.done())
self.assertEqual(future_file_obj.result(), 3)
Expand Down
Loading