Skip to content

Commit f661177

Browse files
Cache: Use explicit arguments for serialize_funct_h5() (#448)
* Cache: Use explicit arguments for serialize_funct_h5() * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * Add resource dict * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * fix docstring * Update submit function * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * Fix tests * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * Remove ExecutorSteps --------- Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
1 parent e67db60 commit f661177

File tree

7 files changed

+136
-73
lines changed

7 files changed

+136
-73
lines changed

executorlib/base/executor.py

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,7 @@
77
)
88
from typing import Optional
99

10-
from executorlib.standalone.inputcheck import (
11-
check_resource_dict,
12-
check_resource_dict_is_empty,
13-
)
10+
from executorlib.standalone.inputcheck import check_resource_dict
1411
from executorlib.standalone.queue import cancel_items_in_queue
1512
from executorlib.standalone.serialize import cloudpickle_register
1613
from executorlib.standalone.thread import RaisingThread
@@ -89,10 +86,17 @@ def submit(self, fn: callable, *args, resource_dict: dict = {}, **kwargs) -> Fut
8986
Returns:
9087
Future: A Future representing the given call.
9188
"""
92-
check_resource_dict_is_empty(resource_dict=resource_dict)
9389
check_resource_dict(function=fn)
9490
f = Future()
95-
self._future_queue.put({"fn": fn, "args": args, "kwargs": kwargs, "future": f})
91+
self._future_queue.put(
92+
{
93+
"fn": fn,
94+
"args": args,
95+
"kwargs": kwargs,
96+
"future": f,
97+
"resource_dict": resource_dict,
98+
}
99+
)
96100
return f
97101

98102
def shutdown(self, wait: bool = True, *, cancel_futures: bool = False):

executorlib/cache/shared.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,10 @@ def execute_tasks_h5(
9494
file_name_dict=file_name_dict,
9595
)
9696
task_key, data_dict = serialize_funct_h5(
97-
task_dict["fn"], *task_args, **task_kwargs
97+
fn=task_dict["fn"],
98+
fn_args=task_args,
99+
fn_kwargs=task_kwargs,
100+
resource_dict=task_dict["resource_dict"],
98101
)
99102
if task_key not in memory_dict.keys():
100103
if task_key + ".h5out" not in os.listdir(cache_directory):

executorlib/interactive/executor.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
from concurrent.futures import Future
22
from typing import Any, Callable, Dict, Optional
33

4+
from executorlib.base.executor import ExecutorBase
45
from executorlib.interactive.shared import (
5-
ExecutorSteps,
66
InteractiveExecutor,
77
InteractiveStepExecutor,
88
execute_tasks_with_dependencies,
@@ -35,10 +35,10 @@
3535
pass
3636

3737

38-
class ExecutorWithDependencies(ExecutorSteps):
38+
class ExecutorWithDependencies(ExecutorBase):
3939
"""
40-
ExecutorWithDependencies is a class that extends ExecutorSteps and provides
41-
functionality for executing tasks with dependencies.
40+
ExecutorWithDependencies is a class that extends ExecutorBase and provides functionality for executing tasks with
41+
dependencies.
4242
4343
Args:
4444
refresh_rate (float, optional): The refresh rate for updating the executor queue. Defaults to 0.01.

executorlib/interactive/shared.py

Lines changed: 40 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,10 @@
88

