
[Feature] More efficient cache directory structure #681

Merged
8 commits merged on Jun 14, 2025
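In short, the PR flattens the cache layout: instead of one sub-directory per task holding cache.h5in / cache.h5ready / cache.h5out, every task now writes suffix-tagged files directly into the cache directory. A minimal sketch of the two layouts for a hypothetical task key (illustration only, not code from the PR):

```python
import os

cache_directory = "cache"
task_key = "fn_abc123"  # hypothetical task key

# old layout: one sub-directory per task
old_files = [
    os.path.join(cache_directory, task_key, "cache.h5in"),     # submitted input
    os.path.join(cache_directory, task_key, "cache.h5ready"),  # result being written
    os.path.join(cache_directory, task_key, "cache.h5out"),    # finished output
]

# new layout: flat files in the cache directory, distinguished by suffix
new_files = [
    os.path.join(cache_directory, task_key + "_i.h5"),  # submitted input
    os.path.join(cache_directory, task_key + "_r.h5"),  # result being written
    os.path.join(cache_directory, task_key + "_o.h5"),  # finished output
]
```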
5 changes: 2 additions & 3 deletions executorlib/standalone/cache.py
@@ -28,9 +28,8 @@ def get_cache_data(cache_directory: str) -> list[dict]:

file_lst = []
for task_key in os.listdir(cache_directory):
- file_name = os.path.join(cache_directory, task_key, "cache.h5out")
- os.makedirs(os.path.join(cache_directory, task_key), exist_ok=True)
- if os.path.exists(file_name):
+ file_name = os.path.join(cache_directory, task_key)
+ if task_key[-5:] == "_o.h5":
with h5py.File(file_name, "r") as hdf:
file_content_dict = {
key: cloudpickle.loads(np.void(hdf["/" + key]))
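A small style note on the new suffix test above: `str.endswith` expresses the same condition without the hard-coded slice length (an equivalent spelling, not a change made in this PR):

```python
# Equivalent to task_key[-5:] == "_o.h5"; endswith avoids the magic number 5.
task_key = "fn_abc123_o.h5"
assert task_key.endswith("_o.h5") == (task_key[-5:] == "_o.h5")
```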
10 changes: 5 additions & 5 deletions executorlib/task_scheduler/file/backend.py
@@ -42,19 +42,19 @@ def backend_write_file(file_name: str, output: Any, runtime: float) -> None:
None

"""
- file_name_out = os.path.splitext(file_name)[0]
- os.rename(file_name, file_name_out + ".h5ready")
+ file_name_out = os.path.splitext(file_name)[0][:-2]
+ os.rename(file_name, file_name_out + "_r.h5")
if "result" in output:
dump(
- file_name=file_name_out + ".h5ready",
+ file_name=file_name_out + "_r.h5",
data_dict={"output": output["result"], "runtime": runtime},
)
else:
dump(
- file_name=file_name_out + ".h5ready",
+ file_name=file_name_out + "_r.h5",
data_dict={"error": output["error"], "runtime": runtime},
)
- os.rename(file_name_out + ".h5ready", file_name_out + ".h5out")
+ os.rename(file_name_out + "_r.h5", file_name_out + "_o.h5")


def backend_execute_task_in_file(file_name: str) -> None:
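Taken together, the write path now walks each task through three suffixes via renames. A sketch of the flow under that reading (`dump` below is a stand-in for executorlib's HDF5 writer used in the diff, with its import omitted):

```python
import os
from typing import Any


def dump(file_name: str, data_dict: dict) -> None:
    """Stand-in for the project's HDF5 dump helper used above."""
    ...


def backend_write_file_sketch(file_name: str, output: Any, runtime: float) -> None:
    """Sketch of the new flow: <task_key>_i.h5 -> <task_key>_r.h5 -> <task_key>_o.h5."""
    # "<task_key>_i.h5" -> splitext -> "<task_key>_i" -> [:-2] -> "<task_key>"
    file_name_out = os.path.splitext(file_name)[0][:-2]
    # renaming the input file to "_r.h5" marks the result as being written
    os.rename(file_name, file_name_out + "_r.h5")
    # store either the result or the error, plus the measured runtime
    if "result" in output:
        data_dict = {"output": output["result"], "runtime": runtime}
    else:
        data_dict = {"error": output["error"], "runtime": runtime}
    dump(file_name=file_name_out + "_r.h5", data_dict=data_dict)
    # only the final rename exposes the "_o.h5" suffix that readers poll for,
    # so a partially written file is never picked up as finished output
    os.rename(file_name_out + "_r.h5", file_name_out + "_o.h5")
```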
16 changes: 6 additions & 10 deletions executorlib/task_scheduler/file/shared.py
@@ -108,13 +108,9 @@ def execute_tasks_h5(
resource_dict=task_resource_dict,
)
if task_key not in memory_dict:
- if not (
- task_key in os.listdir(cache_directory)
- and "cache.h5out"
- in os.listdir(os.path.join(cache_directory, task_key))
- ):
- os.makedirs(os.path.join(cache_directory, task_key), exist_ok=True)
- file_name = os.path.join(cache_directory, task_key, "cache.h5in")
+ if task_key + "_o.h5" not in os.listdir(cache_directory):
+ os.makedirs(cache_directory, exist_ok=True)
+ file_name = os.path.join(cache_directory, task_key + "_i.h5")
dump(file_name=file_name, data_dict=data_dict)
Comment on lines +111 to +114

🛠️ Refactor suggestion

Use exists() instead of scanning the directory for every task

Similar to the interactive path, scanning os.listdir(cache_directory) for each task does an unnecessary O(n) walk and risks TOCTOU issues.

-                if task_key + "_o.h5" not in os.listdir(cache_directory):
+                output_path = os.path.join(cache_directory, f"{task_key}_o.h5")
+                if not os.path.exists(output_path):

You already compute the same path a few lines later; reuse it to keep the logic DRY.

Suggested change
if task_key + "_o.h5" not in os.listdir(cache_directory):
os.makedirs(cache_directory, exist_ok=True)
file_name = os.path.join(cache_directory, task_key + "_i.h5")
dump(file_name=file_name, data_dict=data_dict)
# check for the output file directly rather than listing the whole directory
output_path = os.path.join(cache_directory, f"{task_key}_o.h5")
if not os.path.exists(output_path):
os.makedirs(cache_directory, exist_ok=True)
file_name = os.path.join(cache_directory, f"{task_key}_i.h5")
dump(file_name=file_name, data_dict=data_dict)
🤖 Prompt for AI Agents
In executorlib/task_scheduler/file/shared.py around lines 111 to 114, replace
the check that scans os.listdir(cache_directory) with a direct file existence
check using os.path.exists() for the target file to avoid inefficient directory
scans and TOCTOU issues. Also, reuse the computed file path variable instead of
reconstructing it to keep the code DRY.
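If the same existence check is wanted in both the file-based and interactive schedulers, a small shared helper along these lines could keep the two call sites consistent (a sketch only; `cache_paths` and `needs_submission` are hypothetical names, not functions in executorlib):

```python
import os


def cache_paths(cache_directory: str, task_key: str) -> tuple[str, str]:
    """Return (input_file, output_file) for a task key in the flat cache layout."""
    return (
        os.path.join(cache_directory, task_key + "_i.h5"),
        os.path.join(cache_directory, task_key + "_o.h5"),
    )


def needs_submission(cache_directory: str, task_key: str) -> bool:
    """True if no cached output exists yet; a single stat instead of a full listdir."""
    _, output_file = cache_paths(cache_directory, task_key)
    return not os.path.exists(output_file)
```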

if not disable_dependencies:
task_dependent_lst = [
@@ -138,10 +134,10 @@ def execute_tasks_h5(
resource_dict=task_resource_dict,
config_directory=pysqa_config_directory,
backend=backend,
- cache_directory=os.path.join(cache_directory, task_key),
+ cache_directory=cache_directory,
)
file_name_dict[task_key] = os.path.join(
cache_directory, task_key, "cache.h5out"
cache_directory, task_key + "_o.h5"
)
memory_dict[task_key] = task_dict["future"]
future_queue.task_done()
@@ -197,7 +193,7 @@ def _check_task_output(
Future: The updated future object.

"""
file_name = os.path.join(cache_directory, task_key, "cache.h5out")
file_name = os.path.join(cache_directory, task_key + "_o.h5")
if not os.path.exists(file_name):
return future_obj
exec_flag, no_error_flag, result = get_output(file_name=file_name)
9 changes: 3 additions & 6 deletions executorlib/task_scheduler/interactive/shared.py
@@ -151,12 +151,9 @@ def _execute_task_with_cache(
fn_kwargs=task_dict["kwargs"],
resource_dict=task_dict.get("resource_dict", {}),
)
- os.makedirs(os.path.join(cache_directory, task_key), exist_ok=True)
- file_name = os.path.join(cache_directory, task_key, "cache.h5out")
- if not (
- task_key in os.listdir(cache_directory)
- and "cache.h5out" in os.listdir(os.path.join(cache_directory, task_key))
- ):
+ os.makedirs(cache_directory, exist_ok=True)
+ file_name = os.path.join(cache_directory, task_key + "_o.h5")
+ if task_key + "_o.h5" not in os.listdir(cache_directory):
Comment on lines +154 to +156

🛠️ Refactor suggestion

Prefer os.path.exists() over directory scans for existence checks

Using task_key + "_o.h5" not in os.listdir(cache_directory) forces a full directory listing on every call.
os.path.exists(file_name) (or Path(file_name).exists()) is:

  1. O(1) rather than O(n) on large cache dirs.
  2. Immune to race conditions where a file is created between listdir() and the subsequent logic.
  3. Clearer to read.
-    file_name = os.path.join(cache_directory, task_key + "_o.h5")
-    if task_key + "_o.h5" not in os.listdir(cache_directory):
+    file_name = os.path.join(cache_directory, f"{task_key}_o.h5")
+    if not os.path.exists(file_name):

Suggested change
- os.makedirs(cache_directory, exist_ok=True)
- file_name = os.path.join(cache_directory, task_key + "_o.h5")
- if task_key + "_o.h5" not in os.listdir(cache_directory):
+ os.makedirs(cache_directory, exist_ok=True)
+ file_name = os.path.join(cache_directory, f"{task_key}_o.h5")
+ if not os.path.exists(file_name):
🤖 Prompt for AI Agents
In executorlib/task_scheduler/interactive/shared.py around lines 154 to 156,
replace the check using 'task_key + "_o.h5" not in os.listdir(cache_directory)'
with 'not os.path.exists(file_name)'. This avoids scanning the entire directory,
improves performance, prevents race conditions, and makes the code clearer by
directly checking the file's existence.
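For intuition on the O(1)-vs-O(n) point above, a throwaway comparison along these lines can be run locally (illustration only, not part of the PR; the file count and iteration numbers are arbitrary):

```python
import os
import tempfile
import timeit

with tempfile.TemporaryDirectory() as cache_directory:
    # populate a flat cache directory with many finished-task files
    for i in range(20_000):
        open(os.path.join(cache_directory, f"task{i}_o.h5"), "w").close()

    target_key = "task19999_o.h5"
    target_path = os.path.join(cache_directory, target_key)

    def listdir_check() -> bool:
        return target_key in os.listdir(cache_directory)

    def exists_check() -> bool:
        return os.path.exists(target_path)

    print("listdir membership:", timeit.timeit(listdir_check, number=100))
    print("os.path.exists:    ", timeit.timeit(exists_check, number=100))
```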

f = task_dict.pop("future")
if f.set_running_or_notify_cancel():
try:
30 changes: 15 additions & 15 deletions tests/test_cache_backend_execute.py
@@ -35,8 +35,8 @@ def test_execute_function_mixed(self):
fn_args=[1],
fn_kwargs={"b": 2},
)
file_name = os.path.join(cache_directory, task_key, "cache.h5in")
os.makedirs(os.path.join(cache_directory, task_key), exist_ok=True)
file_name = os.path.join(cache_directory, task_key + "_i.h5")
os.makedirs(cache_directory, exist_ok=True)
dump(file_name=file_name, data_dict=data_dict)
backend_execute_task_in_file(file_name=file_name)
future_obj = Future()
@@ -46,11 +46,11 @@ def test_execute_function_mixed(self):
self.assertTrue(future_obj.done())
self.assertEqual(future_obj.result(), 3)
self.assertTrue(
- get_runtime(file_name=os.path.join(cache_directory, task_key, "cache.h5out"))
+ get_runtime(file_name=os.path.join(cache_directory, task_key + "_o.h5"))
> 0.0
)
future_file_obj = FutureItem(
- file_name=os.path.join(cache_directory, task_key, "cache.h5out")
+ file_name=os.path.join(cache_directory, task_key + "_o.h5")
)
self.assertTrue(future_file_obj.done())
self.assertEqual(future_file_obj.result(), 3)
@@ -63,7 +63,7 @@ def test_execute_function_args(self):
fn_args=[1, 2],
fn_kwargs={},
)
- file_name = os.path.join(cache_directory, task_key, "cache.h5in")
+ file_name = os.path.join(cache_directory, task_key + "_i.h5")
os.makedirs(os.path.join(cache_directory, task_key), exist_ok=True)
dump(file_name=file_name, data_dict=data_dict)
backend_execute_task_in_file(file_name=file_name)
@@ -74,11 +74,11 @@ def test_execute_function_args(self):
self.assertTrue(future_obj.done())
self.assertEqual(future_obj.result(), 3)
self.assertTrue(
- get_runtime(file_name=os.path.join(cache_directory, task_key, "cache.h5out"))
+ get_runtime(file_name=os.path.join(cache_directory, task_key + "_o.h5"))
> 0.0
)
future_file_obj = FutureItem(
- file_name=os.path.join(cache_directory, task_key, "cache.h5out")
+ file_name=os.path.join(cache_directory, task_key + "_o.h5")
)
self.assertTrue(future_file_obj.done())
self.assertEqual(future_file_obj.result(), 3)
@@ -91,8 +91,8 @@ def test_execute_function_kwargs(self):
fn_args=[],
fn_kwargs={"a": 1, "b": 2},
)
- file_name = os.path.join(cache_directory, task_key, "cache.h5in")
- os.makedirs(os.path.join(cache_directory, task_key), exist_ok=True)
+ file_name = os.path.join(cache_directory, task_key + "_i.h5")
+ os.makedirs(cache_directory, exist_ok=True)
dump(file_name=file_name, data_dict=data_dict)
backend_execute_task_in_file(file_name=file_name)
future_obj = Future()
@@ -102,11 +102,11 @@ def test_execute_function_kwargs(self):
self.assertTrue(future_obj.done())
self.assertEqual(future_obj.result(), 3)
self.assertTrue(
- get_runtime(file_name=os.path.join(cache_directory, task_key, "cache.h5out"))
+ get_runtime(file_name=os.path.join(cache_directory, task_key + "_o.h5"))
> 0.0
)
future_file_obj = FutureItem(
- file_name=os.path.join(cache_directory, task_key, "cache.h5out")
+ file_name=os.path.join(cache_directory, task_key + "_o.h5")
)
self.assertTrue(future_file_obj.done())
self.assertEqual(future_file_obj.result(), 3)
@@ -119,8 +119,8 @@ def test_execute_function_error(self):
fn_args=[],
fn_kwargs={"a": 1},
)
- file_name = os.path.join(cache_directory, task_key, "cache.h5in")
- os.makedirs(os.path.join(cache_directory, task_key), exist_ok=True)
+ file_name = os.path.join(cache_directory, task_key + "_i.h5")
+ os.makedirs(cache_directory, exist_ok=True)
dump(file_name=file_name, data_dict=data_dict)
backend_execute_task_in_file(file_name=file_name)
future_obj = Future()
@@ -131,11 +131,11 @@ def test_execute_function_error(self):
with self.assertRaises(ValueError):
future_obj.result()
self.assertTrue(
- get_runtime(file_name=os.path.join(cache_directory, task_key, "cache.h5out"))
+ get_runtime(file_name=os.path.join(cache_directory, task_key + "_o.h5"))
> 0.0
)
future_file_obj = FutureItem(
- file_name=os.path.join(cache_directory, task_key, "cache.h5out")
+ file_name=os.path.join(cache_directory, task_key + "_o.h5")
)
self.assertTrue(future_file_obj.done())
with self.assertRaises(ValueError):