Skip to content

Commit ff521c1

Browse files
Create cache directory (#601)
* Create cache directory
* [pre-commit.ci] auto fixes from pre-commit.com hooks; for more information, see https://pre-commit.ci
* fixes
* [pre-commit.ci] auto fixes from pre-commit.com hooks; for more information, see https://pre-commit.ci
* more fixes
* [pre-commit.ci] auto fixes from pre-commit.com hooks; for more information, see https://pre-commit.ci
* set job name based on file name
* [pre-commit.ci] auto fixes from pre-commit.com hooks; for more information, see https://pre-commit.ci

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
1 parent feb3fd6 commit ff521c1

File tree

5 files changed

+43
-27
lines changed

5 files changed

+43
-27
lines changed

executorlib/cache/queue_spawner.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,9 @@ def execute_with_pysqa(
6868
if k in resource_dict:
6969
del resource_dict[k]
7070
if "job_name" not in resource_dict:
71-
resource_dict["job_name"] = "pysqa"
71+
resource_dict["job_name"] = os.path.basename(
72+
os.path.dirname(os.path.abspath(cwd))
73+
)
7274
submit_kwargs.update(resource_dict)
7375
queue_id = qa.submit_job(**submit_kwargs)
7476
dump(file_name=file_name, data_dict={"queue_id": queue_id})

executorlib/cache/shared.py

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -106,8 +106,13 @@ def execute_tasks_h5(
106106
resource_dict=task_resource_dict,
107107
)
108108
if task_key not in memory_dict:
109-
if task_key + ".h5out" not in os.listdir(cache_directory):
110-
file_name = os.path.join(cache_directory, task_key + ".h5in")
109+
if not (
110+
task_key in os.listdir(cache_directory)
111+
and "cache.h5out"
112+
in os.listdir(os.path.join(cache_directory, task_key))
113+
):
114+
os.makedirs(os.path.join(cache_directory, task_key), exist_ok=True)
115+
file_name = os.path.join(cache_directory, task_key, "cache.h5in")
111116
dump(file_name=file_name, data_dict=data_dict)
112117
if not disable_dependencies:
113118
task_dependent_lst = [
@@ -131,10 +136,10 @@ def execute_tasks_h5(
131136
resource_dict=task_resource_dict,
132137
config_directory=pysqa_config_directory,
133138
backend=backend,
134-
cache_directory=cache_directory,
139+
cache_directory=os.path.join(cache_directory, task_key),
135140
)
136141
file_name_dict[task_key] = os.path.join(
137-
cache_directory, task_key + ".h5out"
142+
cache_directory, task_key, "cache.h5out"
138143
)
139144
memory_dict[task_key] = task_dict["future"]
140145
future_queue.task_done()
@@ -190,7 +195,7 @@ def _check_task_output(
190195
Future: The updated future object.
191196
192197
"""
193-
file_name = os.path.join(cache_directory, task_key + ".h5out")
198+
file_name = os.path.join(cache_directory, task_key, "cache.h5out")
194199
if not os.path.exists(file_name):
195200
return future_obj
196201
exec_flag, result = get_output(file_name=file_name)

executorlib/interactive/shared.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -148,9 +148,12 @@ def _execute_task_with_cache(
148148
fn_kwargs=task_dict["kwargs"],
149149
resource_dict=task_dict.get("resource_dict", {}),
150150
)
151-
os.makedirs(cache_directory, exist_ok=True)
152-
file_name = os.path.join(cache_directory, task_key + ".h5out")
153-
if task_key + ".h5out" not in os.listdir(cache_directory):
151+
os.makedirs(os.path.join(cache_directory, task_key), exist_ok=True)
152+
file_name = os.path.join(cache_directory, task_key, "cache.h5out")
153+
if not (
154+
task_key in os.listdir(cache_directory)
155+
and "cache.h5out" in os.listdir(os.path.join(cache_directory, task_key))
156+
):
154157
f = task_dict.pop("future")
155158
if f.set_running_or_notify_cancel():
156159
try:

executorlib/standalone/hdf.py

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -104,13 +104,16 @@ def get_queue_id(file_name: Optional[str]) -> Optional[int]:
104104

105105
def get_cache_data(cache_directory: str) -> list[dict]:
106106
file_lst = []
107-
for file_name in os.listdir(cache_directory):
108-
with h5py.File(os.path.join(cache_directory, file_name), "r") as hdf:
109-
file_content_dict = {
110-
key: cloudpickle.loads(np.void(hdf["/" + key]))
111-
for key in group_dict.values()
112-
if key in hdf
113-
}
114-
file_content_dict["filename"] = file_name
115-
file_lst.append(file_content_dict)
107+
for task_key in os.listdir(cache_directory):
108+
file_name = os.path.join(cache_directory, task_key, "cache.h5out")
109+
os.makedirs(os.path.join(cache_directory, task_key), exist_ok=True)
110+
if os.path.exists(file_name):
111+
with h5py.File(file_name, "r") as hdf:
112+
file_content_dict = {
113+
key: cloudpickle.loads(np.void(hdf["/" + key]))
114+
for key in group_dict.values()
115+
if key in hdf
116+
}
117+
file_content_dict["filename"] = file_name
118+
file_lst.append(file_content_dict)
116119
return file_lst

tests/test_cache_shared.py

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,8 @@ def test_execute_function_mixed(self):
3131
fn_args=[1],
3232
fn_kwargs={"b": 2},
3333
)
34-
file_name = os.path.join(cache_directory, task_key + ".h5in")
34+
file_name = os.path.join(cache_directory, task_key, "cache.h5in")
35+
os.makedirs(os.path.join(cache_directory, task_key), exist_ok=True)
3536
dump(file_name=file_name, data_dict=data_dict)
3637
backend_execute_task_in_file(file_name=file_name)
3738
future_obj = Future()
@@ -41,11 +42,11 @@ def test_execute_function_mixed(self):
4142
self.assertTrue(future_obj.done())
4243
self.assertEqual(future_obj.result(), 3)
4344
self.assertTrue(
44-
get_runtime(file_name=os.path.join(cache_directory, task_key + ".h5out"))
45+
get_runtime(file_name=os.path.join(cache_directory, task_key, "cache.h5out"))
4546
> 0.0
4647
)
4748
future_file_obj = FutureItem(
48-
file_name=os.path.join(cache_directory, task_key + ".h5out")
49+
file_name=os.path.join(cache_directory, task_key, "cache.h5out")
4950
)
5051
self.assertTrue(future_file_obj.done())
5152
self.assertEqual(future_file_obj.result(), 3)
@@ -58,7 +59,8 @@ def test_execute_function_args(self):
5859
fn_args=[1, 2],
5960
fn_kwargs={},
6061
)
61-
file_name = os.path.join(cache_directory, task_key + ".h5in")
62+
file_name = os.path.join(cache_directory, task_key, "cache.h5in")
63+
os.makedirs(os.path.join(cache_directory, task_key), exist_ok=True)
6264
dump(file_name=file_name, data_dict=data_dict)
6365
backend_execute_task_in_file(file_name=file_name)
6466
future_obj = Future()
@@ -68,11 +70,11 @@ def test_execute_function_args(self):
6870
self.assertTrue(future_obj.done())
6971
self.assertEqual(future_obj.result(), 3)
7072
self.assertTrue(
71-
get_runtime(file_name=os.path.join(cache_directory, task_key + ".h5out"))
73+
get_runtime(file_name=os.path.join(cache_directory, task_key, "cache.h5out"))
7274
> 0.0
7375
)
7476
future_file_obj = FutureItem(
75-
file_name=os.path.join(cache_directory, task_key + ".h5out")
77+
file_name=os.path.join(cache_directory, task_key, "cache.h5out")
7678
)
7779
self.assertTrue(future_file_obj.done())
7880
self.assertEqual(future_file_obj.result(), 3)
@@ -85,7 +87,8 @@ def test_execute_function_kwargs(self):
8587
fn_args=[],
8688
fn_kwargs={"a": 1, "b": 2},
8789
)
88-
file_name = os.path.join(cache_directory, task_key + ".h5in")
90+
file_name = os.path.join(cache_directory, task_key, "cache.h5in")
91+
os.makedirs(os.path.join(cache_directory, task_key), exist_ok=True)
8992
dump(file_name=file_name, data_dict=data_dict)
9093
backend_execute_task_in_file(file_name=file_name)
9194
future_obj = Future()
@@ -95,11 +98,11 @@ def test_execute_function_kwargs(self):
9598
self.assertTrue(future_obj.done())
9699
self.assertEqual(future_obj.result(), 3)
97100
self.assertTrue(
98-
get_runtime(file_name=os.path.join(cache_directory, task_key + ".h5out"))
101+
get_runtime(file_name=os.path.join(cache_directory, task_key, "cache.h5out"))
99102
> 0.0
100103
)
101104
future_file_obj = FutureItem(
102-
file_name=os.path.join(cache_directory, task_key + ".h5out")
105+
file_name=os.path.join(cache_directory, task_key, "cache.h5out")
103106
)
104107
self.assertTrue(future_file_obj.done())
105108
self.assertEqual(future_file_obj.result(), 3)

0 commit comments

Comments
 (0)