99
from executorlib.base.executor import ExecutorBase, cancel_items_in_queue
1010
from executorlib.standalone.command import get_command_path
11-
from executorlib.standalone.inputcheck import check_resource_dict
11+
from executorlib.standalone.inputcheck import (
12+
check_resource_dict,
13+
check_resource_dict_is_empty,
14+
)
1215
from executorlib.standalone.interactive.communication import (
1316
SocketInterface,
1417
interface_bootup,
@@ -19,6 +22,37 @@
1922

2023

2124
class ExecutorBroker(ExecutorBase):
25+
def submit(self, fn: callable, *args, resource_dict: dict = {}, **kwargs) -> Future:
26+
"""
27+
Submits a callable to be executed with the given arguments.
28+
29+
Schedules the callable to be executed as fn(*args, **kwargs) and returns
30+
a Future instance representing the execution of the callable.
31+
32+
Args:
33+
fn (callable): function to submit for execution
34+
args: arguments for the submitted function
35+
kwargs: keyword arguments for the submitted function
36+
resource_dict (dict): resource dictionary, which defines the resources used for the execution of the
37+
function. Example resource dictionary: {
38+
cores: 1,
39+
threads_per_core: 1,
40+
gpus_per_worker: 0,
41+
oversubscribe: False,
42+
cwd: None,
43+
executor: None,
44+
hostname_localhost: False,
45+
}
46+
47+
Returns:
48+
Future: A Future representing the given call.
49+
"""
50+
check_resource_dict_is_empty(resource_dict=resource_dict)
51+
check_resource_dict(function=fn)
52+
f = Future()
53+
self._future_queue.put({"fn": fn, "args": args, "kwargs": kwargs, "future": f})
54+
return f
55+
2256
def shutdown(self, wait: bool = True, *, cancel_futures: bool = False):
2357
"""Clean-up the resources associated with the Executor.
2458
@@ -57,46 +91,6 @@ def _set_process(self, process: List[RaisingThread]):
5791
process.start()
5892

5993

60-
class ExecutorSteps(ExecutorBase):
61-
def submit(self, fn: callable, *args, resource_dict: dict = {}, **kwargs):
62-
"""
63-
Submits a callable to be executed with the given arguments.
64-
65-
Schedules the callable to be executed as fn(*args, **kwargs) and returns
66-
a Future instance representing the execution of the callable.
67-
68-
Args:
69-
fn (callable): function to submit for execution
70-
args: arguments for the submitted function
71-
kwargs: keyword arguments for the submitted function
72-
resource_dict (dict): resource dictionary, which defines the resources used for the execution of the
73-
function. Example resource dictionary: {
74-
cores: 1,
75-
threads_per_core: 1,
76-
gpus_per_worker: 0,
77-
oversubscribe: False,
78-
cwd: None,
79-
executor: None,
80-
hostname_localhost: False,
81-
}
82-
83-
Returns:
84-
A Future representing the given call.
85-
"""
86-
check_resource_dict(function=fn)
87-
f = Future()
88-
self._future_queue.put(
89-
{
90-
"fn": fn,
91-
"args": args,
92-
"kwargs": kwargs,
93-
"future": f,
94-
"resource_dict": resource_dict,
95-
}
96-
)
97-
return f
98-
99-
10094
class InteractiveExecutor(ExecutorBroker):
10195
"""
10296
The executorlib.interactive.executor.InteractiveExecutor leverages the exeutorlib interfaces to distribute python
@@ -151,7 +145,7 @@ def __init__(
151145
)
152146

153147

154-
class InteractiveStepExecutor(ExecutorSteps):
148+
class InteractiveStepExecutor(ExecutorBase):
155149
"""
156150
The executorlib.interactive.executor.InteractiveStepExecutor leverages the executorlib interfaces to distribute python
157151
tasks. In contrast to the mpi4py.futures.MPIPoolExecutor the executorlib.interactive.executor.InteractiveStepExecutor
@@ -596,7 +590,10 @@ def _execute_task_with_cache(
596590

597591
future = task_dict["future"]
598592
task_key, data_dict = serialize_funct_h5(
599-
task_dict["fn"], *task_dict["args"], **task_dict["kwargs"]
593+
fn=task_dict["fn"],
594+
fn_args=task_dict["args"],
595+
fn_kwargs=task_dict["kwargs"],
596+
resource_dict=task_dict["resource_dict"],
600597
)
601598
os.makedirs(cache_directory, exist_ok=True)
602599
file_name = os.path.join(cache_directory, task_key + ".h5out")

executorlib/standalone/serialize.py

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,22 +28,41 @@ def cloudpickle_register(ind: int = 2):
2828
pass
2929

3030

31-
def serialize_funct_h5(fn: callable, *args: Any, **kwargs: Any) -> Tuple[str, dict]:
31+
def serialize_funct_h5(
32+
fn: callable, fn_args: list = [], fn_kwargs: dict = {}, resource_dict: dict = {}
33+
) -> Tuple[str, dict]:
3234
"""
3335
Serialize a function and its arguments and keyword arguments into an HDF5 file.
3436
3537
Args:
3638
fn (callable): The function to be serialized.
37-
*args (Any): The arguments of the function.
38-
**kwargs (Any): The keyword arguments of the function.
39+
fn_args (list): The arguments of the function.
40+
fn_kwargs (dict): The keyword arguments of the function.
41+
resource_dict (dict): resource dictionary, which defines the resources used for the execution of the function.
42+
Example resource dictionary: {
43+
cores: 1,
44+
threads_per_core: 1,
45+
gpus_per_worker: 0,
46+
oversubscribe: False,
47+
cwd: None,
48+
executor: None,
49+
hostname_localhost: False,
50+
}
3951
4052
Returns:
4153
Tuple[str, dict]: A tuple containing the task key and the serialized data.
4254
4355
"""
44-
binary_all = cloudpickle.dumps({"fn": fn, "args": args, "kwargs": kwargs})
56+
binary_all = cloudpickle.dumps(
57+
{"fn": fn, "args": fn_args, "kwargs": fn_kwargs, "resource_dict": resource_dict}
58+
)
4559
task_key = fn.__name__ + _get_hash(binary=binary_all)
46-
data = {"fn": fn, "args": args, "kwargs": kwargs}
60+
data = {
61+
"fn": fn,
62+
"args": fn_args,
63+
"kwargs": fn_kwargs,
64+
"resource_dict": resource_dict,
65+
}
4766
return task_key, data
4867

4968

tests/test_cache_executor_serial.py

Lines changed: 45 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,15 @@ def test_executor_working_directory(self):
5555
def test_executor_function(self):
5656
fs1 = Future()
5757
q = Queue()
58-
q.put({"fn": my_funct, "args": (), "kwargs": {"a": 1, "b": 2}, "future": fs1})
58+
q.put(
59+
{
60+
"fn": my_funct,
61+
"args": (),
62+
"kwargs": {"a": 1, "b": 2},
63+
"future": fs1,
64+
"resource_dict": {},
65+
}
66+
)
5967
cache_dir = os.path.abspath("cache")
6068
os.makedirs(cache_dir, exist_ok=True)
6169
process = RaisingThread(
@@ -80,8 +88,24 @@ def test_executor_function_dependence_kwargs(self):
8088
fs1 = Future()
8189
fs2 = Future()
8290
q = Queue()
83-
q.put({"fn": my_funct, "args": (), "kwargs": {"a": 1, "b": 2}, "future": fs1})
84-
q.put({"fn": my_funct, "args": (), "kwargs": {"a": 1, "b": fs1}, "future": fs2})
91+
q.put(
92+
{
93+
"fn": my_funct,
94+
"args": (),
95+
"kwargs": {"a": 1, "b": 2},
96+
"future": fs1,
97+
"resource_dict": {},
98+
}
99+
)
100+
q.put(
101+
{
102+
"fn": my_funct,
103+
"args": (),
104+
"kwargs": {"a": 1, "b": fs1},
105+
"future": fs2,
106+
"resource_dict": {},
107+
}
108+
)
85109
cache_dir = os.path.abspath("cache")
86110
os.makedirs(cache_dir, exist_ok=True)
87111
process = RaisingThread(
@@ -106,8 +130,24 @@ def test_executor_function_dependence_args(self):
106130
fs1 = Future()
107131
fs2 = Future()
108132
q = Queue()
109-
q.put({"fn": my_funct, "args": (), "kwargs": {"a": 1, "b": 2}, "future": fs1})
110-
q.put({"fn": my_funct, "args": [fs1], "kwargs": {"b": 2}, "future": fs2})
133+
q.put(
134+
{
135+
"fn": my_funct,
136+
"args": (),
137+
"kwargs": {"a": 1, "b": 2},
138+
"future": fs1,
139+
"resource_dict": {},
140+
}
141+
)
142+
q.put(
143+
{
144+
"fn": my_funct,
145+
"args": [fs1],
146+
"kwargs": {"b": 2},
147+
"future": fs2,
148+
"resource_dict": {},
149+
}
150+
)
111151
cache_dir = os.path.abspath("cache")
112152
os.makedirs(cache_dir, exist_ok=True)
113153
process = RaisingThread(

tests/test_cache_shared.py

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,9 @@ def test_execute_function_mixed(self):
2727
cache_directory = os.path.abspath("cache")
2828
os.makedirs(cache_directory, exist_ok=True)
2929
task_key, data_dict = serialize_funct_h5(
30-
my_funct,
31-
1,
32-
b=2,
30+
fn=my_funct,
31+
fn_args=[1],
32+
fn_kwargs={"b": 2},
3333
)
3434
file_name = os.path.join(cache_directory, task_key + ".h5in")
3535
dump(file_name=file_name, data_dict=data_dict)
@@ -50,9 +50,9 @@ def test_execute_function_args(self):
5050
cache_directory = os.path.abspath("cache")
5151
os.makedirs(cache_directory, exist_ok=True)
5252
task_key, data_dict = serialize_funct_h5(
53-
my_funct,
54-
1,
55-
2,
53+
fn=my_funct,
54+
fn_args=[1, 2],
55+
fn_kwargs={},
5656
)
5757
file_name = os.path.join(cache_directory, task_key + ".h5in")
5858
dump(file_name=file_name, data_dict=data_dict)
@@ -73,9 +73,9 @@ def test_execute_function_kwargs(self):
7373
cache_directory = os.path.abspath("cache")
7474
os.makedirs(cache_directory, exist_ok=True)
7575
task_key, data_dict = serialize_funct_h5(
76-
my_funct,
77-
a=1,
78-
b=2,
76+
fn=my_funct,
77+
fn_args=[],
78+
fn_kwargs={"a": 1, "b": 2},
7979
)
8080
file_name = os.path.join(cache_directory, task_key + ".h5in")
8181
dump(file_name=file_name, data_dict=data_dict)

0 commit comments

Comments
 (0)