Skip to content

Commit ea305da

Browse files
Measure time for execution and store it in the HDF5 files (#524)
* Measure time for execution and store it in the HDF5 files
* [pre-commit.ci] auto fixes from pre-commit.com hooks
  (for more information, see https://pre-commit.ci)
* store runtime in hdf5
* [pre-commit.ci] auto fixes from pre-commit.com hooks
  (for more information, see https://pre-commit.ci)
* runtime not set
* [pre-commit.ci] auto fixes from pre-commit.com hooks
  (for more information, see https://pre-commit.ci)

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
1 parent c3a0ae7 commit ea305da

File tree

6 files changed

+57
-4
lines changed

6 files changed

+57
-4
lines changed

executorlib/backend/cache_parallel.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import pickle
22
import sys
3+
import time
34

45
import cloudpickle
56

@@ -32,6 +33,7 @@ def main() -> None:
3233
mpi_size_larger_one = MPI.COMM_WORLD.Get_size() > 1
3334
file_name = sys.argv[1]
3435

36+
time_start = time.time()
3537
if mpi_rank_zero:
3638
apply_dict = backend_load_file(file_name=file_name)
3739
else:
@@ -46,6 +48,7 @@ def main() -> None:
4648
backend_write_file(
4749
file_name=file_name,
4850
output=result,
51+
runtime=time.time() - time_start,
4952
)
5053
MPI.COMM_WORLD.Barrier()
5154

executorlib/cache/backend.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import os
2+
import time
23
from typing import Any
34

45
from executorlib.cache.shared import FutureItem
@@ -28,21 +29,25 @@ def backend_load_file(file_name: str) -> dict:
2829
return apply_dict
2930

3031

31-
def backend_write_file(file_name: str, output: Any) -> None:
32+
def backend_write_file(file_name: str, output: Any, runtime: float) -> None:
3233
"""
3334
Write the output to an HDF5 file.
3435
3536
Args:
3637
file_name (str): The name of the HDF5 file.
3738
output (Any): The output to be written.
39+
runtime (float): Time for executing function.
3840
3941
Returns:
4042
None
4143
4244
"""
4345
file_name_out = os.path.splitext(file_name)[0]
4446
os.rename(file_name, file_name_out + ".h5ready")
45-
dump(file_name=file_name_out + ".h5ready", data_dict={"output": output})
47+
dump(
48+
file_name=file_name_out + ".h5ready",
49+
data_dict={"output": output, "runtime": runtime},
50+
)
4651
os.rename(file_name_out + ".h5ready", file_name_out + ".h5out")
4752

4853

@@ -57,10 +62,12 @@ def backend_execute_task_in_file(file_name: str) -> None:
5762
None
5863
"""
5964
apply_dict = backend_load_file(file_name=file_name)
65+
time_start = time.time()
6066
result = apply_dict["fn"].__call__(*apply_dict["args"], **apply_dict["kwargs"])
6167
backend_write_file(
6268
file_name=file_name,
6369
output=result,
70+
runtime=time.time() - time_start,
6471
)
6572

6673

executorlib/interactive/shared.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import os
33
import queue
44
import sys
5+
import time
56
from concurrent.futures import Future
67
from time import sleep
78
from typing import Callable, List, Optional
@@ -627,8 +628,10 @@ def _execute_task_with_cache(
627628
f = task_dict.pop("future")
628629
if f.set_running_or_notify_cancel():
629630
try:
631+
time_start = time.time()
630632
result = interface.send_and_receive_dict(input_dict=task_dict)
631633
data_dict["output"] = result
634+
data_dict["runtime"] = time.time() - time_start
632635
dump(file_name=file_name, data_dict=data_dict)
633636
f.set_result(result)
634637
except Exception as thread_exception:

executorlib/standalone/hdf.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ def dump(file_name: str, data_dict: dict) -> None:
1818
"args": "input_args",
1919
"kwargs": "input_kwargs",
2020
"output": "output",
21+
"runtime": "runtime",
2122
"queue_id": "queue_id",
2223
}
2324
with h5py.File(file_name, "a") as fname:
@@ -73,6 +74,23 @@ def get_output(file_name: str) -> Tuple[bool, object]:
7374
return False, None
7475

7576

77+
def get_runtime(file_name: str) -> float:
78+
"""
79+
Get run time from HDF5 file
80+
81+
Args:
82+
file_name (str): file name of the HDF5 file as absolute path
83+
84+
Returns:
85+
float: run time from the execution of the python function
86+
"""
87+
with h5py.File(file_name, "r") as hdf:
88+
if "runtime" in hdf:
89+
return cloudpickle.loads(np.void(hdf["/runtime"]))
90+
else:
91+
return 0.0
92+
93+
7694
def get_queue_id(file_name: str) -> Optional[int]:
7795
with h5py.File(file_name, "r") as hdf:
7896
if "queue_id" in hdf:

tests/test_cache_hdf.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,13 @@
44

55

66
try:
7-
from executorlib.standalone.hdf import dump, load, get_output, get_queue_id
7+
from executorlib.standalone.hdf import (
8+
dump,
9+
load,
10+
get_output,
11+
get_runtime,
12+
get_queue_id,
13+
)
814

915
skip_h5py_test = False
1016
except ImportError:
@@ -34,6 +40,7 @@ def test_hdf_mixed(self):
3440
self.assertEqual(data_dict["args"], [a])
3541
self.assertEqual(data_dict["kwargs"], {"b": b})
3642
flag, output = get_output(file_name=file_name)
43+
self.assertTrue(get_runtime(file_name=file_name) == 0.0)
3744
self.assertFalse(flag)
3845
self.assertIsNone(output)
3946

@@ -49,6 +56,7 @@ def test_hdf_args(self):
4956
self.assertEqual(data_dict["args"], [a, b])
5057
self.assertEqual(data_dict["kwargs"], {})
5158
flag, output = get_output(file_name=file_name)
59+
self.assertTrue(get_runtime(file_name=file_name) == 0.0)
5260
self.assertFalse(flag)
5361
self.assertIsNone(output)
5462

@@ -73,6 +81,7 @@ def test_hdf_kwargs(self):
7381
self.assertEqual(data_dict["kwargs"], {"a": a, "b": b})
7482
self.assertEqual(get_queue_id(file_name=file_name), 123)
7583
flag, output = get_output(file_name=file_name)
84+
self.assertTrue(get_runtime(file_name=file_name) == 0.0)
7685
self.assertFalse(flag)
7786
self.assertIsNone(output)
7887

@@ -87,6 +96,7 @@ def test_hdf_queue_id(self):
8796
)
8897
self.assertEqual(get_queue_id(file_name=file_name), 123)
8998
flag, output = get_output(file_name=file_name)
99+
self.assertTrue(get_runtime(file_name=file_name) == 0.0)
90100
self.assertFalse(flag)
91101
self.assertIsNone(output)
92102

tests/test_cache_shared.py

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
try:
88
from executorlib.cache.backend import backend_execute_task_in_file
99
from executorlib.cache.shared import _check_task_output, FutureItem
10-
from executorlib.standalone.hdf import dump
10+
from executorlib.standalone.hdf import dump, get_runtime
1111
from executorlib.standalone.serialize import serialize_funct_h5
1212

1313
skip_h5io_test = False
@@ -40,6 +40,10 @@ def test_execute_function_mixed(self):
4040
)
4141
self.assertTrue(future_obj.done())
4242
self.assertEqual(future_obj.result(), 3)
43+
self.assertTrue(
44+
get_runtime(file_name=os.path.join(cache_directory, task_key + ".h5out"))
45+
> 0.0
46+
)
4347
future_file_obj = FutureItem(
4448
file_name=os.path.join(cache_directory, task_key + ".h5out")
4549
)
@@ -63,6 +67,10 @@ def test_execute_function_args(self):
6367
)
6468
self.assertTrue(future_obj.done())
6569
self.assertEqual(future_obj.result(), 3)
70+
self.assertTrue(
71+
get_runtime(file_name=os.path.join(cache_directory, task_key + ".h5out"))
72+
> 0.0
73+
)
6674
future_file_obj = FutureItem(
6775
file_name=os.path.join(cache_directory, task_key + ".h5out")
6876
)
@@ -86,6 +94,10 @@ def test_execute_function_kwargs(self):
8694
)
8795
self.assertTrue(future_obj.done())
8896
self.assertEqual(future_obj.result(), 3)
97+
self.assertTrue(
98+
get_runtime(file_name=os.path.join(cache_directory, task_key + ".h5out"))
99+
> 0.0
100+
)
89101
future_file_obj = FutureItem(
90102
file_name=os.path.join(cache_directory, task_key + ".h5out")
91103
)

0 commit comments

Comments
 (0)